ワークフローの制御フロー:分岐、マージ、条件
複数ステップのプロセスを作成する場合、ステップを並行して実行したり、順番に連鎖させたり、結果に基づいて異なるパスをたどる必要があるかもしれません。このページでは、論理的要件を満たすワークフローを構築するために、分岐、マージ、条件をどのように管理できるかを説明します。コードスニペットは、複雑な制御フローを構築するための主要なパターンを示しています。
並列実行
互いに依存関係のないステップを同時に実行することができます。ステップが独立したタスクを実行する場合、このアプローチによってワークフローを高速化できます。以下のコードは、2つのステップを並列に追加する方法を示しています:
myWorkflow.step(fetchUserData).step(fetchOrderData);
詳細については、並列ステップの例を参照してください。
順次実行
時には、あるステップの出力が次のステップの入力になるように、厳密な順序でステップを実行する必要があります。依存する操作をリンクするには .then() を使用します。以下のコードは、ステップを順番に連鎖させる方法を示しています:
myWorkflow.step(fetchOrderData).then(validateData).then(processOrder);
詳細については、順次ステップの例を参照してください。
分岐と合流パス
異なる結果に対して異なるパスが必要な場合、分岐が役立ちます。また、完了後にパスを後で合流させることもできます。以下のコードは、stepAの後に分岐し、後でstepFで収束する方法を示しています:
myWorkflow
.step(stepA)
.then(stepB)
.then(stepD)
.after(stepA)
.step(stepC)
.then(stepE)
.after([stepD, stepE])
.step(stepF);
この例では:
- stepAはstepBに進み、その後stepDに進みます。
- 別途、stepAはstepCもトリガーし、それがstepEにつながります。
- 別途、stepFはstepDとstepEの両方が完了したときにトリガーされます。
詳細については、分岐パスの例を参照してください。
複数のブランチのマージ
時には、複数の他のステップが完了した後にのみステップを実行する必要があります。Mastraは、ステップに対して複数の依存関係を指定できる複合的な.after([])
構文を提供しています。
myWorkflow
.step(fetchUserData)
.then(validateUserData)
.step(fetchProductData)
.then(validateProductData)
// このステップは、validateUserDataとvalidateProductDataの両方が完了した後にのみ実行されます
.after([validateUserData, validateProductData])
.step(processOrder)
この例では:
fetchUserData
とfetchProductData
は並列ブランチで実行されます- 各ブランチには独自の検証ステップがあります
processOrder
ステップは、両方の検証ステップが正常に完了した後にのみ実行されます
このパターンは特に以下の場合に役立ちます:
- 並列実行パスの結合
- ワークフローに同期ポイントを実装する
- 進行する前にすべての必要なデータが利用可能であることを確認する
複数の.after([])
呼び出しを組み合わせることで、複雑な依存関係パターンを作成することもできます:
myWorkflow
// 最初のブランチ
.step(stepA)
.then(stepB)
.then(stepC)
// 2番目のブランチ
.step(stepD)
.then(stepE)
// 3番目のブランチ
.step(stepF)
.then(stepG)
// このステップは複数のブランチの完了に依存しています
.after([stepC, stepE, stepG])
.step(finalStep)
循環依存とループ
ワークフローでは、特定の条件が満たされるまでステップを繰り返す必要があることがよくあります。Mastraはループを作成するための2つの強力な方法を提供しています:until
とwhile
です。これらのメソッドは、繰り返しタスクを実装するための直感的な方法を提供します。
手動循環依存の使用(レガシーアプローチ)
以前のバージョンでは、条件付きの循環依存関係を手動で定義してループを作成することができました:
myWorkflow
.step(fetchData)
.then(processData)
.after(processData)
.step(finalizeData, {
when: { "processData.status": "success" },
})
.step(fetchData, {
when: { "processData.status": "retry" },
});
このアプローチは引き続き機能しますが、新しいuntil
およびwhile
メソッドは、ループを作成するためのよりクリーンで保守しやすい方法を提供します。
条件ベースのループにuntil
を使用する
until
メソッドは、指定された条件が真になるまでステップを繰り返します。以下の引数を取ります:
- ループを停止するタイミングを決定する条件
- 繰り返すステップ
- 繰り返されるステップに渡すオプションの変数
// ターゲットに達するまでカウンターをインクリメントするステップ
const incrementStep = new Step({
id: 'increment',
inputSchema: z.object({
// 現在のカウンター値
counter: z.number().optional(),
}),
outputSchema: z.object({
// 更新されたカウンター値
updatedCounter: z.number(),
}),
execute: async ({ context }) => {
const { counter = 0 } = context.inputData;
return { updatedCounter: counter + 1 };
},
});
workflow
.step(incrementStep)
.until(
async ({ context }) => {
// カウンターが10に達したら停止
const result = context.getStepResult(incrementStep);
return (result?.updatedCounter ?? 0) >= 10;
},
incrementStep,
{
// 現在のカウンターを次の反復に渡す
counter: {
step: incrementStep,
path: 'updatedCounter'
}
}
)
.then(finalStep);
参照ベースの条件を使用することもできます:
workflow
.step(incrementStep)
.until(
{
ref: { step: incrementStep, path: 'updatedCounter' },
query: { $gte: 10 },
},
incrementStep,
{
counter: {
step: incrementStep,
path: 'updatedCounter'
}
}
)
.then(finalStep);
条件ベースのループにwhile
を使用する
while
メソッドは、指定された条件が真である限りステップを繰り返します。until
と同じ引数を取ります:
- ループを継続するタイミングを決定する条件
- 繰り返すステップ
- 繰り返されるステップに渡すオプションの変数
// ターゲット未満の間カウンターをインクリメントするステップ
const incrementStep = new Step({
id: 'increment',
inputSchema: z.object({
// 現在のカウンター値
counter: z.number().optional(),
}),
outputSchema: z.object({
// 更新されたカウンター値
updatedCounter: z.number(),
}),
execute: async ({ context }) => {
const { counter = 0 } = context.inputData;
return { updatedCounter: counter + 1 };
},
});
workflow
.step(incrementStep)
.while(
async ({ context }) => {
// カウンターが10未満の間継続
const result = context.getStepResult(incrementStep);
return (result?.updatedCounter ?? 0) < 10;
},
incrementStep,
{
// 現在のカウンターを次の反復に渡す
counter: {
step: incrementStep,
path: 'updatedCounter'
}
}
)
.then(finalStep);
参照ベースの条件を使用することもできます:
workflow
.step(incrementStep)
.while(
{
ref: { step: incrementStep, path: 'updatedCounter' },
query: { $lt: 10 },
},
incrementStep,
{
counter: {
step: incrementStep,
path: 'updatedCounter'
}
}
)
.then(finalStep);
参照条件の比較演算子
参照ベースの条件を使用する場合、以下の比較演算子を使用できます:
演算子 | 説明 |
---|---|
$eq | 等しい |
$ne | 等しくない |
$gt | より大きい |
$gte | 以上 |
$lt | より小さい |
$lte | 以下 |
条件
前のステップからのデータに基づいてステップを実行するかどうかを制御するには、when プロパティを使用します。以下は条件を指定する3つの方法です。
オプション1:関数
myWorkflow.step(
new Step({
id: "processData",
execute: async ({ context }) => {
// Action logic
},
}),
{
when: async ({ context }) => {
const fetchData = context?.getStepResult<{ status: string }>("fetchData");
return fetchData?.status === "success";
},
},
);
オプション2:クエリオブジェクト
myWorkflow.step(
new Step({
id: "processData",
execute: async ({ context }) => {
// Action logic
},
}),
{
when: {
ref: {
step: {
id: "fetchData",
},
path: "status",
},
query: { $eq: "success" },
},
},
);
オプション3:シンプルなパス比較
myWorkflow.step(
new Step({
id: "processData",
execute: async ({ context }) => {
// Action logic
},
}),
{
when: {
"fetchData.status": "success",
},
},
);
データアクセスパターン
Mastraはステップ間でデータを受け渡すためのいくつかの方法を提供しています:
- コンテキストオブジェクト - コンテキストオブジェクトを通じてステップの結果に直接アクセスする
- 変数マッピング - あるステップの出力を別のステップの入力に明示的にマッピングする
- getStepResultメソッド - ステップの出力を取得するための型安全なメソッド
各アプローチは、ユースケースや型安全性の要件に応じて、それぞれ利点があります。
getStepResultメソッドの使用
getStepResult
メソッドは、ステップの結果にアクセスするための型安全な方法を提供します。TypeScriptを使用する場合は、型情報を保持するためにこのアプローチが推奨されます。
基本的な使用法
より良い型安全性のために、getStepResult
に型パラメータを提供できます:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const fetchUserStep = new Step({
id: 'fetchUser',
outputSchema: z.object({
name: z.string(),
userId: z.string(),
}),
execute: async ({ context }) => {
return { name: 'John Doe', userId: '123' };
},
});
const analyzeDataStep = new Step({
id: "analyzeData",
execute: async ({ context }) => {
// Type-safe access to previous step result
const userData = context.getStepResult<{ name: string, userId: string }>("fetchUser");
if (!userData) {
return { status: "error", message: "User data not found" };
}
return {
analysis: `Analyzed data for user ${userData.name}`,
userId: userData.userId
};
},
});
ステップ参照の使用
最も型安全なアプローチは、getStepResult
呼び出しでステップを直接参照することです:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
// Define step with output schema
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const processUserStep = new Step({
id: "processUser",
execute: async ({ context }) => {
// TypeScript will infer the correct type from fetchUserStep's outputSchema
const userData = context.getStepResult(fetchUserStep);
return {
processed: true,
userName: userData?.name
};
},
});
const workflow = new Workflow({
name: "user-workflow",
});
workflow
.step(fetchUserStep)
.then(processUserStep)
.commit();
変数マッピングの使用
変数マッピングは、ステップ間のデータフローを定義する明示的な方法です。
このアプローチは依存関係を明確にし、優れた型安全性を提供します。
ステップに注入されたデータはcontext.inputData
オブジェクトで利用可能であり、ステップのinputSchema
に基づいて型付けされます。
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const sendEmailStep = new Step({
id: "sendEmail",
inputSchema: z.object({
recipientEmail: z.string(),
recipientName: z.string(),
}),
execute: async ({ context }) => {
const { recipientEmail, recipientName } = context.inputData;
// Send email logic here
return {
status: "sent",
to: recipientEmail
};
},
});
const workflow = new Workflow({
name: "email-workflow",
});
workflow
.step(fetchUserStep)
.then(sendEmailStep, {
variables: {
// Map specific fields from fetchUser to sendEmail inputs
recipientEmail: { step: fetchUserStep, path: 'email' },
recipientName: { step: fetchUserStep, path: 'name' }
}
})
.commit();
変数マッピングの詳細については、ワークフロー変数によるデータマッピングのドキュメントを参照してください。
コンテキストオブジェクトの使用
コンテキストオブジェクトは、すべてのステップの結果とその出力に直接アクセスできます。このアプローチはより柔軟ですが、型の安全性を維持するために慎重な取り扱いが必要です。
context.steps
オブジェクトを通じてステップの結果に直接アクセスできます:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const processOrderStep = new Step({
id: 'processOrder',
execute: async ({ context }) => {
// Access data from a previous step
let userData: { name: string, userId: string };
if (context.steps['fetchUser']?.status === 'success') {
userData = context.steps.fetchUser.output;
} else {
throw new Error('User data not found');
}
return {
orderId: 'order123',
userId: userData.userId,
status: 'processing',
};
},
});
const workflow = new Workflow({
name: "order-workflow",
});
workflow
.step(fetchUserStep)
.then(processOrderStep)
.commit();
ワークフローレベルの型安全性
ワークフロー全体で包括的な型安全性を確保するには、すべてのステップの型を定義し、それらをワークフローに渡すことができます。 これにより、条件のコンテキストオブジェクト、および最終的なワークフロー出力のステップ結果に対して型安全性を確保できます。
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
// Create steps with typed outputs
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const processOrderStep = new Step({
id: "processOrder",
execute: async ({ context }) => {
// TypeScript knows the shape of userData
const userData = context.getStepResult(fetchUserStep);
return {
orderId: "order123",
status: "processing"
};
},
});
const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
name: "typed-workflow",
});
workflow
.step(fetchUserStep)
.then(processOrderStep)
.until(async ({ context }) => {
// TypeScript knows the shape of userData here
const res = context.getStepResult('fetchUser');
return res?.userId === '123';
}, processOrderStep)
.commit();
トリガーデータへのアクセス
ステップ結果に加えて、ワークフローを開始した元のトリガーデータにアクセスできます:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
// Define trigger schema
const triggerSchema = z.object({
customerId: z.string(),
orderItems: z.array(z.string()),
});
type TriggerType = z.infer<typeof triggerSchema>;
const processOrderStep = new Step({
id: "processOrder",
execute: async ({ context }) => {
// Access trigger data with type safety
const triggerData = context.getStepResult<TriggerType>('trigger');
return {
customerId: triggerData?.customerId,
itemCount: triggerData?.orderItems.length || 0,
status: "processing"
};
},
});
const workflow = new Workflow({
name: "order-workflow",
triggerSchema,
});
workflow
.step(processOrderStep)
.commit();
再開データへのアクセス
ステップに注入されたデータはcontext.inputData
オブジェクトで利用でき、ステップのinputSchema
に基づいて型付けされています。
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const processOrderStep = new Step({
id: "processOrder",
inputSchema: z.object({
orderId: z.string(),
}),
execute: async ({ context, suspend }) => {
const { orderId } = context.inputData;
if (!orderId) {
await suspend();
return;
}
return {
orderId,
status: "processed"
};
},
});
const workflow = new Workflow({
name: "order-workflow",
});
workflow
.step(processOrderStep)
.commit();
const run = workflow.createRun();
const result = await run.start();
const resumedResult = await workflow.resume({
runId: result.runId,
stepId: 'processOrder',
inputData: {
orderId: '123',
},
});
console.log({resumedResult});
ワークフローの結果へのアクセス
ワークフローの結果に型付きでアクセスするには、ステップの型をWorkflow
型パラメータに注入します:
import { Workflow } from "@mastra/core/workflows";
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const processOrderStep = new Step({
id: "processOrder",
outputSchema: z.object({
orderId: z.string(),
status: z.string(),
}),
execute: async ({ context }) => {
const userData = context.getStepResult(fetchUserStep);
return {
orderId: "order123",
status: "processing"
};
},
});
const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
name: "typed-workflow",
});
workflow
.step(fetchUserStep)
.then(processOrderStep)
.commit();
const run = workflow.createRun();
const result = await run.start();
// 結果はステップ結果の判別共用体です
// そのためステータスチェックによって絞り込む必要があります
if (result.results.processOrder.status === 'success') {
// TypeScriptは結果の形状を認識します
const orderId = result.results.processOrder.output.orderId;
console.log({orderId});
}
if (result.results.fetchUser.status === 'success') {
const userId = result.results.fetchUser.output.userId;
console.log({userId});
}
データフローのベストプラクティス
-
型安全性のためにステップ参照でgetStepResultを使用する
- TypeScriptが正しい型を推論できるようにします
- コンパイル時に型エラーを検出します
-
明示的な依存関係のために変数マッピングを使用する
- データフローを明確で保守しやすくします
- ステップの依存関係の良いドキュメントを提供します
-
ステップの出力スキーマを定義する
- 実行時にデータを検証します
execute
関数の戻り値の型を検証します- TypeScriptでの型推論を改善します
-
欠落データを適切に処理する
- プロパティにアクセスする前に常にステップ結果が存在するかチェックします
- オプションデータにフォールバック値を提供します
-
データ変換をシンプルに保つ
- 変数マッピングではなく専用のステップでデータを変換します
- ワークフローのテストとデバッグが容易になります
データフロー手法の比較
手法 | 型安全性 | 明示性 | ユースケース |
---|---|---|---|
getStepResult | 最高 | 高 | 厳格な型要件を持つ複雑なワークフロー |
変数マッピング | 高 | 高 | 依存関係を明確かつ明示的にする必要がある場合 |
context.steps | 中 | 低 | シンプルなワークフローでのステップデータへの迅速なアクセス |
ユースケースに適したデータフロー手法を選択することで、型安全で保守しやすいワークフローを作成できます。