スナップショット
Mastra において、スナップショットは特定の時点におけるワークフローの完全な実行状態をシリアライズ可能な形で表したものです。スナップショットは、以下を含む、ワークフローを中断した箇所から正確に再開するために必要なすべての情報を保持します。
- ワークフロー内の各ステップの現在の状態
- 完了したステップの出力
- ワークフロー内でたどった実行パス
- 一時停止中のステップとそのメタデータ
- 各ステップの残りの再試行回数
- 実行再開に必要な追加のコンテキストデータ
スナップショットは、ワークフローが一時停止されるたびに Mastra によって自動的に作成・管理され、設定されたストレージシステムに永続化されます。
サスペンドと再開におけるスナップショットの役割
スナップショットは、Mastra のサスペンドおよび再開機能を支える中核的な仕組みです。ワークフローのステップが await suspend()
を呼び出すと:
- ワークフローの実行がその時点で一時停止される
- ワークフローの現在の状態がスナップショットとして保存される
- スナップショットがストレージに永続化される
- ワークフローのステップはステータス
'suspended'
の「サスペンド」状態としてマークされる - 後にサスペンドされたステップで
resume()
が呼び出されると、スナップショットが取得される - ワークフローの実行は中断したちょうどその地点から再開される
この仕組みにより、ヒューマン・イン・ザ・ループのワークフローの実装、レート制限への対処、外部リソースの待機、長時間の一時停止を要し得る複雑な分岐ワークフローの実装を強力に支援します。
スナップショットの構成
Mastra のワークフロースナップショットは、いくつかの主要コンポーネントから成ります:
export interface WorkflowRunState {
// 中核となる状態情報
value: Record<string, string>; // 現在のステートマシンの値
context: {
// ワークフローのコンテキスト
[key: string]: Record<
string,
| {
status: "success";
output: any;
}
| {
status: "failed";
error: string;
}
| {
status: "suspended";
payload?: any;
}
>;
input: Record<string, any>; // 初期入力データ
};
activePaths: Array<{
// 現在アクティブな実行パス
stepPath: string[];
stepId: string;
status: string;
}>;
// サスペンド中のステップへのパス
suspendedPaths: Record<string, number[]>;
// メタデータ
runId: string; // 一意の実行 ID
timestamp: number; // スナップショットの作成時刻
}
スナップショットの保存と取得方法
Mastra は、設定済みのストレージシステムにスナップショットを永続化します。デフォルトでは LibSQL データベースに保存されますが、Upstash など他のストレージプロバイダーを使用するように設定できます。
スナップショットは workflow_snapshots
テーブルに保存され、libsql を使用する場合は、関連する実行の run_id
によって一意に識別されます。
永続化レイヤーを利用することで、ワークフローの複数回にわたる実行間でもスナップショットを保持でき、HITL(人間参加)機能を高度に実現できます。
libsql storage と upstash storage についてはこちらをご覧ください。
スナップショットの保存
ワークフローが一時停止されると、Mastra は次の手順でワークフローのスナップショットを自動的に永続化します:
- ステップ実行内の
suspend()
関数がスナップショット処理をトリガーする WorkflowInstance.suspend()
メソッドが一時停止中のマシンを記録する- 現在の状態を保存するために
persistWorkflowSnapshot()
が呼び出される - スナップショットをシリアライズし、設定済みデータベースの
workflow_snapshots
テーブルに保存する - ストレージレコードにはワークフロー名、実行 ID、シリアライズ済みスナップショットが含まれる
スナップショットの取得
ワークフローが再開されると、Mastra は次の手順で永続化されたスナップショットを取得します:
- 特定のステップ ID を指定して
resume()
メソッドを呼び出す loadWorkflowSnapshot()
を使用してストレージからスナップショットを読み込む- スナップショットを解析し、再開に向けて準備する
- スナップショットの状態を用いてワークフロー実行を再構築する
- 一時停止されていたステップを再開し、実行を続行する
スナップショットのストレージオプション
Mastra では、スナップショットを永続化するための複数のストレージオプションを用意しています。
storage
インスタンスは Mastra
クラスで設定され、Mastra
インスタンスに登録されたすべてのワークフローのためのスナップショット永続化レイヤーを構成する際に使用されます。
つまり、同じ Mastra
インスタンスに登録されたすべてのワークフロー間でストレージが共有されます。
LibSQL(デフォルト)
デフォルトのストレージオプションは LibSQL(SQLite 互換のデータベース)です:
import { Mastra } from "@mastra/core/mastra";
import { DefaultStorage } from "@mastra/core/storage/libsql";
const mastra = new Mastra({
storage: new DefaultStorage({
config: {
url: "file:storage.db", // ローカルのファイルベース・データベース
// 本番環境では:
// url: process.env.DATABASE_URL,
// authToken: process.env.DATABASE_AUTH_TOKEN,
},
}),
workflows: {
weatherWorkflow,
travelWorkflow,
},
});
Upstash(Redis 互換)
サーバーレス環境向け:
import { Mastra } from "@mastra/core/mastra";
import { UpstashStore } from "@mastra/upstash";
const mastra = new Mastra({
storage: new UpstashStore({
url: process.env.UPSTASH_URL,
token: process.env.UPSTASH_TOKEN,
}),
workflows: {
weatherWorkflow,
travelWorkflow,
},
});
スナップショットを扱う際のベストプラクティス
-
シリアライズ可能であることを確保する: スナップショットに含める必要があるデータは、必ずシリアライズ可能(JSON に変換可能)であること。
-
スナップショットのサイズを抑える: 大きなデータオブジェクトをワークフローのコンテキストに直接保存するのは避ける。代わりに ID などの参照を保存し、必要に応じてデータを取得する。
-
再開時のコンテキストを慎重に扱う: ワークフローを再開する際に提供するコンテキストは慎重に検討する。既存のスナップショットデータとマージされるため。
-
適切な監視を設定する: 中断中のワークフロー、特に長時間実行されるものについては監視を実装し、確実に再開されるようにする。
-
ストレージのスケーリングを検討する: 多数の中断中ワークフローを扱うアプリケーションでは、ストレージソリューションが適切にスケールしていることを確認する。
高度なスナップショットパターン
カスタムスナップショットメタデータ
ワークフローを一時停止する際、再開時に役立つカスタムメタデータを含めることができます:
await suspend({
reason: "Waiting for customer approval",
requiredApprovers: ["manager", "finance"],
requestedBy: currentUser,
urgency: "high",
expires: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000),
});
このメタデータはスナップショットとともに保存され、再開時に利用できます。
条件付き再開
再開時に、suspend のペイロードに基づく条件付きロジックを実装できます:
run.watch(async ({ activePaths }) => {
const isApprovalStepSuspended =
activePaths.get("approval")?.status === "suspended";
if (isApprovalStepSuspended) {
const payload = activePaths.get("approval")?.suspendPayload;
if (payload.urgency === "high" && currentUser.role === "manager") {
await resume({
stepId: "approval",
context: { approved: true, approver: currentUser.id },
});
}
}
});