ワークフローにおける一時停止と再開
複雑なワークフローは、外部からの入力やリソースを待つ間、実行を一時停止する必要があることがよくあります。
Mastraの一時停止と再開機能を使用すると、ワークフローの実行を任意のステップで一時停止し、ワークフローのスナップショットをストレージに保存し、準備が整ったら保存されたスナップショットから実行を再開できます。 このプロセス全体はMastraによって自動的に管理されます。ユーザーからの設定や手動のステップは必要ありません。
ワークフローのスナップショットをストレージ(デフォルトではLibSQL)に保存することは、セッション、デプロイメント、サーバーの再起動を超えてワークフローの状態を永続的に保存することを意味します。この永続性は、外部からの入力やリソースを待つ間、数分、数時間、あるいは数日間一時停止したままになる可能性のあるワークフローにとって重要です。
サスペンド/レジュームを使用する場合
ワークフローをサスペンドする一般的なシナリオには以下が含まれます:
- 人間の承認や入力を待つ
- 外部APIリソースが利用可能になるまで一時停止する
- 後のステップで必要な追加データを収集する
- 高価な操作をレート制限またはスロットリングする
- 外部トリガーを伴うイベント駆動プロセスを処理する
基本的なサスペンドの例
こちらは、値が低すぎるとサスペンドし、より高い値が与えられると再開するシンプルなワークフローです:
const stepTwo = new Step({
id: "stepTwo",
outputSchema: z.object({
incrementedValue: z.number(),
}),
execute: async ({ context, suspend }) => {
if (context.steps.stepOne.status !== "success") {
return { incrementedValue: 0 };
}
const currentValue = context.steps.stepOne.output.doubledValue;
if (currentValue < 100) {
await suspend();
return { incrementedValue: 0 };
}
return { incrementedValue: currentValue + 1 };
},
});
非同期/待機ベースのフロー
Mastraの中断と再開のメカニズムは、非同期/待機パターンを使用しており、中断ポイントを持つ複雑なワークフローを直感的に実装できます。コード構造は自然に実行フローを反映します。
仕組み
- ステップの実行関数は、パラメータとして
suspend
関数を受け取ります await suspend()
を呼び出すと、そのポイントでワークフローが一時停止します- ワークフローの状態が保存されます
- 後で、適切なパラメータで
workflow.resume()
を呼び出すことでワークフローを再開できます suspend()
呼び出しの後のポイントから実行が続行されます
複数の中断ポイントを持つ例
以下は、複数のステップを持ち、中断可能なワークフローの例です:
// 中断機能を持つステップを定義
const promptAgentStep = new Step({
id: "promptAgent",
execute: async ({ context, suspend }) => {
// 中断が必要かどうかを決定する条件
if (needHumanInput) {
// 中断状態と共に保存されるペイロードデータをオプションで渡す
await suspend({ requestReason: "プロンプトのために人間の入力が必要" });
// suspend()の後のコードはステップが再開されたときに実行されます
return { modelOutput: context.userInput };
}
return { modelOutput: "AI生成の出力" };
},
outputSchema: z.object({ modelOutput: z.string() }),
});
const improveResponseStep = new Step({
id: "improveResponse",
execute: async ({ context, suspend }) => {
// 別の中断の条件
if (needFurtherRefinement) {
await suspend();
return { improvedOutput: context.refinedOutput };
}
return { improvedOutput: "改善された出力" };
},
outputSchema: z.object({ improvedOutput: z.string() }),
});
// ワークフローを構築
const workflow = new Workflow({
name: "multi-suspend-workflow",
triggerSchema: z.object({ input: z.string() }),
});
workflow
.step(getUserInput)
.then(promptAgentStep)
.then(evaluateTone)
.then(improveResponseStep)
.then(evaluateImproved)
.commit();
// Mastraにワークフローを登録
export const mastra = new Mastra({
workflows: { workflow },
});
ワークフローの開始と再開
// ワークフローを取得し、実行を作成
const wf = mastra.getWorkflow("multi-suspend-workflow");
const run = wf.createRun();
// ワークフローを開始
const initialResult = await run.start({
triggerData: { input: "初期入力" },
});
let promptAgentStepResult = initialResult.activePaths.get("promptAgent");
let promptAgentResumeResult = undefined;
// ステップが中断されているか確認
if (promptAgentStepResult?.status === "suspended") {
console.log("ワークフローはpromptAgentステップで中断されました");
// 新しいコンテキストでワークフローを再開
const resumeResult = await run.resume({
stepId: "promptAgent",
context: { userInput: "人間が提供した入力" },
});
promptAgentResumeResult = resumeResult;
}
const improveResponseStepResult =
promptAgentResumeResult?.activePaths.get("improveResponse");
if (improveResponseStepResult?.status === "suspended") {
console.log("ワークフローはimproveResponseステップで中断されました");
// 異なるコンテキストで再度再開
const finalResult = await run.resume({
stepId: "improveResponse",
context: { refinedOutput: "人間が改善した出力" },
});
console.log("ワークフローが完了しました:", finalResult?.results);
}
サスペンドとレジュームのためのストレージ
ワークフローが await suspend()
を使用して一時停止されると、Mastraは自動的にワークフロー全体の状態をストレージに永続化します。これは、長期間一時停止される可能性のあるワークフローにとって不可欠であり、アプリケーションの再起動やサーバーインスタンス間でも状態が保持されることを保証します。
デフォルトストレージ: LibSQL
デフォルトでは、MastraはストレージエンジンとしてLibSQLを使用します:
import { Mastra } from "@mastra/core/mastra";
import { LibSQLStore } from "@mastra/libsql";
const mastra = new Mastra({
storage: new LibSQLStore({
url: "file:./storage.db", // 開発用のローカルファイルベースのデータベース
// 本番環境では、永続的なURLを使用します:
// url: process.env.DATABASE_URL,
// authToken: process.env.DATABASE_AUTH_TOKEN, // 認証接続用のオプション
}),
});
LibSQLストレージは異なるモードで設定できます:
- インメモリデータベース(テスト用):
:memory:
- ファイルベースのデータベース(開発用):
file:storage.db
- リモートデータベース(本番用):
libsql://your-database.turso.io
のようなURL
代替ストレージオプション
Upstash(Redis互換)
サーバーレスアプリケーションやRedisが好ましい環境向け:
npm install @mastra/upstash@latest
import { Mastra } from "@mastra/core/mastra";
import { UpstashStore } from "@mastra/upstash";
const mastra = new Mastra({
storage: new UpstashStore({
url: process.env.UPSTASH_URL,
token: process.env.UPSTASH_TOKEN,
}),
});
ストレージに関する考慮事項
- すべてのストレージオプションは、サスペンドとレジューム機能を同じように対応しています
- ワークフローの状態は一時停止時に自動的にシリアライズされて保存されます
- サスペンド/レジュームをストレージで動作させるために追加の設定は必要ありません
- インフラストラクチャ、スケーリングのニーズ、既存の技術スタックに基づいてストレージオプションを選択してください
// ワークフローを取得
const workflow = mastra.getWorkflow("approval-workflow");
const run = workflow.createRun();
// ワークフローを開始
const initialResult = await run.start({
triggerData: { requestId: "request-123" },
});
console.log("ワークフローが開始され、承認イベントを待っています");
console.log(initialResult.results);
// 出力はワークフローがイベントステップで一時停止していることを示します:
// {
// getUserInput: { status: 'success', output: { userInput: 'initial input' } },
// __approvalReceived_event: { status: 'suspended' }
// }
// 後で、承認イベントが発生したとき:
const resumeResult = await run.resumeWithEvent("approvalReceived", {
approved: true,
approverName: "Jane Doe",
});
console.log(
"イベントデータでワークフローが再開されました:",
resumeResult.results,
);
// 出力は完了したワークフローを示します:
// {
// getUserInput: { status: 'success', output: { userInput: 'initial input' } },
// __approvalReceived_event: { status: 'success', output: { executed: true, resumedEvent: { approved: true, approverName: 'Jane Doe' } } },
// processApproval: { status: 'success', output: { approved: true, approvedBy: 'Jane Doe' } }
// }
イベントベースのワークフローに関する重要なポイント
-
suspend()
関数は、オプションで一時停止状態と共に保存されるペイロードオブジェクトを取ることができます -
await suspend()
呼び出しの後のコードは、ステップが再開されるまで実行されません -
ステップが一時停止されると、そのステータスはワークフロー結果で
'suspended'
になります -
再開されると、ステップのステータスは
'suspended'
から'success'
に変わります -
resume()
メソッドは、再開する一時停止ステップを識別するためにstepId
を必要とします -
再開時に新しいコンテキストデータを提供でき、それは既存のステップ結果とマージされます
-
イベントはスキーマと共にワークフロー設定で定義されなければなりません
-
afterEvent
メソッドは、イベントを待つ特別な一時停止ステップを作成します -
イベントステップは自動的に
__eventName_event
(例:__approvalReceived_event
)と命名されます -
resumeWithEvent
を使用してイベントデータを提供し、ワークフローを続行します -
イベントデータは、そのイベントのために定義されたスキーマに対して検証されます
-
イベントデータは
inputData.resumedEvent
としてコンテキストで利用可能です
サスペンドとレジュームのためのストレージ
ワークフローが await suspend()
を使用してサスペンドされると、Mastra はワークフローの状態全体を自動的にストレージに保存します。これは、アプリケーションの再起動やサーバーインスタンスを超えて状態を保持するために、長期間サスペンドされる可能性のあるワークフローにとって重要です。
デフォルトストレージ: LibSQL
デフォルトでは、Mastra は LibSQL をストレージエンジンとして使用します:
import { Mastra } from "@mastra/core/mastra";
import { DefaultStorage } from "@mastra/core/storage/libsql";
const mastra = new Mastra({
storage: new DefaultStorage({
config: {
url: "file:storage.db", // 開発用のローカルファイルベースのデータベース
// 本番環境では、永続的なURLを使用:
// url: process.env.DATABASE_URL,
// authToken: process.env.DATABASE_AUTH_TOKEN, // 認証された接続のためのオプション
},
}),
});
LibSQL ストレージは異なるモードで構成できます:
- インメモリデータベース(テスト用):
:memory:
- ファイルベースのデータベース(開発用):
file:storage.db
- リモートデータベース(本番用):
libsql://your-database.turso.io
のようなURL
代替ストレージオプション
Upstash (Redis互換)
サーバーレスアプリケーションやRedisが好まれる環境向け:
npm install @mastra/upstash
import { Mastra } from "@mastra/core/mastra";
import { UpstashStore } from "@mastra/upstash";
const mastra = new Mastra({
storage: new UpstashStore({
url: process.env.UPSTASH_URL,
token: process.env.UPSTASH_TOKEN,
}),
});
ストレージに関する考慮事項
- すべてのストレージオプションは、サスペンドとレジューム機能を同様にサポートします
- ワークフローの状態は、サスペンド時に自動的にシリアライズされ保存されます
- ストレージでサスペンド/レジュームを機能させるために追加の設定は不要です
- インフラストラクチャ、スケーリングのニーズ、既存の技術スタックに基づいてストレージオプションを選択してください
監視と再開
中断されたワークフローを処理するには、watch
メソッドを使用して実行ごとにワークフローのステータスを監視し、resume
を使用して実行を続行します:
import { mastra } from "./index";
// ワークフローを取得
const myWorkflow = mastra.getWorkflow("myWorkflow");
const { start, watch, resume } = myWorkflow.createRun();
// 実行前にワークフローを監視開始
watch(async ({ activePaths }) => {
const isStepTwoSuspended = activePaths.get("stepTwo")?.status === "suspended";
if (isStepTwoSuspended) {
console.log("ワークフローが中断されました。新しい値で再開します");
// 新しいコンテキストでワークフローを再開
await resume({
stepId: "stepTwo",
context: { secondValue: 100 },
});
}
});
// ワークフローの実行を開始
await start({ triggerData: { inputValue: 45 } });
イベントベースのワークフローの監視と再開
イベントベースのワークフローでも同じ監視パターンを使用できます:
const { start, watch, resumeWithEvent } = workflow.createRun();
// 中断されたイベントステップを監視
watch(async ({ activePaths }) => {
const isApprovalReceivedSuspended =
activePaths.get("__approvalReceived_event")?.status === "suspended";
if (isApprovalReceivedSuspended) {
console.log("承認イベントを待っているワークフロー");
// 実際のシナリオでは、実際のイベントが発生するのを待ちます
// 例えば、これはWebhookやユーザーの操作によってトリガーされる可能性があります
setTimeout(async () => {
await resumeWithEvent("approvalReceived", {
approved: true,
approverName: "Auto Approver",
});
}, 5000); // 5秒後にイベントをシミュレート
}
});
// ワークフローを開始
await start({ triggerData: { requestId: "auto-123" } });