Snapshots
Mastraにおいて、スナップショットは、特定の時点でのワークフローの完全な実行状態をシリアライズ可能な形で表現したものです。スナップショットは、ワークフローを中断した正確な位置から再開するために必要なすべての情報をキャプチャします。これには以下が含まれます:
- ワークフロー内の各ステップの現在の状態
- 完了したステップの出力
- ワークフローを通じて取られた実行パス
- 中断されたステップとそのメタデータ
- 各ステップの残りの再試行回数
- 実行を再開するために必要な追加のコンテキストデータ
スナップショットは、ワークフローが中断されるたびにMastraによって自動的に作成および管理され、設定されたストレージシステムに永続化されます。
スナップショットの役割: 一時停止と再開
スナップショットは、Mastraの一時停止と再開機能を可能にする主要なメカニズムです。ワークフローステップがawait suspend()
を呼び出すと:
- ワークフローの実行がその正確なポイントで一時停止されます
- ワークフローの現在の状態がスナップショットとしてキャプチャされます
- スナップショットがストレージに保存されます
- ワークフローステップは「一時停止」として、ステータスが
'suspended'
でマークされます - 後で、
resume()
が一時停止されたステップで呼び出されると、スナップショットが取得されます - ワークフローの実行は、正確に中断した場所から再開されます
このメカニズムは、人間が関与するワークフローを実装したり、レート制限を処理したり、外部リソースを待機したり、長期間の一時停止が必要な複雑な分岐ワークフローを実装するための強力な方法を提供します。
スナップショットの構造
Mastra ワークフロースナップショットは、いくつかの主要なコンポーネントで構成されています:
export interface WorkflowRunState {
// Core state info
value: Record<string, string>; // 現在の状態マシンの値
context: {
// ワークフローのコンテキスト
steps: Record<
string,
{
// ステップ実行結果
status: "success" | "failed" | "suspended" | "waiting" | "skipped";
payload?: any; // ステップ固有のデータ
error?: string; // 失敗した場合のエラー情報
}
>;
triggerData: Record<string, any>; // 初期トリガーデータ
attempts: Record<string, number>; // 残りの再試行回数
inputData: Record<string, any>; // 初期入力データ
};
activePaths: Array<{
// 現在アクティブな実行パス
stepPath: string[];
stepId: string;
status: string;
}>;
// メタデータ
runId: string; // ユニークな実行識別子
timestamp: number; // スナップショットが作成された時間
// ネストされたワークフローと中断されたステップのために
childStates?: Record<string, WorkflowRunState>; // 子ワークフローの状態
suspendedSteps?: Record<string, string>; // 中断されたステップのマッピング
}
スナップショットの保存と取得方法
Mastraは、設定されたストレージシステムにスナップショットを永続化します。デフォルトでは、スナップショットはLibSQLデータベースに保存されますが、Upstashのような他のストレージプロバイダーを使用するように設定することもできます。スナップショットはworkflow_snapshots
テーブルに保存され、libsqlを使用する場合、関連する実行のrun_id
によって一意に識別されます。永続化レイヤーを利用することで、ワークフローの実行をまたいでスナップショットを永続化でき、高度な人間を介したループ機能を可能にします。
libsqlストレージとupstashストレージについてさらに読むことができます。
スナップショットの保存
ワークフローが中断されると、Mastraは次の手順でワークフロースナップショットを自動的に永続化します:
- ステップ実行中の
suspend()
関数がスナップショットプロセスをトリガーします WorkflowInstance.suspend()
メソッドが中断されたマシンを記録しますpersistWorkflowSnapshot()
が呼び出され、現在の状態を保存します- スナップショットはシリアライズされ、
workflow_snapshots
テーブルの設定されたデータベースに保存されます - ストレージレコードには、ワークフロー名、実行ID、およびシリアライズされたスナップショットが含まれます
スナップショットの取得
ワークフローが再開されると、Mastraは次の手順で永続化されたスナップショットを取得します:
- 特定のステップIDで
resume()
メソッドが呼び出されます loadWorkflowSnapshot()
を使用してストレージからスナップショットが読み込まれます- スナップショットが解析され、再開の準備が整えられます
- スナップショット状態でワークフロー実行が再作成されます
- 中断されたステップが再開され、実行が続行されます
スナップショットのストレージオプション
Mastraは、スナップショットを永続化するための複数のストレージオプションを提供します。
storage
インスタンスは Mastra
クラスで設定され、Mastra
インスタンスに登録されたすべてのワークフローのためのスナップショット永続化レイヤーをセットアップするために使用されます。
これは、同じ Mastra
インスタンスに登録されたすべてのワークフローでストレージが共有されることを意味します。
LibSQL (デフォルト)
デフォルトのストレージオプションは、SQLite互換のデータベースであるLibSQLです:
import { Mastra } from "@mastra/core/mastra";
import { DefaultStorage } from "@mastra/core/storage/libsql";
const mastra = new Mastra({
storage: new DefaultStorage({
config: {
url: "file:storage.db", // ローカルファイルベースのデータベース
// 本番環境の場合:
// url: process.env.DATABASE_URL,
// authToken: process.env.DATABASE_AUTH_TOKEN,
},
}),
workflows: {
weatherWorkflow,
travelWorkflow,
},
});
Upstash (Redis互換)
サーバーレス環境向け:
import { Mastra } from "@mastra/core/mastra";
import { UpstashStore } from "@mastra/upstash";
const mastra = new Mastra({
storage: new UpstashStore({
url: process.env.UPSTASH_URL,
token: process.env.UPSTASH_TOKEN,
}),
workflows: {
weatherWorkflow,
travelWorkflow,
},
});
スナップショットを扱う際のベストプラクティス
-
シリアライズ可能性を確保する: スナップショットに含める必要があるデータは、シリアライズ可能(JSONに変換可能)でなければなりません。
-
スナップショットサイズを最小化する: 大きなデータオブジェクトをワークフローコンテキストに直接保存するのは避けてください。代わりに、それらへの参照(IDなど)を保存し、必要に応じてデータを取得します。
-
再開コンテキストを慎重に扱う: ワークフローを再開する際には、どのコンテキストを提供するかを慎重に考慮してください。これは既存のスナップショットデータとマージされます。
-
適切なモニタリングを設定する: 特に長時間実行されるワークフローのために、停止されたワークフローのモニタリングを実装し、適切に再開されることを確認します。
-
ストレージのスケーリングを考慮する: 多くの停止されたワークフローを持つアプリケーションの場合、ストレージソリューションが適切にスケーリングされていることを確認します。
高度なスナップショットパターン
カスタムスナップショットメタデータ
ワークフローを一時停止する際に、再開時に役立つカスタムメタデータを含めることができます:
await suspend({
reason: "顧客の承認待ち",
requiredApprovers: ["manager", "finance"],
requestedBy: currentUser,
urgency: "high",
expires: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000),
});
このメタデータはスナップショットと共に保存され、再開時に利用可能です。
条件付き再開
再開時に一時停止ペイロードに基づく条件付きロジックを実装できます:
run.watch(async ({ activePaths }) => {
const isApprovalStepSuspended =
activePaths.get("approval")?.status === "suspended";
if (isApprovalStepSuspended) {
const payload = activePaths.get("approval")?.suspendPayload;
if (payload.urgency === "high" && currentUser.role === "manager") {
await resume({
stepId: "approval",
context: { approved: true, approver: currentUser.id },
});
}
}
});