Skip to Content
ドキュメントワークフロー(レガシー)ワークフローの一時停止と再開(レガシー) | Human-in-the-Loop | Mastra ドキュメント

ワークフローにおけるサスペンドとレジューム(レガシー)

複雑なワークフローでは、外部からの入力やリソースを待つ間に実行を一時停止する必要がよくあります。

Mastra のサスペンドとレジューム機能を使うことで、ワークフローの実行を任意のステップで一時停止し、ワークフローのスナップショットをストレージに保存し、準備ができたら保存されたスナップショットから実行を再開できます。 この一連のプロセスはすべて Mastra によって自動的に管理されます。設定やユーザーによる手動操作は必要ありません。

ワークフローのスナップショットをストレージ(デフォルトでは LibSQL)に保存することで、ワークフローの状態はセッション、デプロイ、サーバーの再起動をまたいで永続的に保持されます。この永続性は、外部からの入力やリソースを待つ間に数分、数時間、あるいは数日間サスペンドされたままになる可能性があるワークフローにとって非常に重要です。

サスペンド/レジュームを使用するタイミング

ワークフローをサスペンドする一般的なシナリオには、以下が含まれます。

  • 人による承認や入力を待つ場合
  • 外部APIリソースが利用可能になるまで一時停止する場合
  • 後続のステップで必要となる追加データを収集する場合
  • 高コストな処理のレート制限やスロットリングを行う場合
  • 外部トリガーによるイベント駆動型プロセスを処理する場合

基本的なサスペンドの例

こちらは、値が低すぎる場合にサスペンドし、より高い値が与えられたときに再開するシンプルなワークフローです:

import { LegacyStep, LegacyWorkflow } from "@mastra/core/workflows/legacy"; const stepTwo = new LegacyStep({ id: "stepTwo", outputSchema: z.object({ incrementedValue: z.number(), }), execute: async ({ context, suspend }) => { if (context.steps.stepOne.status !== "success") { return { incrementedValue: 0 }; } const currentValue = context.steps.stepOne.output.doubledValue; if (currentValue < 100) { await suspend(); return { incrementedValue: 0 }; } return { incrementedValue: currentValue + 1 }; }, });

Async/Await ベースのフロー

Mastra のサスペンドとレジュームの仕組みは、async/await パターンを利用しており、サスペンドポイントを含む複雑なワークフローの実装を直感的に行うことができます。コード構造は実行フローを自然に反映します。

仕組み

  1. ステップの実行関数は、パラメータとして suspend 関数を受け取ります
  2. await suspend() を呼び出すと、その時点でワークフローが一時停止します
  3. ワークフローの状態が永続化されます
  4. 後で、適切なパラメータで workflow.resume() を呼び出すことでワークフローを再開できます
  5. 実行は suspend() 呼び出しの後のポイントから続行されます

複数のサスペンドポイントを持つ例

複数のステップでサスペンド可能なワークフローの例を示します:

// Define steps with suspend capability const promptAgentStep = new LegacyStep({ id: "promptAgent", execute: async ({ context, suspend }) => { // Some condition that determines if we need to suspend if (needHumanInput) { // Optionally pass payload data that will be stored with suspended state await suspend({ requestReason: "Need human input for prompt" }); // Code after suspend() will execute when the step is resumed return { modelOutput: context.userInput }; } return { modelOutput: "AI generated output" }; }, outputSchema: z.object({ modelOutput: z.string() }), }); const improveResponseStep = new LegacyStep({ id: "improveResponse", execute: async ({ context, suspend }) => { // Another condition for suspension if (needFurtherRefinement) { await suspend(); return { improvedOutput: context.refinedOutput }; } return { improvedOutput: "Improved output" }; }, outputSchema: z.object({ improvedOutput: z.string() }), }); // Build the workflow const workflow = new LegacyWorkflow({ name: "multi-suspend-workflow", triggerSchema: z.object({ input: z.string() }), }); workflow .step(getUserInput) .then(promptAgentStep) .then(evaluateTone) .then(improveResponseStep) .then(evaluateImproved) .commit(); // Register the workflow with Mastra export const mastra = new Mastra({ legacy_workflows: { workflow }, });

ワークフローの開始と再開

