一時停止と再開
ワークフローは任意のステップで一時停止でき、現在の状態はストレージにスナップショットとして保存・永続化されます。準備が整えば、その保存済みスナップショットから実行を再開できます。スナップショットを永続化しておくことで、セッションやデプロイ、サーバーの再起動をまたいでもワークフローの状態が維持され、外部からの入力やリソースを待って長時間一時停止する可能性のあるワークフローにとって不可欠です。
ワークフローを一時停止する一般的なシナリオは次のとおりです:
- 人による承認や入力を待つ
- 外部 API やリソースが利用可能になるまで待機する
- 後続のステップに必要な追加データを収集する
- コストの高い処理をレート制限やスロットリングで抑制する
- 外部トリガーを伴うイベントドリブンなプロセスを処理する
一時停止と再開は初めてですか? 公式の動画チュートリアルをご覧ください:
- Mastering Human-in-the-Loop with Suspend & Resume - ワークフローを一時停止し、ユーザー入力を受け付ける方法を学べます
- Building Multi-Turn Chat Interfaces with React - React のチャットインターフェースで人を介したマルチターンのやり取りを実装します
ワークフローのステータス種別
ワークフローを実行すると、そのstatus
は次のいずれかになります:
running
- ワークフローは実行中ですsuspended
- ワークフローは一時停止中ですsuccess
- ワークフローは完了しましたfailed
- ワークフローは失敗しました
suspend()
を使ってワークフローを一時停止する
ユーザー入力が得られるまで特定のステップで実行を止めたい場合は、suspend
関数を使ってワークフローを一時停止し、必要なデータが提供されたときにのみ再開できるようにします。
const step1 = createStep({
id: "step-1",
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
}),
resumeSchema: z.object({
city: z.string()
}),
execute: async ({ resumeData, suspend }) => {
const { city } = resumeData ?? {};
if (!city) {
return await suspend({});
}
return { output: "" };
}
});
export const testWorkflow = createWorkflow({
// ...
})
.then(step1)
.commit();
詳細は、Suspend workflow example を参照してください。
一時停止中のステップの特定
一時停止されたワークフローを再開するには、結果の suspended
配列を確認して、どのステップが入力を必要としているかを判断します。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: {
city: "London"
}
});
console.log(JSON.stringify(result, null, 2));
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: result.suspended[0],
resumeData: {
city: "Berlin"
}
});
}
この場合、suspended
配列に並んだ最初のステップから再開します。step
は id
を使って指定することもできます。例: ‘step-1’。
{
"status": "suspended",
"steps": {
// ...
"step-1": {
// ...
"status": "suspended",
}
},
"suspended": [
[
"step-1"
]
]
}
詳細は Run Workflow Results を参照してください。
suspend を用いたユーザーフィードバックの提示
ワークフローが一時停止された場合、suspendSchema
を通じてユーザーにフィードバックを表示できます。ワークフローが一時停止した理由を伝えるために、suspend
のペイロードに理由を含めてください。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
id: "step-1",
inputSchema: z.object({
value: z.string()
}),
resumeSchema: z.object({
confirm: z.boolean()
}),
suspendSchema: z.object({
reason: z.string()
}),
outputSchema: z.object({
value: z.string()
}),
execute: async ({ resumeData, suspend }) => {
const { confirm } = resumeData ?? {};
if (!confirm) {
return await suspend({
reason: "Confirm to continue"
});
}
return { value: "" };
}
});
export const testWorkflow = createWorkflow({
// ...
})
.then(step1)
.commit();
この場合、理由として「続行するには確認が必要」であることが示されます。
{
"step-1": {
// ...
"status": "suspended",
"suspendPayload": {
"reason": "Confirm to continue"
},
}
}
詳細は Run Workflow Results を参照してください。
resume()
を使ってワークフローを再開する
ワークフローは resume
を呼び出し、必要な resumeData
を渡すことで再開できます。どのステップから再開するかを明示的に指定することもできますし、一時停止中のステップがちょうど1つだけの場合は step
パラメータを省略すると、そのステップが自動的に再開されます。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: {
city: "London"
}
});
console.log(JSON.stringify(result, null, 2));
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: 'step-1',
resumeData: {
city: "Berlin"
}
});
console.log(JSON.stringify(resumedResult, null, 2));
}
一時停止中のステップがちょうど1つだけの場合は、step
パラメータを省略できます:
const resumedResult = await run.resume({
resumeData: {
city: "Berlin"
},
// step parameter omitted - automatically resumes the single suspended step
});
ネストされたワークフローの再開
一時停止中のネストされたワークフローを再開するには、ワークフローインスタンスを resume
関数の step
パラメータに渡します。
const dowhileWorkflow = createWorkflow({
id: 'dowhile-workflow',
inputSchema: z.object({ value: z.number() }),
outputSchema: z.object({ value: z.number() }),
})
.dountil(
createWorkflow({
id: 'simple-resume-workflow',
inputSchema: z.object({ value: z.number() }),
outputSchema: z.object({ value: z.number() }),
steps: [incrementStep, resumeStep],
})
.then(incrementStep)
.then(resumeStep)
.commit(),
async ({ inputData }) => inputData.value >= 10,
)
.then(
createStep({
id: 'final',
inputSchema: z.object({ value: z.number() }),
outputSchema: z.object({ value: z.number() }),
execute: async ({ inputData }) => ({ value: inputData.value }),
}),
)
.commit();
const run = await dowhileWorkflow.createRunAsync();
const result = await run.start({ inputData: { value: 0 } });
if (result.status === "suspended") {
const resumedResult = await run.resume({
resumeData: { value: 2 },
step: ['simple-resume-workflow', 'resume'],
});
console.log(JSON.stringify(resumedResult, null, 2));
}
suspend/resume における RuntimeContext
の使用
suspend/resume
と RuntimeContext
を併用する場合は、インスタンスを自分で作成し、それを start
と resume
関数に渡せます。
RuntimeContext
はワークフローの実行間で自動的には共有されません。
import { RuntimeContext } from "@mastra/core/di";
import { mastra } from "./mastra";
const runtimeContext = new RuntimeContext();
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: { suggestions: ["London", "Paris", "New York"] },
runtimeContext
});
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: 'step-1',
resumeData: { city: "New York" },
runtimeContext
});
}