Skip to Content
ドキュメントワークフローワークフローの一時停止と再開 | Human-in-the-Loop | Mastra ドキュメント

Suspend & Resume

ワークフローは任意のステップで一時停止でき、現在の状態はスナップショットとしてストレージに永続化されます。その後、準備が整った時点で、この保存されたスナップショットから実行を再開できます。スナップショットを永続化することで、セッション、デプロイメント、サーバー再起動を跨いでワークフローの状態が維持され、外部入力やリソースを待機している間に一時停止される可能性があるワークフローにとって不可欠です。

ワークフローを一時停止する一般的なシナリオには以下があります:

  • 人間の承認や入力を待機する場合
  • 外部APIリソースが利用可能になるまで一時停止する場合
  • 後のステップで必要な追加データを収集する場合
  • 高コストな操作のレート制限やスロットリング
  • 外部トリガーを持つイベント駆動プロセスの処理

ワークフローのステータス

ワークフローを実行する際、そのstatusは以下のいずれかになります:

  • running - ワークフローが現在実行中
  • suspended - ワークフローが一時停止中
  • success - ワークフローが完了
  • failed - ワークフローが失敗

Suspend

状態がsuspendedの場合、ワークフロー結果出力のsuspended配列を確認することで、中断されたすべてのステップを特定できます。

src/mastra/workflows/test-workflow.ts
const step1 = createStep({ id: "step-1", description: "Test suspend", inputSchema: z.object({ input: z.string() }), outputSchema: z.object({ output: z.string() }), suspendSchema: z.object({}), resumeSchema: z.object({ city: z.string() }), execute: async ({ resumeData, suspend }) => { if (!(resumeData ?? {}).city) { await suspend({}); return { output: "" }; } return { output: "" }; } }); export const testWorkflow = createWorkflow({}) .then(step1) .commit();

詳細については、中断可能なワークフローの定義を参照してください。

中断されたステップの特定

中断されたワークフローを再開するには、結果のsuspended配列を調べて、どのステップが入力を必要としているかを判断します:

src/test-workflow.ts
import { mastra } from "./mastra"; const run = await mastra.getWorkflow("testWorkflow").createRunAsync(); const result = await run.start({ inputData: { city: "London" } }); console.log(JSON.stringify(result, null, 2)); if (result.status === "suspended") { const resumedResult = await run.resume({ step: result.suspended[0], resumeData: { city: "Berlin" } }); }

この場合、ロジックはsuspended配列にリストされている最初のステップを再開します。stepは、例えば’step-1’のように、そのidを使用して定義することもできます。

{ "status": "suspended", "steps": { // ... "step-1": { // ... "status": "suspended", } }, "suspended": [ [ "step-1" ] ] }

詳細については、ワークフロー実行結果を参照してください。

Resume

ワークフローはresumeを呼び出し、必要なresumeDataを提供することで再開できます。

src/test-workflow.ts
import { mastra } from "./mastra"; const run = await mastra.getWorkflow("testWorkflow").createRunAsync(); const result = await run.start({ inputData: { city: "London" } }); console.log(JSON.stringify(result, null, 2)); if (result.status === "suspended") { const resumedResult = await run.resume({ step: 'step-1', resumeData: { city: "Berlin" } }); console.log(JSON.stringify(resumedResult, null, 2)); }

ネストされたワークフロー

中断されたネストされたワークフローを再開するには、ワークフローインスタンスをresume関数のstepパラメータに渡します。

src/test-workflow.ts
const dowhileWorkflow = createWorkflow({ id: 'dowhile-workflow', inputSchema: z.object({ value: z.number() }), outputSchema: z.object({ value: z.number() }), }) .dountil( createWorkflow({ id: 'simple-resume-workflow', inputSchema: z.object({ value: z.number() }), outputSchema: z.object({ value: z.number() }), steps: [incrementStep, resumeStep], }) .then(incrementStep) .then(resumeStep) .commit(), async ({ inputData }) => inputData.value >= 10, ) .then( createStep({ id: 'final', inputSchema: z.object({ value: z.number() }), outputSchema: z.object({ value: z.number() }), execute: async ({ inputData }) => ({ value: inputData.value }), }), ) .commit(); const run = await dowhileWorkflow.createRunAsync(); const result = await run.start({ inputData: { value: 0 } }); if (result.status === "suspended") { const resumedResult = await run.resume({ resumeData: { value: 2 }, step: ['simple-resume-workflow', 'resume'], }); console.log(JSON.stringify(resumedResult, null, 2)); }

RuntimeContext

RuntimeContextでsuspend/resumeを使用する場合、インスタンスを自分で作成し、それをstartresume関数に渡すことができます。 RuntimeContextはワークフロー実行時に自動的に共有されません。

src/test-workflow.ts
import { RuntimeContext } from "@mastra/core/di"; import { mastra } from "./mastra"; const runtimeContext = new RuntimeContext(); const run = await mastra.getWorkflow("testWorkflow").createRunAsync(); const result = await run.start({ inputData: { suggestions: ["London", "Paris", "New York"] }, runtimeContext }); if (result.status === "suspended") { const resumedResult = await run.resume({ step: 'step-1', resumeData: { city: "New York" }, runtimeContext }); }