スナップショット
Mastra において、スナップショットは特定時点におけるワークフローの完全な実行状態をシリアライズ可能な形で表現したものです。スナップショットは、次の情報を含め、ワークフローを中断した地点から正確に再開するために必要なすべての情報を保持します。
- ワークフロー内の各ステップの現在の状態
- 完了したステップの出力
- ワークフローで辿った実行経路
- 一時停止中のステップとそのメタデータ
- 各ステップに残っている再試行回数
- 実行再開に必要な追加のコンテキストデータ
スナップショットは、ワークフローが一時停止されるたびに Mastra によって自動的に作成・管理され、設定されたストレージシステムに永続化されます。
サスペンドと再開におけるスナップショットの役割
スナップショットは、Mastra のサスペンド/再開機能を支える中核的な仕組みです。ワークフローのステップが await suspend()
を呼び出すと、
- ワークフローの実行がその時点で一時停止される
- ワークフローの現在の状態がスナップショットとして取得される
- 取得したスナップショットがストレージに永続化される
- そのステップはステータス
'suspended'
の「suspended」としてマークされる - 後にサスペンド中のステップで
resume()
が呼び出されると、スナップショットが読み出される - ワークフローの実行は中断したまさにその地点から再開される
この仕組みにより、ヒューマン・イン・ザ・ループのワークフローの実装、レート制限への対応、外部リソースの待機、長時間の一時停止を要し得る複雑な分岐ワークフローの実装が強力に可能になります。
スナップショットの構成
各スナップショットには、runId
、入力、ステップのステータス(success
、suspended
など)、サスペンドおよび再開時のペイロード、そして最終出力が含まれます。これにより、実行再開時に完全なコンテキストを参照できます。
{
"runId": "34904c14-e79e-4a12-9804-9655d4616c50",
"status": "success",
"value": {},
"context": {
"input": { "value": 100, "user": "Michael", "requiredApprovers": ["manager", "finance"] },
"approval-step": {
"payload": { "value": 100, "user": "Michael", "requiredApprovers": ["manager", "finance"] },
"startedAt": 1758027577955,
"status": "success",
"suspendPayload": { "message": "Workflow suspended", "requestedBy": "Michael", "approvers": ["manager", "finance"] },
"suspendedAt": 1758027578065,
"resumePayload": { "confirm": true, "approver": "manager" },
"resumedAt": 1758027578517,
"output": { "value": 100, "approved": true },
"endedAt": 1758027578634
}
},
"activePaths": [],
"serializedStepGraph": [{ "type": "step", "step": { "id": "approval-step", "description": "Accepts a value, waits for confirmation" } }],
"suspendedPaths": {},
"waitingPaths": {},
"result": { "value": 100, "approved": true },
"runtimeContext": {},
"timestamp": 1758027578740
}
スナップショットの保存と取得方法
スナップショットは、設定済みのストレージシステムに保存されます。既定では LibSQL を使用しますが、代わりに Upstash または PostgreSQL を設定できます。各スナップショットは workflow_snapshots
テーブルに保存され、ワークフローの runId
で識別されます。
詳しくは以下をご覧ください:
スナップショットの保存
ワークフローが一時停止されると、Mastra は次の手順でワークフローのスナップショットを自動的に永続化します。
- ステップ実行内で
suspend()
関数がスナップショット処理をトリガーする WorkflowInstance.suspend()
メソッドが一時停止中のマシンを記録する- 現在の状態を保存するために
persistWorkflowSnapshot()
が呼び出される - スナップショットはシリアライズされ、設定されたデータベースの
workflow_snapshots
テーブルに保存される - 保存レコードには、ワークフロー名、実行 ID、シリアライズ済みのスナップショットが含まれる
スナップショットの取得
ワークフローが再開されると、Mastra は次の手順で永続化されたスナップショットを取得します:
- 特定のステップ ID を指定して
resume()
メソッドが呼び出される loadWorkflowSnapshot()
を使用してストレージからスナップショットを読み込む- スナップショットを解析し、再開のために準備する
- スナップショットの状態を用いてワークフローの実行を再構築する
- 一時停止していたステップを再開し、実行を継続する
const storage = mastra.getStorage();
const snapshot = await storage!.loadWorkflowSnapshot({
runId: "<run-id>",
workflowName: "<workflow-id>"
});
console.log(snapshot);
スナップショットのストレージオプション
スナップショットは、Mastra
クラスで構成された storage
インスタンスを使用して永続化されます。このストレージレイヤーは、そのインスタンスに登録されたすべてのワークフローで共有されます。Mastra は、さまざまな環境に柔軟に対応するため、複数のストレージオプションをサポートしています。
LibSQL @mastra/libsql
この例では、LibSQL でスナップショットを使う方法を示します。
import { Mastra } from "@mastra/core/mastra";
import { LibSQLStore } from "@mastra/libsql";
export const mastra = new Mastra({
// ...
storage: new LibSQLStore({
url: ":memory:"
})
});
Upstash @mastra/upstash
この例では、Upstash でスナップショットを使う方法を紹介します。
import { Mastra } from "@mastra/core/mastra";
import { UpstashStore } from "@mastra/upstash";
export const mastra = new Mastra({
// ...
storage: new UpstashStore({
url: "<upstash-redis-rest-url>",
token: "<upstash-redis-rest-token>"
})
})
Postgres @mastra/pg
この例では、PostgreSQLでスナップショットを利用する方法を紹介します。
import { Mastra } from "@mastra/core/mastra";
import { PostgresStore } from "@mastra/pg";
export const mastra = new Mastra({
// ...
storage: new PostgresStore({
connectionString: "<database-url>"
})
});
ベストプラクティス
- シリアライズ可能であることを確保する: スナップショットに含める必要があるデータは、必ずシリアライズ可能(JSON に変換可能)であること。
- スナップショットのサイズを最小化する: 大きなデータオブジェクトをワークフローのコンテキストに直接保存しない。代わりに ID などの参照を保存し、必要に応じてデータを取得する。
- 再開時のコンテキストを慎重に扱う: ワークフローを再開する際にどのコンテキストを与えるかを慎重に検討する。これは既存のスナップショットデータにマージされる。
- 適切なモニタリングを構築する: 一時停止中のワークフロー、特に長時間実行のものに対して監視を実装し、適切に再開されるようにする。
- ストレージのスケーリングを考慮する: 多数の一時停止ワークフローがあるアプリケーションでは、ストレージソリューションが適切にスケールしていることを確認する。
カスタムスナップショットメタデータ
suspendSchema
を定義することで、ワークフローを一時停止する際にカスタムメタデータを添付できます。このメタデータはスナップショットに保存され、ワークフロー再開時に利用可能になります。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const approvalStep = createStep({
id: "approval-step",
description: "Accepts a value, waits for confirmation",
inputSchema: z.object({
value: z.number(),
user: z.string(),
requiredApprovers: z.array(z.string())
}),
suspendSchema: z.object({
message: z.string(),
requestedBy: z.string(),
approvers: z.array(z.string())
}),
resumeSchema: z.object({
confirm: z.boolean(),
approver: z.string()
}),
outputSchema: z.object({
value: z.number(),
approved: z.boolean()
}),
execute: async ({ inputData, resumeData, suspend }) => {
const { value, user, requiredApprovers } = inputData;
const { confirm } = resumeData ?? {};
if (!confirm) {
return await suspend({
message: "Workflow suspended",
requestedBy: user,
approvers: [...requiredApprovers]
});
}
return {
value,
approved: confirm
};
}
});
再開用データの提供
一時停止されたステップを再開する際に、構造化された入力を渡すには resumeData
を使用します。これはステップの resumeSchema
に適合している必要があります。
const workflow = mastra.getWorkflow("approvalWorkflow");
const run = await workflow.createRunAsync();
const result = await run.start({
inputData: {
value: 100,
user: "Michael",
requiredApprovers: ["manager", "finance"]
}
});
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: "approval-step",
resumeData: {
confirm: true,
approver: "manager"
}
});
}