// Get the workflow and create a run const wf = mastra.legacy_getWorkflow("multi-suspend-workflow"); const run = wf.createRun(); // Start the workflow const initialResult = await run.start({ triggerData: { input: "initial input" }, }); let promptAgentStepResult = initialResult.activePaths.get("promptAgent"); let promptAgentResumeResult = undefined; // Check if a step is suspended if (promptAgentStepResult?.status === "suspended") { console.log("Workflow suspended at promptAgent step"); // Resume the workflow with new context const resumeResult = await run.resume({ stepId: "promptAgent", context: { userInput: "Human provided input" }, }); promptAgentResumeResult = resumeResult; } const improveResponseStepResult = promptAgentResumeResult?.activePaths.get("improveResponse"); if (improveResponseStepResult?.status === "suspended") { console.log("Workflow suspended at improveResponse step"); // Resume again with different context const finalResult = await run.resume({ stepId: "improveResponse", context: { refinedOutput: "Human refined output" }, }); console.log("Workflow completed:", finalResult?.results); }

イベントベースの一時停止と再開

手動でステップを一時停止する方法に加えて、Mastra では afterEvent メソッドを使ったイベントベースの一時停止が提供されています。これにより、ワークフローは特定のイベントが発生するまで自動的に一時停止し、発生後に処理を再開できます。

afterEvent と resumeWithEvent の使い方

afterEvent メソッドは、ワークフロー内に特定のイベントが発生するのを待つ一時停止ポイントを自動的に作成します。イベントが発生した際には、resumeWithEvent を使ってイベントデータとともにワークフローを再開できます。

仕組みは以下の通りです:

  1. ワークフロー設定でイベントを定義する
  2. afterEvent を使ってそのイベントを待つ一時停止ポイントを作成する
  3. イベントが発生したら、イベント名とデータを指定して resumeWithEvent を呼び出す

例:イベントベースのワークフロー

// Define steps const getUserInput = new LegacyStep({ id: "getUserInput", execute: async () => ({ userInput: "initial input" }), outputSchema: z.object({ userInput: z.string() }), }); const processApproval = new LegacyStep({ id: "processApproval", execute: async ({ context }) => { // Access the event data from the context const approvalData = context.inputData?.resumedEvent; return { approved: approvalData?.approved, approvedBy: approvalData?.approverName, }; }, outputSchema: z.object({ approved: z.boolean(), approvedBy: z.string(), }), }); // Create workflow with event definition const approvalWorkflow = new LegacyWorkflow({ name: "approval-workflow", triggerSchema: z.object({ requestId: z.string() }), events: { approvalReceived: { schema: z.object({ approved: z.boolean(), approverName: z.string(), }), }, }, }); // Build workflow with event-based suspension approvalWorkflow .step(getUserInput) .afterEvent("approvalReceived") // Workflow will automatically suspend here .step(processApproval) // This step runs after the event is received .commit();

イベントベースのワークフローの実行

// Get the workflow const workflow = mastra.legacy_getWorkflow("approval-workflow"); const run = workflow.createRun(); // Start the workflow const initialResult = await run.start({ triggerData: { requestId: "request-123" }, }); console.log("Workflow started, waiting for approval event"); console.log(initialResult.results); // Output will show the workflow is suspended at the event step: // { // getUserInput: { status: 'success', output: { userInput: 'initial input' } }, // __approvalReceived_event: { status: 'suspended' } // } // Later, when the approval event occurs: const resumeResult = await run.resumeWithEvent("approvalReceived", { approved: true, approverName: "Jane Doe", }); console.log("Workflow resumed with event data:", resumeResult.results); // Output will show the completed workflow: // { // getUserInput: { status: 'success', output: { userInput: 'initial input' } }, // __approvalReceived_event: { status: 'success', output: { executed: true, resumedEvent: { approved: true, approverName: 'Jane Doe' } } }, // processApproval: { status: 'success', output: { approved: true, approvedBy: 'Jane Doe' } } // }

