Suspend & Resume
ワークフローは任意のステップで一時停止でき、現在の状態はスナップショットとしてストレージに永続化されます。その後、準備が整った時点で、この保存されたスナップショットから実行を再開できます。スナップショットを永続化することで、セッション、デプロイメント、サーバー再起動を跨いでワークフローの状態が維持され、外部入力やリソースを待機している間に一時停止される可能性があるワークフローにとって不可欠です。
ワークフローを一時停止する一般的なシナリオには以下があります:
- 人間の承認や入力を待機する場合
- 外部APIリソースが利用可能になるまで一時停止する場合
- 後のステップで必要な追加データを収集する場合
- 高コストな操作のレート制限やスロットリング
- 外部トリガーを持つイベント駆動プロセスの処理
ワークフローのステータス
ワークフローを実行する際、そのstatus
は以下のいずれかになります:
running
- ワークフローが現在実行中suspended
- ワークフローが一時停止中success
- ワークフローが完了failed
- ワークフローが失敗
Suspend
状態がsuspended
の場合、ワークフロー結果出力のsuspended
配列を確認することで、中断されたすべてのステップを特定できます。
const step1 = createStep({
id: "step-1",
description: "Test suspend",
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
}),
suspendSchema: z.object({}),
resumeSchema: z.object({
city: z.string()
}),
execute: async ({ resumeData, suspend }) => {
if (!(resumeData ?? {}).city) {
await suspend({});
return { output: "" };
}
return {
output: ""
};
}
});
export const testWorkflow = createWorkflow({})
.then(step1)
.commit();
詳細については、中断可能なワークフローの定義を参照してください。
中断されたステップの特定
中断されたワークフローを再開するには、結果のsuspended
配列を調べて、どのステップが入力を必要としているかを判断します:
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: {
city: "London"
}
});
console.log(JSON.stringify(result, null, 2));
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: result.suspended[0],
resumeData: {
city: "Berlin"
}
});
}
この場合、ロジックはsuspended
配列にリストされている最初のステップを再開します。step
は、例えば’step-1’のように、そのid
を使用して定義することもできます。
{
"status": "suspended",
"steps": {
// ...
"step-1": {
// ...
"status": "suspended",
}
},
"suspended": [
[
"step-1"
]
]
}
詳細については、ワークフロー実行結果を参照してください。
Resume
ワークフローはresume
を呼び出し、必要なresumeData
を提供することで再開できます。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: {
city: "London"
}
});
console.log(JSON.stringify(result, null, 2));
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: 'step-1',
resumeData: {
city: "Berlin"
}
});
console.log(JSON.stringify(resumedResult, null, 2));
}
ネストされたワークフロー
中断されたネストされたワークフローを再開するには、ワークフローインスタンスをresume
関数のstep
パラメータに渡します。
const dowhileWorkflow = createWorkflow({
id: 'dowhile-workflow',
inputSchema: z.object({ value: z.number() }),
outputSchema: z.object({ value: z.number() }),
})
.dountil(
createWorkflow({
id: 'simple-resume-workflow',
inputSchema: z.object({ value: z.number() }),
outputSchema: z.object({ value: z.number() }),
steps: [incrementStep, resumeStep],
})
.then(incrementStep)
.then(resumeStep)
.commit(),
async ({ inputData }) => inputData.value >= 10,
)
.then(
createStep({
id: 'final',
inputSchema: z.object({ value: z.number() }),
outputSchema: z.object({ value: z.number() }),
execute: async ({ inputData }) => ({ value: inputData.value }),
}),
)
.commit();
const run = await dowhileWorkflow.createRunAsync();
const result = await run.start({ inputData: { value: 0 } });
if (result.status === "suspended") {
const resumedResult = await run.resume({
resumeData: { value: 2 },
step: ['simple-resume-workflow', 'resume'],
});
console.log(JSON.stringify(resumedResult, null, 2));
}
RuntimeContext
RuntimeContext
でsuspend/resumeを使用する場合、インスタンスを自分で作成し、それをstart
とresume
関数に渡すことができます。
RuntimeContext
はワークフロー実行時に自動的に共有されません。
import { RuntimeContext } from "@mastra/core/di";
import { mastra } from "./mastra";
const runtimeContext = new RuntimeContext();
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: { suggestions: ["London", "Paris", "New York"] },
runtimeContext
});
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: 'step-1',
resumeData: { city: "New York" },
runtimeContext
});
}