スナップショット
Mastraにおけるスナップショットとは、ワークフローの完全な実行状態を特定の時点でシリアライズ可能な形で表現したものです。スナップショットは、ワークフローを中断したまさにその場所から再開するために必要なすべての情報を記録します。これには以下が含まれます:
- ワークフロー内の各ステップの現在の状態
- 完了したステップの出力
- ワークフロー内でたどった実行経路
- 一時停止中のステップとそのメタデータ
- 各ステップの残りのリトライ回数
- 実行を再開するために必要な追加のコンテキストデータ
スナップショットは、ワークフローが一時停止されるたびにMastraによって自動的に作成・管理され、設定されたストレージシステムに永続化されます。
サスペンドとレジュームにおけるスナップショットの役割
スナップショットは、Mastra のサスペンドおよびレジューム機能を実現するための重要な仕組みです。ワークフローステップで await suspend()
が呼び出されると:
- ワークフローの実行はその時点で一時停止されます
- ワークフローの現在の状態がスナップショットとして記録されます
- スナップショットはストレージに保存されます
- ワークフローステップは「サスペンド中」として
'suspended'
ステータスでマークされます - 後で、サスペンドされたステップに対して
resume()
が呼び出されると、スナップショットが取得されます - ワークフローの実行は中断された場所から正確に再開されます
この仕組みにより、人間が介在するワークフローの実装や、レート制限への対応、外部リソースの待機、長期間の一時停止が必要な複雑な分岐ワークフローの実装が強力に可能となります。
スナップショットの構成
Mastra ワークフローのスナップショットは、いくつかの主要なコンポーネントで構成されています。
export interface WorkflowRunState {
// Core state info
value: Record<string, string>; // Current state machine value
context: {
// Workflow context
[key: string]: Record<
string,
| {
status: "success";
output: any;
}
| {
status: "failed";
error: string;
}
| {
status: "suspended";
payload?: any;
}
>;
input: Record<string, any>; // Initial input data
};
activePaths: Array<{
// Currently active execution paths
stepPath: string[];
stepId: string;
status: string;
}>;
// Paths to suspended steps
suspendedPaths: Record<string, number[]>;
// Metadata
runId: string; // Unique run identifier
timestamp: number; // Time snapshot was created
}
スナップショットの保存と取得方法
Mastraは、設定されたストレージシステムにスナップショットを永続化します。デフォルトでは、スナップショットはLibSQLデータベースに保存されますが、Upstashなど他のストレージプロバイダーを使用するように設定することも可能です。
スナップショットはworkflow_snapshots
テーブルに保存され、libsqlを使用している場合は関連する実行のrun_id
によって一意に識別されます。
永続化レイヤーを利用することで、スナップショットはワークフローの実行をまたいで保持され、高度なヒューマン・イン・ザ・ループ機能を実現できます。
libsqlストレージおよびupstashストレージについての詳細はこちらをご覧ください。
スナップショットの保存
ワークフローが一時停止されると、Mastraは以下の手順でワークフロースナップショットを自動的に永続化します。
- ステップ実行中に
suspend()
関数がスナップショット処理をトリガーします WorkflowInstance.suspend()
メソッドが一時停止したマシンを記録しますpersistWorkflowSnapshot()
が呼び出され、現在の状態を保存します- スナップショットはシリアライズされ、設定されたデータベースの
workflow_snapshots
テーブルに保存されます - ストレージレコードにはワークフロー名、実行ID、シリアライズされたスナップショットが含まれます
スナップショットの取得
ワークフローが再開されると、Mastraは以下の手順で永続化されたスナップショットを取得します。
- 特定のステップIDで
resume()
メソッドが呼び出されます loadWorkflowSnapshot()
を使用してストレージからスナップショットが読み込まれます- スナップショットが解析され、再開の準備が行われます
- ワークフローの実行がスナップショットの状態で再構築されます
- 一時停止していたステップが再開され、実行が続行されます
スナップショットのストレージオプション
Mastra は、スナップショットを永続化するための複数のストレージオプションを提供しています。
storage
インスタンスは Mastra
クラスで設定され、Mastra
インスタンスに登録されたすべてのワークフローのためのスナップショット永続化レイヤーとして使用されます。
つまり、同じ Mastra
インスタンスに登録されたすべてのワークフローでストレージが共有されます。
LibSQL(デフォルト)
デフォルトのストレージオプションは LibSQL で、SQLite 互換のデータベースです:
import { Mastra } from "@mastra/core/mastra";
import { DefaultStorage } from "@mastra/core/storage/libsql";
const mastra = new Mastra({
storage: new DefaultStorage({
config: {
url: "file:storage.db", // ローカルファイルベースのデータベース
// 本番環境の場合:
// url: process.env.DATABASE_URL,
// authToken: process.env.DATABASE_AUTH_TOKEN,
},
}),
workflows: {
weatherWorkflow,
travelWorkflow,
},
});
Upstash(Redis 互換)
サーバーレス環境向け:
import { Mastra } from "@mastra/core/mastra";
import { UpstashStore } from "@mastra/upstash";
const mastra = new Mastra({
storage: new UpstashStore({
url: process.env.UPSTASH_URL,
token: process.env.UPSTASH_TOKEN,
}),
workflows: {
weatherWorkflow,
travelWorkflow,
},
});
スナップショット作業のベストプラクティス
-
シリアライズ可能であることを確認する: スナップショットに含める必要があるデータは、必ずシリアライズ可能(JSONに変換可能)でなければなりません。
-
スナップショットのサイズを最小限に抑える: 大きなデータオブジェクトをワークフローコンテキストに直接保存するのは避けましょう。代わりにIDなどの参照情報を保存し、必要なときにデータを取得してください。
-
レジュームコンテキストの取り扱いに注意する: ワークフローを再開する際、どのコンテキストを提供するか慎重に検討してください。これは既存のスナップショットデータとマージされます。
-
適切なモニタリングを設定する: 特に長時間実行されるワークフローについては、中断されたワークフローのモニタリングを実装し、確実に再開されるようにしましょう。
-
ストレージのスケーリングを考慮する: 多数の中断されたワークフローがあるアプリケーションでは、ストレージソリューションが適切にスケールされていることを確認してください。
高度なスナップショットパターン
カスタムスナップショットメタデータ
ワークフローを一時停止する際、再開時に役立つカスタムメタデータを含めることができます:
await suspend({
reason: "顧客の承認待ち",
requiredApprovers: ["manager", "finance"],
requestedBy: currentUser,
urgency: "high",
expires: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000),
});
このメタデータはスナップショットと共に保存され、再開時に利用できます。
条件付き再開
再開時にサスペンドペイロードに基づいた条件付きロジックを実装できます:
run.watch(async ({ activePaths }) => {
const isApprovalStepSuspended =
activePaths.get("approval")?.status === "suspended";
if (isApprovalStepSuspended) {
const payload = activePaths.get("approval")?.suspendPayload;
if (payload.urgency === "high" && currentUser.role === "manager") {
await resume({
stepId: "approval",
context: { approved: true, approver: currentUser.id },
});
}
}
});