一時停止と再開
ワークフローは任意のステップで一時停止でき、現在の状態はストレージにスナップショットとして永続化されます。実行は、準備が整い次第そのスナップショットから再開できます。スナップショットを永続化しておくことで、セッションやデプロイ、サーバー再起動をまたいでもワークフローの状態が保持され、外部からの入力やリソースを待つ間、長時間一時停止される可能性のあるワークフローに不可欠です。
ワークフローを一時停止する一般的なシナリオには次のようなものがあります:
- 人手による承認・入力を待つ
- 外部 API やリソースの利用可能化まで待機する
- 後続ステップに必要な追加データを収集する
- 高コストな処理に対してレート制限やスロットリングを行う
- 外部トリガーを伴うイベント駆動型プロセスを扱う
ワークフローのステータス種別
ワークフローを実行しているとき、status
は次のいずれかになります:
running
- ワークフローを実行中suspended
- ワークフローを一時停止中success
- ワークフローが完了failed
- ワークフローが失敗
suspend()
でワークフローを一時停止する
特定のステップでユーザー入力があるまで実行を止めたい場合は、suspend
関数でワークフローを一時停止し、必要なデータが提供されたときにのみ再開できるようにします。
const step1 = createStep({
id: "step-1",
inputSchema: z.object({
input: z.string()
}),
outputSchema: z.object({
output: z.string()
}),
resumeSchema: z.object({
city: z.string()
}),
execute: async ({ resumeData, suspend }) => {
const { city } = resumeData ?? {};
if (!city) {
return await suspend({});
}
return { output: "" };
}
});
export const testWorkflow = createWorkflow({
// ...
})
.then(step1)
.commit();
さらに詳しくは、Suspend workflow example を参照してください。
一時停止中のステップを特定する
一時停止中のワークフローを再開するには、結果の suspended
配列を確認し、どのステップが入力を必要としているかを特定します。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: {
city: "London"
}
});
console.log(JSON.stringify(result, null, 2));
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: result.suspended[0],
resumeData: {
city: "Berlin"
}
});
}
詳細は Run Workflow Results を参照してください。
この例では、suspended
配列に並んだ最初のステップを再開します。step
は id
で指定することもでき、例として ‘step-1’ のように指定します。
{
"status": "suspended",
"steps": {
// ...
"step-1": {
// ...
"status": "suspended",
}
},
"suspended": [
[
"step-1"
]
]
}
suspend を使ったユーザーフィードバックの提供
ワークフローが一時停止(サスペンド)された場合、suspendSchema
を通じてユーザーにフィードバックを表示できます。ワークフローが一時停止した理由を説明するために、suspend
のペイロードに理由を含めてください。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
id: "step-1",
inputSchema: z.object({
value: z.string()
}),
resumeSchema: z.object({
confirm: z.boolean()
}),
suspendSchema: z.object({
reason: z.string()
}),
outputSchema: z.object({
value: z.string()
}),
execute: async ({ resumeData, suspend }) => {
const { confirm } = resumeData ?? {};
if (!confirm) {
return await suspend({
reason: "Confirm to continue"
});
}
return { value: "" };
}
});
export const testWorkflow = createWorkflow({
// ...
})
.then(step1)
.commit();
この場合、理由として「続行するには確認が必要である」ことが示されています。
{
"step-1": {
// ...
"status": "suspended",
"suspendPayload": {
"reason": "Confirm to continue"
},
}
}
詳細は Run Workflow Results を参照してください。
resume()
を使ってワークフローを再開する
ワークフローは resume
を呼び出し、必要な resumeData
を渡すことで再開できます。どのステップから再開するかを明示的に指定することもできますし、ちょうど1つのステップだけが一時停止されている場合は step
パラメータを省略すると、そのステップが自動的に再開されます。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: {
city: "London"
}
});
console.log(JSON.stringify(result, null, 2));
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: 'step-1',
resumeData: {
city: "Berlin"
}
});
console.log(JSON.stringify(resumedResult, null, 2));
}
ちょうど1つのステップだけが一時停止されている場合は step
パラメータを省略できます:
const resumedResult = await run.resume({
resumeData: {
city: "Berlin"
},
// step parameter omitted - automatically resumes the single suspended step
});
ネストされたワークフローの再開
一時停止されたネストされたワークフローを再開するには、resume
関数の step
パラメータにそのワークフローインスタンスを渡します。
const dowhileWorkflow = createWorkflow({
id: 'dowhile-workflow',
inputSchema: z.object({ value: z.number() }),
outputSchema: z.object({ value: z.number() }),
})
.dountil(
createWorkflow({
id: 'simple-resume-workflow',
inputSchema: z.object({ value: z.number() }),
outputSchema: z.object({ value: z.number() }),
steps: [incrementStep, resumeStep],
})
.then(incrementStep)
.then(resumeStep)
.commit(),
async ({ inputData }) => inputData.value >= 10,
)
.then(
createStep({
id: 'final',
inputSchema: z.object({ value: z.number() }),
outputSchema: z.object({ value: z.number() }),
execute: async ({ inputData }) => ({ value: inputData.value }),
}),
)
.commit();
const run = await dowhileWorkflow.createRunAsync();
const result = await run.start({ inputData: { value: 0 } });
if (result.status === "suspended") {
const resumedResult = await run.resume({
resumeData: { value: 2 },
step: ['simple-resume-workflow', 'resume'],
});
console.log(JSON.stringify(resumedResult, null, 2));
}
suspend/resume における RuntimeContext
の使用
suspend/resume
を RuntimeContext
と併用する場合、インスタンスを自分で作成し、それを start
と resume
関数に渡せます。
RuntimeContext
はワークフローの実行間で自動的に共有されません。
import { RuntimeContext } from "@mastra/core/di";
import { mastra } from "./mastra";
const runtimeContext = new RuntimeContext();
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: { suggestions: ["London", "Paris", "New York"] },
runtimeContext
});
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: 'step-1',
resumeData: { city: "New York" },
runtimeContext
});
}