イベントベースワークフローの重要ポイント

  • suspend() 関数は、オプションで一時停止状態とともに保存されるペイロードオブジェクトを受け取ることができます

  • await suspend() 呼び出しの後のコードは、ステップが再開されるまで実行されません

  • ステップが一時停止されると、そのステータスはワークフロー結果で 'suspended' になります

  • 再開時には、ステップのステータスは 'suspended' から 'success' に変わります

  • resume() メソッドは、どの一時停止中のステップを再開するかを特定するために stepId が必要です

  • 再開時に新しいコンテキストデータを渡すことができ、既存のステップ結果とマージされます

  • イベントはワークフロー設定でスキーマとともに定義する必要があります

  • afterEvent メソッドは、そのイベントを待つ特別な一時停止ステップを作成します

  • イベントステップは自動的に __eventName_event(例:__approvalReceived_event)という名前になります

  • resumeWithEvent を使ってイベントデータを渡し、ワークフローを継続します

  • イベントデータは、そのイベント用に定義されたスキーマで検証されます

  • イベントデータはコンテキスト内の inputData.resumedEvent として利用できます

サスペンドとレジュームのためのストレージ

ワークフローが await suspend() を使ってサスペンドされると、Mastra はワークフローの全状態を自動的にストレージへ永続化します。これは、ワークフローが長期間サスペンドされたままになる可能性がある場合に重要であり、アプリケーションの再起動やサーバーインスタンスをまたいでも状態が保持されることを保証します。

デフォルトストレージ: LibSQL

デフォルトでは、Mastra は LibSQL をストレージエンジンとして使用します:

import { Mastra } from "@mastra/core/mastra"; import { LibSQLStore } from "@mastra/libsql"; const mastra = new Mastra({ storage: new LibSQLStore({ url: "file:./storage.db", // 開発用のローカルファイルベースデータベース // 本番環境では永続的なURLを使用してください: // url: process.env.DATABASE_URL, // authToken: process.env.DATABASE_AUTH_TOKEN, // 認証接続の場合はオプション }), });

LibSQL ストレージはさまざまなモードで設定できます:

  • インメモリデータベース(テスト用): :memory:
  • ファイルベースデータベース(開発用): file:storage.db
  • リモートデータベース(本番用): libsql://your-database.turso.io のようなURL

代替ストレージオプション

Upstash(Redis互換)

サーバーレスアプリケーションや Redis を好む環境向け:

npm install @mastra/upstash@latest
import { Mastra } from "@mastra/core/mastra"; import { UpstashStore } from "@mastra/upstash"; const mastra = new Mastra({ storage: new UpstashStore({ url: process.env.UPSTASH_URL, token: process.env.UPSTASH_TOKEN, }), });

ストレージに関する注意点

  • すべてのストレージオプションは、サスペンドとレジュームの機能を同じようにサポートします
  • ワークフローの状態はサスペンド時に自動的にシリアライズされ保存されます
  • サスペンド/レジュームがストレージで動作するために追加の設定は不要です
  • インフラ、スケーリングの必要性、既存の技術スタックに基づいてストレージオプションを選択してください

監視と再開

一時停止されたワークフローを処理するには、watch メソッドを使用して各実行ごとにワークフローのステータスを監視し、resume で実行を再開します。

import { mastra } from "./index"; // Get the workflow const myWorkflow = mastra.legacy_getWorkflow("myWorkflow"); const { start, watch, resume } = myWorkflow.createRun(); // Start watching the workflow before executing it watch(async ({ activePaths }) => { const isStepTwoSuspended = activePaths.get("stepTwo")?.status === "suspended"; if (isStepTwoSuspended) { console.log("Workflow suspended, resuming with new value"); // Resume the workflow with new context await resume({ stepId: "stepTwo", context: { secondValue: 100 }, }); } }); // Start the workflow execution await start({ triggerData: { inputValue: 45 } });

イベントベースのワークフローの監視と再開

同じ監視パターンをイベントベースのワークフローでも利用できます。

const { start, watch, resumeWithEvent } = workflow.createRun(); // Watch for suspended event steps watch(async ({ activePaths }) => { const isApprovalReceivedSuspended = activePaths.get("__approvalReceived_event")?.status === "suspended"; if (isApprovalReceivedSuspended) { console.log("Workflow waiting for approval event"); // In a real scenario, you would wait for the actual event to occur // For example, this could be triggered by a webhook or user interaction setTimeout(async () => { await resumeWithEvent("approvalReceived", { approved: true, approverName: "Auto Approver", }); }, 5000); // Simulate event after 5 seconds } }); // Start the workflow await start({ triggerData: { requestId: "auto-123" } });

参考文献

サスペンドとレジュームの仕組みについてより深く理解するには:

関連リソース