ワークフローにおける一時停止と再開
複雑なワークフローでは、外部からの入力やリソースを待つために実行を一時停止する必要がしばしばあります。
Mastraの一時停止と再開機能を使用すると、任意のステップでワークフロー実行を一時停止し、ワークフローのスナップショットをストレージに保存し、準備ができたら保存されたスナップショットから実行を再開することができます。 このプロセス全体はMastraによって自動的に管理されます。ユーザーからの設定や手動のステップは必要ありません。
ワークフローのスナップショットをストレージ(デフォルトではLibSQL)に保存することで、ワークフローの状態はセッション、デプロイメント、サーバーの再起動を超えて永続的に保存されます。この永続性は、外部からの入力やリソースを待つために数分、数時間、あるいは数日間一時停止したままになる可能性のあるワークフローにとって非常に重要です。
ワークフローの一時停止と再開を使用するタイミング
ワークフローを一時停止する一般的なシナリオには以下が含まれます:
- 人間の承認や入力を待つ
- 外部APIリソースが利用可能になるまで待機する
- 後のステップに必要な追加データを収集する
- 高コストの操作のレート制限やスロットリング
- 外部トリガーによるイベント駆動プロセスの処理
ステップを一時停止する方法
const humanInputStep = createStep({
id: "human-input",
inputSchema: z.object({
suggestions: z.array(z.string()),
vacationDescription: z.string(),
}),
resumeSchema: z.object({
selection: z.string(),
}),
suspendSchema: z.object({}),
outputSchema: z.object({
selection: z.string().describe("The selection of the user"),
vacationDescription: z.string(),
}),
execute: async ({ inputData, resumeData, suspend }) => {
if (!resumeData?.selection) {
await suspend({});
return {
selection: "",
vacationDescription: inputData?.vacationDescription,
};
}
return {
selection: resumeData.selection,
vacationDescription: inputData?.vacationDescription,
};
},
});
ステップ実行の再開方法
一時停止状態の識別
ワークフローを実行する際、その状態は以下のいずれかになります:
running
- ワークフローが現在実行中suspended
- ワークフローが一時停止中success
- ワークフローが完了failed
- ワークフローが失敗
状態がsuspended
の場合、ワークフローのsuspended
プロパティを確認することで、一時停止されているすべてのステップを識別できます。
const run = counterWorkflow.createRun();
const result = await run.start({ inputData: { startValue: 0 } });
if (result.status === "suspended") {
const resumedResults = await run.resume({
step: result.suspended[0],
resumeData: { newValue: 0 },
});
}
この場合、一時停止として報告された最初のステップを再開するロジックです。
suspended
プロパティはstring[][]
型で、各配列は一時停止されたステップへのパスを表します。最初の要素はメインワークフローのステップIDです。そのステップ自体がワークフローである場合、2番目の要素はネストされたワークフローで一時停止されたステップIDとなります。さらにそれがワークフローである場合、3番目の要素はネストされたワークフローで一時停止されたステップIDとなり、以降も同様です。
再開
// ユーザー入力の取得後
const result = await workflowRun.resume({
step: userInputStep, // または文字列として 'myStepId'
resumeData: {
userSelection: "ユーザーの選択",
},
});
ネストされた一時停止中のワークフローを再開するには:
const result = await workflowRun.resume({
step: [nestedWorkflow, userInputStep], // または文字列配列として ['nestedWorkflowId', 'myStepId']
resumeData: {
userSelection: "ユーザーの選択",
},
});