はじめに
vNextワークフローを使用するには、まずvNextモジュールから必要な関数をインポートします:
import { createWorkflow, createStep } from "@mastra/core/workflows/vNext";
import { z } from "zod"; // For schema validation
主要な概念
vNextワークフローは以下で構成されています:
- スキーマ:Zodを使用した入力と出力の型定義
- ステップ:定義された入力と出力を持つ個々の作業単位
- ワークフロー:定義された実行パターンを持つステップのオーケストレーション。ワークフローもステップであり、他のワークフローでステップとして使用できます。
- ワークフロー実行フロー:ステップがどのように実行され、相互に接続されるか
スキーマはZodを使用して、ステップとワークフローの入力と出力の両方に対して定義されます。スキーマはまた、ステップが一時停止状態から再開する際に取得するデータや、ステップの実行を一時停止する際に渡すべき文脈情報を指定することもできます。
接続されているステップの入力と出力は一致する必要があります:例えば、あるステップのinputSchemaは前のステップのoutputSchemaと同じであるべきです。同様に、ワークフローを他のワークフローでステップとして使用する場合、ワークフローのinputSchemaは、それが使用されるステップのoutputSchemaと一致する必要があります。
ステップは、前のステップからの入力や、ステップが一時停止状態から再開される場合は再開データを含むコンテキストオブジェクトを受け取るexecute
関数を使用して実行されます。execute
関数はそのoutputSchemaに一致する値を返す必要があります。
.then()
、.parallel()
、.branch()
などのプリミティブは、ワークフローの実行フローと、その中のステップがどのように接続されるかを記述します。ワークフロー(単独でもステップとしても)を実行する場合、その実行はexecute
関数ではなく、実行フローによって決定されます。ワークフローの最終結果は常に最後のステップの結果となり、これはワークフローのoutputSchemaと一致する必要があります。
ワークフローの作成
ステップ
ステップはワークフローの構成要素です。createStep
を使用してステップを作成します:
const myStep = createStep({
id: "my-step",
description: "Does something useful",
inputSchema: z.object({
inputValue: z.string(),
}),
outputSchema: z.object({
outputValue: z.string(),
}),
resumeSchema: z.object({
resumeValue: z.string(),
}),
suspendSchema: z.object({
suspendValue: z.string(),
}),
execute: async ({
inputData,
mastra,
getStepResult,
getInitData,
runtimeContext,
}) => {
const otherStepOutput = getStepResult(step2);
const initData = getInitData<typeof workflow>(); // typed as the workflow input schema
return {
outputValue: `Processed: ${inputData.inputValue}, ${initData.startValue} (runtimeContextValue: ${runtimeContext.get("runtimeContextValue")})`,
};
},
});
各ステップには以下が必要です:
id
: ステップの一意の識別子inputSchema
: 予想される入力を定義するZodスキーマoutputSchema
: 出力の形を定義するZodスキーマresumeSchema
: オプション。再開入力を定義するZodスキーマsuspendSchema
: オプション。一時停止入力を定義するZodスキーマexecute
: ステップの作業を実行する非同期関数
execute
関数は以下を含むコンテキストオブジェクトを受け取ります:
inputData
: inputSchemaに一致する入力データresumeData
: 一時停止状態からステップを再開する際に、resumeSchemaに一致する再開データ。ステップが再開される場合にのみ存在します。mastra
: mastraサービス(エージェント、ツールなど)へのアクセスgetStepResult
: 他のステップの結果にアクセスするための関数getInitData
: どのステップでもワークフローの初期入力データにアクセスするための関数suspend
: ワークフロー実行を一時停止するための関数(ユーザーとのインタラクション用)
ワークフロー構造
createWorkflow
を使用してワークフローを作成します:
const myWorkflow = createWorkflow({
id: "my-workflow",
inputSchema: z.object({
startValue: z.string(),
}),
outputSchema: z.object({
result: z.string(),
}),
steps: [step1, step2, step3], // Declare steps used in this workflow
})
.then(step1)
.then(step2)
.then(step3)
.commit();
const mastra = new Mastra({
vnext_workflows: {
myWorkflow,
},
});
const run = mastra.vnext_getWorkflow("myWorkflow").createRun();
ワークフローオプションのsteps
プロパティは、ステップ結果へのアクセスに対する型安全性を提供します。ワークフローで使用するステップを宣言すると、TypeScriptはresult.steps
へのアクセス時に型安全性を確保します:
// With steps declared in workflow options
const workflow = createWorkflow({
id: "my-workflow",
inputSchema: z.object({}),
outputSchema: z.object({}),
steps: [step1, step2], // TypeScript knows these steps exist
})
.then(step1)
.then(step2)
.commit();
const result = await workflow.createRun().start({ inputData: {} });
if (result.status === "success") {
console.log(result.result); // only exists if status is success
} else if (result.status === "failed") {
console.error(result.error); // only exists if status is failed, this is an instance of Error
throw result.error;
} else if (result.status === "suspended") {
console.log(result.suspended); // only exists if status is suspended
}
// TypeScript knows these properties exist and their types
console.log(result.steps.step1.output); // Fully typed
console.log(result.steps.step2.output); // Fully typed
ワークフロー定義には以下が必要です:
id
: ワークフローの一意の識別子inputSchema
: ワークフロー入力を定義するZodスキーマoutputSchema
: ワークフロー出力を定義するZodスキーマsteps
: ワークフローで使用されるステップの配列(オプションですが、型安全性のために推奨)
ステップとネストされたワークフローの再利用
ステップとネストされたワークフローをクローンして再利用できます:
const clonedStep = cloneStep(myStep, { id: "cloned-step" });
const clonedWorkflow = cloneWorkflow(myWorkflow, { id: "cloned-workflow" });
このようにして、同じステップやネストされたワークフローを同じワークフロー内で複数回使用できます。
import {
createWorkflow,
createStep,
cloneStep,
cloneWorkflow,
} from "@mastra/core/workflows/vNext";
const myWorkflow = createWorkflow({
id: "my-workflow",
steps: [step1, step2, step3],
});
myWorkflow.then(step1).then(step2).then(step3).commit();
const parentWorkflow = createWorkflow({
id: "parent-workflow",
steps: [myWorkflow, step4],
});
parentWorkflow
.then(myWorkflow)
.then(step4)
.then(cloneWorkflow(myWorkflow, { id: "cloned-workflow" }))
.then(cloneStep(step4, { id: "cloned-step-4" }))
.commit();
ワークフローの実行
ワークフローを定義した後、以下のように実行します:
// 実行インスタンスを作成
const run = myWorkflow.createRun();
// 入力データでワークフローを開始
const result = await run.start({
inputData: {
startValue: "initial data",
},
});
// 結果にアクセス
console.log(result.steps); // すべてのステップの結果
console.log(result.steps["step-id"].output); // 特定のステップからの出力
if (result.status === "success") {
console.log(result.result); // ワークフローの最終結果、最後のステップの結果(または最後のステップとして`.map()`が使用された場合はその出力)
} else if (result.status === "suspended") {
const resumeResult = await run.resume({
step: result.suspended[0], // 一時停止された実行パスには常に少なくとも1つのステップIDがあります。この場合、最初の一時停止された実行パスを再開します
resumeData: {
/* ユーザー入力 */
},
});
} else if (result.status === "failed") {
console.error(result.error); // ステータスが失敗の場合にのみ存在し、これはErrorのインスタンスです
}
ワークフロー実行結果のスキーマ
ワークフローの実行結果(start()
またはresume()
からの)は、次のTypeScriptインターフェースに従います:
export type WorkflowResult<...> =
| {
status: 'success';
result: z.infer<TOutput>;
steps: {
[K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
? StepResult<unknown>
: StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
};
}
| {
status: 'failed';
steps: {
[K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
? StepResult<unknown>
: StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
};
error: Error;
}
| {
status: 'suspended';
steps: {
[K in keyof StepsRecord<TSteps>]: StepsRecord<TSteps>[K]['outputSchema'] extends undefined
? StepResult<unknown>
: StepResult<z.infer<NonNullable<StepsRecord<TSteps>[K]['outputSchema']>>>;
};
suspended: [string[], ...string[][]];
};
結果プロパティの説明
-
status: ワークフロー実行の最終状態を示します
'success'
: ワークフローが正常に完了'failed'
: ワークフローでエラーが発生'suspended'
: ワークフローがユーザー入力を待機して一時停止中
-
result: ワークフローの最終出力を含み、ワークフローの
outputSchema
に従って型付けされます -
suspended: 現在一時停止中のステップIDの配列(オプション)。
status
が'suspended'
の場合のみ存在します -
steps: 実行されたすべてのステップの結果を含むレコード
- キーはステップID
- 値はステップの出力を含む
StepResult
オブジェクト - 各ステップの
outputSchema
に基づいて型安全
-
error:
status
が'failed'
の場合に存在するエラーオブジェクト(オプション)
ワークフロー実行の監視
ワークフロー実行を監視することもできます:
const run = myWorkflow.createRun();
// 実行を監視するウォッチャーを追加
run.watch(event => {
console.log('ステップ完了:', event.payload.currentStep.id);
});
// ワークフローを開始
const result = await run.start({ inputData: {...} });
event
オブジェクトは以下のスキーマを持っています:
type WatchEvent = {
type: "watch";
payload: {
currentStep?: {
id: string;
status: "running" | "completed" | "failed" | "suspended";
output?: Record<string, any>;
payload?: Record<string, any>;
};
workflowState: {
status: "running" | "success" | "failed" | "suspended";
steps: Record<
string,
{
status: "running" | "completed" | "failed" | "suspended";
output?: Record<string, any>;
payload?: Record<string, any>;
}
>;
result?: Record<string, any>;
error?: Record<string, any>;
payload?: Record<string, any>;
};
};
eventTimestamp: Date;
};
currentStep
プロパティはワークフローの実行中にのみ存在します。ワークフローが終了すると、workflowState
のステータスが変更され、result
およびerror
プロパティも更新されます。同時にcurrentStep
プロパティは削除されます。