ネストされたワークフロー
Mastraでは、ワークフローを他のワークフロー内のステップとして使用することができ、モジュール式で再利用可能なワークフローコンポーネントを作成できます。この機能により、複雑なワークフローをより小さく管理しやすい部分に整理し、コードの再利用を促進することができます。
また、親ワークフロー内のステップとしてネストされたワークフローを視覚的に確認できるため、ワークフローの流れを理解しやすくなります。
基本的な使用方法
step()
メソッドを使用して、あるワークフローを別のワークフローのステップとして直接使用できます:
// ネストされたワークフローを作成
const nestedWorkflow = new Workflow({ name: "nested-workflow" })
.step(stepA)
.then(stepB)
.commit();
// 親ワークフローでネストされたワークフローを使用
const parentWorkflow = new Workflow({ name: "parent-workflow" })
.step(nestedWorkflow, {
variables: {
city: {
step: "trigger",
path: "myTriggerInput",
},
},
})
.then(stepC)
.commit();
ワークフローがステップとして使用される場合:
- ワークフローの名前をステップIDとして使用し、自動的にステップに変換されます
- ワークフローの結果は親ワークフローのコンテキストで利用可能になります
- ネストされたワークフローのステップは定義された順序で実行されます
結果へのアクセス
ネストされたワークフローからの結果は、親ワークフローのコンテキスト内でネストされたワークフローの名前の下で利用できます。結果には、ネストされたワークフローからのすべてのステップ出力が含まれます:
const { results } = await parentWorkflow.start();
// Access nested workflow results
const nestedWorkflowResult = results["nested-workflow"];
if (nestedWorkflowResult.status === "success") {
const nestedResults = nestedWorkflowResult.output.results;
}
ネストされたワークフローによるフロー制御
ネストされたワークフローは、通常のステップで利用可能なすべてのフロー制御機能をサポートしています:
並列実行
複数のネストされたワークフローを並列で実行できます:
parentWorkflow
.step(nestedWorkflowA)
.step(nestedWorkflowB)
.after([nestedWorkflowA, nestedWorkflowB])
.step(finalStep);
または、ワークフローの配列を使用したstep()
を使用する方法:
parentWorkflow.step([nestedWorkflowA, nestedWorkflowB]).then(finalStep);
この場合、then()
は最終ステップを実行する前に、すべてのワークフローが完了するのを暗黙的に待ちます。
If-Else分岐
ネストされたワークフローは、両方の分岐を引数として受け入れる新しい構文でif-else分岐で使用できます:
// 異なるパス用のネストされたワークフローを作成
const workflowA = new Workflow({ name: "workflow-a" })
.step(stepA1)
.then(stepA2)
.commit();
const workflowB = new Workflow({ name: "workflow-b" })
.step(stepB1)
.then(stepB2)
.commit();
// ネストされたワークフローで新しいif-else構文を使用
parentWorkflow
.step(initialStep)
.if(
async ({ context }) => {
// ここに条件を記述
return someCondition;
},
workflowA, // if分岐
workflowB, // else分岐
)
.then(finalStep)
.commit();
新しい構文は、ネストされたワークフローを扱う際により簡潔で明確です。条件が:
true
の場合:最初のワークフロー(if分岐)が実行されますfalse
の場合:2番目のワークフロー(else分岐)が実行されます
スキップされたワークフローは結果でskipped
ステータスになります:
if-elseブロックの後に続く.then(finalStep)
呼び出しは、ifとelse分岐を単一の実行パスに戻します。
ループ処理
ネストされたワークフローは、他のステップと同様に.until()
と.while()
ループを使用できます。興味深い新しいパターンの一つは、ワークフローを直接ループバック引数として渡し、その結果に関する何かが真になるまでそのネストされたワークフローを実行し続けることです:
parentWorkflow
.step(firstStep)
.while(
({ context }) =>
context.getStepResult("nested-workflow").output.results.someField ===
"someValue",
nestedWorkflow,
)
.step(finalStep)
.commit();
ネストされたワークフローの監視
親ワークフローのwatch
メソッドを使用して、ネストされたワークフローの状態変化を監視することができます。これは複雑なワークフローの進行状況や状態遷移をモニタリングするのに役立ちます:
const parentWorkflow = new Workflow({ name: "parent-workflow" })
.step([nestedWorkflowA, nestedWorkflowB])
.then(finalStep)
.commit();
const run = parentWorkflow.createRun();
const unwatch = parentWorkflow.watch((state) => {
console.log("Current state:", state.value);
// Access nested workflow states in state.context
});
await run.start();
unwatch(); // Stop watching when done
一時停止と再開
ネストされたワークフローは一時停止と再開をサポートしており、特定のポイントでワークフロー実行を一時停止して続行することができます。ネストされたワークフロー全体または特定のステップを一時停止することができます:
// Define a step that may need to suspend
const suspendableStep = new Step({
id: "other",
description: "Step that may need to suspend",
execute: async ({ context, suspend }) => {
if (!wasSuspended) {
wasSuspended = true;
await suspend();
}
return { other: 26 };
},
});
// Create a nested workflow with suspendable steps
const nestedWorkflow = new Workflow({ name: "nested-workflow-a" })
.step(startStep)
.then(suspendableStep)
.then(finalStep)
.commit();
// Use in parent workflow
const parentWorkflow = new Workflow({ name: "parent-workflow" })
.step(beginStep)
.then(nestedWorkflow)
.then(lastStep)
.commit();
// Start the workflow
const run = parentWorkflow.createRun();
const { runId, results } = await run.start({ triggerData: { startValue: 1 } });
// Check if a specific step in the nested workflow is suspended
if (results["nested-workflow-a"].output.results.other.status === "suspended") {
// Resume the specific suspended step using dot notation
const resumedResults = await run.resume({
stepId: "nested-workflow-a.other",
context: { startValue: 1 },
});
// The resumed results will contain the completed nested workflow
expect(resumedResults.results["nested-workflow-a"].output.results).toEqual({
start: { output: { newValue: 1 }, status: "success" },
other: { output: { other: 26 }, status: "success" },
final: { output: { finalValue: 27 }, status: "success" },
});
}
ネストされたワークフローを再開する場合:
resume()
を呼び出す際に、ワークフロー全体を再開するには、ネストされたワークフローの名前をstepId
として使用します- ネストされたワークフロー内の特定のステップを再開するには、ドット表記(
nested-workflow.step-name
)を使用します - ネストされたワークフローは、提供されたコンテキストで一時停止されたステップから続行されます
- ネストされたワークフローの結果内の特定のステップのステータスを
results["nested-workflow"].output.results
を使用して確認できます
結果スキーマとマッピング
ネストされたワークフローは、結果スキーマとマッピングを定義することができ、これによって型の安全性とデータ変換が容易になります。これは、ネストされたワークフローの出力が特定の構造と一致することを確認したい場合や、結果を親ワークフローで使用する前に変換する必要がある場合に特に役立ちます。
// Create a nested workflow with result schema and mapping
const nestedWorkflow = new Workflow({
name: "nested-workflow",
result: {
schema: z.object({
total: z.number(),
items: z.array(
z.object({
id: z.string(),
value: z.number(),
}),
),
}),
mapping: {
// Map values from step results using variables syntax
total: { step: "step-a", path: "count" },
items: { step: "step-b", path: "items" },
},
},
})
.step(stepA)
.then(stepB)
.commit();
// Use in parent workflow with type-safe results
const parentWorkflow = new Workflow({ name: "parent-workflow" })
.step(nestedWorkflow)
.then(async ({ context }) => {
const result = context.getStepResult("nested-workflow");
// TypeScript knows the structure of result
console.log(result.total); // number
console.log(result.items); // Array<{ id: string, value: number }>
return { success: true };
})
.commit();
ベストプラクティス
- モジュール性: ネストされたワークフローを使用して関連するステップをカプセル化し、再利用可能なワークフローコンポーネントを作成します。
- 命名: ネストされたワークフローには、親ワークフローのステップIDとして使用されるため、説明的な名前を付けてください。
- エラー処理: ネストされたワークフローは親ワークフローにエラーを伝播するため、適切にエラーを処理してください。
- 状態管理: 各ネストされたワークフローは独自の状態を維持しますが、親ワークフローのコンテキストにアクセスできます。
- サスペンション: ネストされたワークフローでサスペンションを使用する場合は、ワークフロー全体の状態を考慮し、適切に再開処理を行ってください。
例
ネストされたワークフローのさまざまな機能を示す完全な例を以下に示します:
const workflowA = new Workflow({
name: "workflow-a",
result: {
schema: z.object({
activities: z.string(),
}),
mapping: {
activities: {
step: planActivities,
path: "activities",
},
},
},
})
.step(fetchWeather)
.then(planActivities)
.commit();
const workflowB = new Workflow({
name: "workflow-b",
result: {
schema: z.object({
activities: z.string(),
}),
mapping: {
activities: {
step: planActivities,
path: "activities",
},
},
},
})
.step(fetchWeather)
.then(planActivities)
.commit();
const weatherWorkflow = new Workflow({
name: "weather-workflow",
triggerSchema: z.object({
cityA: z.string().describe("The city to get the weather for"),
cityB: z.string().describe("The city to get the weather for"),
}),
result: {
schema: z.object({
activitiesA: z.string(),
activitiesB: z.string(),
}),
mapping: {
activitiesA: {
step: workflowA,
path: "result.activities",
},
activitiesB: {
step: workflowB,
path: "result.activities",
},
},
},
})
.step(workflowA, {
variables: {
city: {
step: "trigger",
path: "cityA",
},
},
})
.step(workflowB, {
variables: {
city: {
step: "trigger",
path: "cityB",
},
},
});
weatherWorkflow.commit();
この例では:
- すべてのワークフロー間で型安全性を確保するためのスキーマを定義しています
- 各ステップには適切な入力と出力のスキーマがあります
- ネストされたワークフローには独自のトリガースキーマと結果マッピングがあります
.step()
呼び出しで変数構文を使用してデータが渡されます- メインワークフローは両方のネストされたワークフローからのデータを組み合わせます