はじめに
vNextワークフローを使用するには、まずvNextモジュールから必要な関数をインポートします:
import { createWorkflow, createStep } from "@mastra/core/workflows/vNext";
import { z } from "zod"; // For schema validation
主要な概念
vNextワークフローは以下で構成されています:
- スキーマ:Zodを使用した入力と出力の型定義
- ステップ:定義された入力と出力を持つ個々の作業単位
- ワークフロー:特定のタスクやプロセスを達成するために設計された、構造化されたステップのシーケンス。ワークフローはステップの順序、依存関係、実行ロジックを定義し、それ自体が他のワークフロー内のステップとして再利用できます。
- ワークフロー実行フロー:ワークフロー実行中に従うパスとロジック。ステップの実行方法、データの移動方法、条件分岐やエラー処理の方法などが含まれます。
スキーマはZodを使用して、ステップとワークフローの入力と出力の両方に対して定義されます。スキーマはまた、ステップが一時停止状態から再開する際に取得するデータや、ステップの実行を一時停止する際に渡すべき文脈情報を指定することもできます。
接続されているステップの入力と出力は一致する必要があります:例えば、あるステップのinputSchemaは前のステップのoutputSchemaと同じであるべきです。同様に、ワークフローを他のワークフローのステップとして使用する場合、ワークフローのinputSchemaはそれが使用されるステップのoutputSchemaと一致する必要があります。
ステップは、前のステップからの入力や、ステップが一時停止状態から再開される場合は再開データを含むコンテキストオブジェクトを受け取るexecute
関数を使用して実行されます。execute
関数はそのoutputSchemaに一致する値を返す必要があります。
.then()
、.parallel()
、.branch()
などのプリミティブは、ワークフローの実行フローと、その中のステップがどのように接続されるかを記述します。ワークフロー(単独でもステップとしても)を実行する場合、その実行はexecute
関数ではなく実行フローによって決定されます。ワークフローの最終結果は常にその最後のステップの結果となり、これはワークフローのoutputSchemaと一致する必要があります。
ワークフローの作成
ステップ
ステップはワークフローの構成要素です。createStep
を使用してステップを作成します:
// 定義された入力/出力スキーマと実行ロジックを持つステップを作成
const inputSchema = z.object({
inputValue: z.string(),
});
const myStep = createStep({
id: "my-step",
description: "何か役立つことを行う",
inputSchema,
outputSchema: z.object({
outputValue: z.string(),
}),
// オプション: ステップ再開のための再開スキーマを定義
resumeSchema: z.object({
resumeValue: z.string(),
}),
// オプション: ステップ一時停止のための一時停止スキーマを定義
suspendSchema: z.object({
suspendValue: z.string(),
}),
execute: async ({
inputData,
mastra,
getStepResult,
getInitData,
runtimeContext,
}) => {
const otherStepOutput = getStepResult(step2);
const initData = getInitData<typeof inputSchema>(); // 入力スキーマ変数(zodスキーマ)として型付け
return {
outputValue: `処理済み: ${inputData.inputValue}, ${initData.startValue} (runtimeContextValue: ${runtimeContext.get("runtimeContextValue")})`,
};
},
});
各ステップには以下が必要です:
id
: ステップの一意の識別子inputSchema
: 予想される入力を定義するZodスキーマoutputSchema
: 出力の形を定義するZodスキーマresumeSchema
: オプション。再開入力を定義するZodスキーマsuspendSchema
: オプション。一時停止入力を定義するZodスキーマexecute
: ステップの作業を実行する非同期関数
execute
関数は以下を含むコンテキストオブジェクトを受け取ります:
inputData
: inputSchemaに一致する入力データresumeData
: 一時停止状態からステップを再開する際のresumeSchemaに一致する再開データ。ステップが再開される場合にのみ存在します。mastra
: mastraサービス(エージェント、ツールなど)へのアクセスgetStepResult
: 他のステップの結果にアクセスするための関数getInitData
: どのステップでもワークフローの初期入力データにアクセスするための関数suspend
: ワークフロー実行を一時停止するための関数(ユーザーとのインタラクション用)
ワークフロー構造
createWorkflow
を使用してワークフローを作成します:
// 定義されたステップと実行フローを持つワークフローを作成
const myWorkflow = createWorkflow({
id: "my-workflow",
// 予想される入力構造を定義(最初のステップのinputSchemaと一致する必要があります)
inputSchema: z.object({
startValue: z.string(),
}),
// 予想される出力構造を定義(最後のステップのoutputSchemaと一致する必要があります)
outputSchema: z.object({
result: z.string(),
}),
steps: [step1, step2, step3], // このワークフローで使用されるステップを宣言
})
.then(step1)
.then(step2)
.then(step3)
.commit();
// ワークフローをMastraインスタンスに登録
const mastra = new Mastra({
vnext_workflows: {
myWorkflow,
},
});
// ワークフローの実行インスタンスを作成
const run = mastra.vnext_getWorkflow("myWorkflow").createRun();
ワークフローオプションのsteps
プロパティは、ステップ結果へのアクセスに対して型安全性を提供します。ワークフローで使用するステップを宣言すると、TypeScriptはresult.steps
にアクセスする際の型安全性を確保します:
// ワークフローオプションでステップを宣言
const workflow = createWorkflow({
id: "my-workflow",
inputSchema: z.object({}),
outputSchema: z.object({}),
steps: [step1, step2], // TypeScriptはこれらのステップが存在することを知っています
})
.then(step1)
.then(step2)
.commit();
const result = await workflow.createRun().start({ inputData: {} });
if (result.status === "success") {
console.log(result.result); // ステータスがsuccessの場合のみ存在します
} else if (result.status === "failed") {
console.error(result.error); // ステータスがfailedの場合のみ存在します、これはErrorのインスタンスです
throw result.error;
} else if (result.status === "suspended") {
console.log(result.suspended); // ステータスがsuspendedの場合のみ存在します
}
// TypeScriptはこれらのプロパティが存在し、その型を知っています
console.log(result.steps.step1.output); // 完全に型付けされています
console.log(result.steps.step2.output); // 完全に型付けされています
ワークフロー定義には以下が必要です:
id
: ワークフローの一意の識別子inputSchema
: ワークフロー入力を定義するZodスキーマoutputSchema
: ワークフロー出力を定義するZodスキーマsteps
: ワークフローで使用されるステップの配列(オプションですが、型安全性のために推奨)
ステップとネストされたワークフローの再利用
ステップとネストされたワークフローは、クローンすることで再利用できます:
const clonedStep = cloneStep(myStep, { id: "cloned-step" });
const clonedWorkflow = cloneWorkflow(myWorkflow, { id: "cloned-workflow" });
このようにして、同じステップまたはネストされたワークフローを同じワークフロー内で複数回使用することができます。
import {
createWorkflow,
createStep,
cloneStep,
cloneWorkflow,
} from "@mastra/core/workflows/vNext";
const myWorkflow = createWorkflow({
id: "my-workflow",
steps: [step1, step2, step3],
});
myWorkflow.then(step1).then(step2).then(step3).commit();
const parentWorkflow = createWorkflow({
id: "parent-workflow",
steps: [myWorkflow, step4],
});
parentWorkflow
.then(myWorkflow) // nested workflow
.then(step4)
.then(cloneWorkflow(myWorkflow, { id: "cloned-workflow" })) // cloned workflow
.then(cloneStep(step4, { id: "cloned-step-4" })) // cloned step
.commit();
ワークフローの実行
ワークフローを定義した後、以下のように実行します:
// 実行インスタンスを作成
const run = myWorkflow.createRun();
// 入力データでワークフローを開始
const result = await run.start({
inputData: {
startValue: "initial data",
},
});
// 結果にアクセス
console.log(result.steps); // すべてのステップ結果
console.log(result.steps["step-id"].output); // 特定のステップからの出力
if (result.status === "success") {
console.log(result.result); // ワークフローの最終結果、最後のステップの結果(または最後のステップとして`.map()`が使用された場合はその出力)
} else if (result.status === "suspended") {
const resumeResult = await run.resume({
step: result.suspended[0], // 一時停止された実行パスには常に少なくとも1つのステップIDがあります。この場合、最初の一時停止された実行パスを再開します
resumeData: {
/* ユーザー入力 */
},
});
} else if (result.status === "failed") {
console.error(result.error); // ステータスが失敗の場合にのみ存在し、これはErrorのインスタンスです
}
注意: run
インスタンスを分割代入しないでください
つまり、以下のようにしないでください
const { start, resume, ... } = myWorkflow.createRun();
// または
const run = myWorkflow.createRun();
const { start, resume, ... } = run;
これはワークフローとの接続を切断し、実行を妨げます
ワークフロー実行結果のスキーマ
ワークフローの実行結果(start()
またはresume()
からの)は、次のTypeScriptインターフェースに従います:
export type WorkflowResult<...> =
| {
status: 'success';
result: z.infer<TOutput>;
steps: {
[K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
? StepResult<unknown>
: StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
};
}
| {
status: 'failed';
steps: {
[K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
? StepResult<unknown>
: StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
};
error: Error;
}
| {
status: 'suspended';
steps: {
[K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
? StepResult<unknown>
: StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
};
suspended: [string[], ...string[][]];
};
結果プロパティの説明
-
status: ワークフロー実行の最終状態を示します
'success'
: ワークフローが正常に完了'failed'
: ワークフローでエラーが発生'suspended'
: ワークフローがユーザー入力を待機して一時停止中
-
result: ワークフローの最終出力を含み、ワークフローの
outputSchema
に従って型付けされます -
suspended: 現在一時停止中のステップIDの配列(オプション)。
status
が'suspended'
の場合のみ存在します -
steps: 実行されたすべてのステップの結果を含むレコード
- キーはステップID
- 値はステップの出力を含む
StepResult
オブジェクト - 各ステップの
outputSchema
に基づいて型安全
-
error:
status
が'failed'
の場合に存在するエラーオブジェクト(オプション)
ワークフロー実行の監視
ワークフロー実行を監視することもできます:
const run = myWorkflow.createRun();
// 実行を監視するウォッチャーを追加
run.watch(event => {
console.log('ステップ完了:', event.payload.currentStep.id);
});
// ワークフローを開始
const result = await run.start({ inputData: {...} });
event
オブジェクトは以下のスキーマを持っています:
type WatchEvent = {
type: "watch";
payload: {
currentStep?: {
id: string;
status: "running" | "completed" | "failed" | "suspended";
output?: Record<string, any>;
payload?: Record<string, any>;
};
workflowState: {
status: "running" | "success" | "failed" | "suspended";
steps: Record<
string,
{
status: "running" | "completed" | "failed" | "suspended";
output?: Record<string, any>;
payload?: Record<string, any>;
}
>;
result?: Record<string, any>;
error?: Record<string, any>;
payload?: Record<string, any>;
};
};
eventTimestamp: Date;
};
currentStep
プロパティはワークフローの実行中にのみ存在します。ワークフローが終了すると、workflowState
のステータスが変更され、result
およびerror
プロパティも更新されます。同時にcurrentStep
プロパティは削除されます。