ワークフローにおける一時停止と再開
複雑なワークフローでは、外部からの入力やリソースを待つために実行を一時停止する必要がしばしばあります。
Mastraの一時停止と再開機能により、任意のステップでワークフロー実行を一時停止し、ワークフローのスナップショットをストレージに保存し、準備ができたら保存されたスナップショットから実行を再開することができます。 このプロセス全体はMastraによって自動的に管理されます。設定は不要で、ユーザーからの手動の操作も必要ありません。
ワークフローのスナップショットをストレージに保存することで、ワークフローの状態はセッション、デプロイメント、サーバーの再起動を超えて永続的に保存されます。この永続性は、外部からの入力やリソースを待つために数分、数時間、あるいは数日間一時停止したままになる可能性のあるワークフローにとって非常に重要です。
ワークフローの一時停止と再開を使用するタイミング
ワークフローを一時停止する一般的なシナリオには以下が含まれます:
- 人間の承認や入力を待つ
- 外部APIリソースが利用可能になるまで待機する
- 後のステップに必要な追加データを収集する
- 高コストな操作のレート制限やスロットリング
- 外部トリガーによるイベント駆動プロセスの処理
ステップを一時停止する方法
// suspend/resume機能を持つ人間の入力を処理するステップを作成する
const humanInputStep = createStep({
id: "human-input",
inputSchema: z.object({
suggestions: z.array(z.string()),
vacationDescription: z.string(),
}),
// ステップを再開するために必要なデータの構造を定義
resumeSchema: z.object({
selection: z.string(),
}),
// 一時停止時のデータの構造を定義(この場合は空)
suspendSchema: z.object({}),
outputSchema: z.object({
selection: z.string().describe("The selection of the user"),
vacationDescription: z.string(),
}),
execute: async ({ inputData, resumeData, suspend }) => {
// 再開データが提供されていない場合、ステップを一時停止してユーザー入力を待つ
if (!resumeData?.selection) {
await suspend({});
return {
selection: "",
vacationDescription: inputData?.vacationDescription,
};
}
return {
selection: resumeData.selection,
vacationDescription: inputData?.vacationDescription,
};
},
});
ステップ実行の再開方法
一時停止状態の識別
ワークフローを実行する際、その状態は以下のいずれかになります:
running
- ワークフローが現在実行中suspended
- ワークフローが一時停止中success
- ワークフローが完了failed
- ワークフローが失敗
状態がsuspended
の場合、ワークフローのsuspended
プロパティを確認することで、一時停止されているすべてのステップを識別できます。
const run = counterWorkflow.createRun();
const result = await run.start({ inputData: { startValue: 0 } });
// Check if the workflow is in suspended state
if (result.status === "suspended") {
// Resume the first suspended step with new data
const resumedResults = await run.resume({
step: result.suspended[0],
resumeData: { newValue: 0 },
});
}
この場合、一時停止として報告された最初のステップを再開するロジックとなっています。
suspended
プロパティはstring[][]
型で、各配列は一時停止されたステップへのパスを表します。最初の要素はメインワークフローのステップIDです。そのステップ自体がワークフローである場合、2番目の要素はネストされたワークフロー内の一時停止されたステップIDとなります。さらにそれがワークフローである場合は、3番目の要素がネストされたワークフロー内の一時停止されたステップIDとなり、以降も同様です。
再開
// Resume a suspended workflow with user input
const result = await workflowRun.resume({
step: userInputStep, // or 'myStepId' as a string
resumeData: {
userSelection: "User's choice",
},
});
ネストされた一時停止中のワークフローを再開するには:
// Resume a suspended nested workflow with user input
const result = await workflowRun.resume({
step: [nestedWorkflow, userInputStep], // or ['nestedWorkflowId', 'myStepId'] as a string array
resumeData: {
userSelection: "User's choice",
},
});