ワークフローにおける制御フロー:分岐、マージ、条件
複数のステップからなるプロセスを作成する際、ステップを並列で実行したり、順番に連鎖させたり、結果に応じて異なる経路をたどる必要がある場合があります。このページでは、分岐、マージ、条件を管理して、ロジック要件を満たすワークフローを構築する方法について説明します。コードスニペットでは、複雑な制御フローを構成するための主要なパターンを示しています。
並列実行
互いに依存しない複数のステップを同時に実行することができます。この方法を使うことで、各ステップが独立したタスクを実行する場合、ワークフローの処理速度を向上させることができます。以下のコードは、2つのステップを並列で追加する方法を示しています。
myWorkflow.step(fetchUserData).step(fetchOrderData);
詳細については、Parallel Steps の例をご覧ください。
順次実行
ある処理の出力を次の処理の入力として利用するために、手順を厳密な順序で実行する必要がある場合があります。依存する操作をつなげるには、.then() を使用します。以下のコードは、手順を順番に連結する方法を示しています。
myWorkflow.step(fetchOrderData).then(validateData).then(processOrder);
詳細については、順次ステップ の例をご覧ください。
分岐と合流パス
異なる結果に対して異なるパスが必要な場合、分岐が役立ちます。また、完了後にパスを後で合流させることもできます。以下のコードは、stepAの後に分岐し、後でstepFで収束する方法を示しています:
myWorkflow
.step(stepA)
.then(stepB)
.then(stepD)
.after(stepA)
.step(stepC)
.then(stepE)
.after([stepD, stepE])
.step(stepF);
この例では:
- stepAはstepBに進み、その後stepDに進みます。
- 別途、stepAはstepCもトリガーし、それがstepEにつながります。
- 別途、stepFはstepDとstepEの両方が完了したときにトリガーされます。
詳細については、分岐パスの例を参照してください。
複数のブランチのマージ
複数の他のステップが完了した後にのみ実行されるステップが必要な場合があります。Mastra では、複数の依存関係をステップに指定できる複合的な .after([])
構文が用意されています。
myWorkflow
.step(fetchUserData)
.then(validateUserData)
.step(fetchProductData)
.then(validateProductData)
// This step will only run after BOTH validateUserData AND validateProductData have completed
.after([validateUserData, validateProductData])
.step(processOrder);
この例では:
fetchUserData
とfetchProductData
は並列のブランチで実行されます- 各ブランチには独自のバリデーションステップがあります
processOrder
ステップは、両方のバリデーションステップが正常に完了した後にのみ実行されます
このパターンは特に以下の場合に便利です:
- 並列実行パスの結合
- ワークフロー内での同期ポイントの実装
- すべての必要なデータが揃ってから次に進むことの保証
複数の .after([])
呼び出しを組み合わせることで、より複雑な依存関係パターンを作成することもできます:
myWorkflow
// First branch
.step(stepA)
.then(stepB)
.then(stepC)
// Second branch
.step(stepD)
.then(stepE)
// Third branch
.step(stepF)
.then(stepG)
// This step depends on the completion of multiple branches
.after([stepC, stepE, stepG])
.step(finalStep);
循環依存関係とループ
ワークフローでは、特定の条件が満たされるまでステップを繰り返す必要がよくあります。Mastra では、ループを作成するための強力な2つの方法、until
と while
を提供しています。これらのメソッドは、繰り返しタスクを直感的に実装する方法を提供します。
手動による循環依存関係の利用(従来のアプローチ)
以前のバージョンでは、条件付きで循環依存関係を手動で定義することでループを作成できました。
myWorkflow
.step(fetchData)
.then(processData)
.after(processData)
.step(finalizeData, {
when: { "processData.status": "success" },
})
.step(fetchData, {
when: { "processData.status": "retry" },
});
この方法も引き続き利用できますが、より新しい until
や while
メソッドを使うことで、よりシンプルで保守しやすいループを作成できます。
until
を使った条件付きループ
until
メソッドは、指定した条件が真になるまでステップを繰り返します。引数は以下の通りです。
- ループを終了する条件
- 繰り返すステップ
- 繰り返しステップに渡すオプションの変数
// 目標値に達するまでカウンターをインクリメントするステップ
const incrementStep = new Step({
id: "increment",
inputSchema: z.object({
// 現在のカウンター値
counter: z.number().optional(),
}),
outputSchema: z.object({
// 更新されたカウンター値
updatedCounter: z.number(),
}),
execute: async ({ context }) => {
const { counter = 0 } = context.inputData;
return { updatedCounter: counter + 1 };
},
});
workflow
.step(incrementStep)
.until(
async ({ context }) => {
// カウンターが10に達したら停止
const result = context.getStepResult(incrementStep);
return (result?.updatedCounter ?? 0) >= 10;
},
incrementStep,
{
// 現在のカウンターを次のイテレーションに渡す
counter: {
step: incrementStep,
path: "updatedCounter",
},
},
)
.then(finalStep);
参照ベースの条件も利用できます。
workflow
.step(incrementStep)
.until(
{
ref: { step: incrementStep, path: "updatedCounter" },
query: { $gte: 10 },
},
incrementStep,
{
counter: {
step: incrementStep,
path: "updatedCounter",
},
},
)
.then(finalStep);
while
を使った条件付きループ
while
メソッドは、指定した条件が真である間ステップを繰り返します。引数は until
と同じです。
- ループを継続する条件
- 繰り返すステップ
- 繰り返しステップに渡すオプションの変数
// 目標値未満の間カウンターをインクリメントするステップ
const incrementStep = new Step({
id: "increment",
inputSchema: z.object({
// 現在のカウンター値
counter: z.number().optional(),
}),
outputSchema: z.object({
// 更新されたカウンター値
updatedCounter: z.number(),
}),
execute: async ({ context }) => {
const { counter = 0 } = context.inputData;
return { updatedCounter: counter + 1 };
},
});
workflow
.step(incrementStep)
.while(
async ({ context }) => {
// カウンターが10未満の間継続
const result = context.getStepResult(incrementStep);
return (result?.updatedCounter ?? 0) < 10;
},
incrementStep,
{
// 現在のカウンターを次のイテレーションに渡す
counter: {
step: incrementStep,
path: "updatedCounter",
},
},
)
.then(finalStep);
参照ベースの条件も利用できます。
workflow
.step(incrementStep)
.while(
{
ref: { step: incrementStep, path: "updatedCounter" },
query: { $lt: 10 },
},
incrementStep,
{
counter: {
step: incrementStep,
path: "updatedCounter",
},
},
)
.then(finalStep);
参照条件で使える比較演算子
参照ベースの条件を使う場合、以下の比較演算子が利用できます。
演算子 | 説明 |
---|---|
$eq | 等しい |
$ne | 等しくない |
$gt | より大きい |
$gte | 以上 |
$lt | より小さい |
$lte | 以下 |
条件
前のステップからのデータに基づいてステップを実行するかどうかを制御するには、when プロパティを使用します。以下は条件を指定する3つの方法です。
オプション1:関数
myWorkflow.step(
new Step({
id: "processData",
execute: async ({ context }) => {
// Action logic
},
}),
{
when: async ({ context }) => {
const fetchData = context?.getStepResult<{ status: string }>("fetchData");
return fetchData?.status === "success";
},
},
);
オプション2:クエリオブジェクト
myWorkflow.step(
new Step({
id: "processData",
execute: async ({ context }) => {
// Action logic
},
}),
{
when: {
ref: {
step: {
id: "fetchData",
},
path: "status",
},
query: { $eq: "success" },
},
},
);
オプション3:シンプルなパス比較
myWorkflow.step(
new Step({
id: "processData",
execute: async ({ context }) => {
// Action logic
},
}),
{
when: {
"fetchData.status": "success",
},
},
);
データアクセスパターン
Mastraはステップ間でデータを受け渡すためのいくつかの方法を提供しています:
- コンテキストオブジェクト - コンテキストオブジェクトを通じてステップの結果に直接アクセスする
- 変数マッピング - あるステップの出力を別のステップの入力に明示的にマッピングする
- getStepResultメソッド - ステップの出力を取得するための型安全なメソッド
各アプローチは、ユースケースや型安全性の要件に応じて、それぞれ利点があります。
getStepResultメソッドの使用
getStepResult
メソッドは、ステップの結果にアクセスするための型安全な方法を提供します。TypeScriptを使用する場合は、型情報を保持するためにこのアプローチが推奨されます。
基本的な使用法
より良い型安全性のために、getStepResult
に型パラメータを提供できます:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
name: z.string(),
userId: z.string(),
}),
execute: async ({ context }) => {
return { name: "John Doe", userId: "123" };
},
});
const analyzeDataStep = new Step({
id: "analyzeData",
execute: async ({ context }) => {
// Type-safe access to previous step result
const userData = context.getStepResult<{ name: string; userId: string }>(
"fetchUser",
);
if (!userData) {
return { status: "error", message: "User data not found" };
}
return {
analysis: `Analyzed data for user ${userData.name}`,
userId: userData.userId,
};
},
});
ステップ参照の使用
最も型安全なアプローチは、getStepResult
呼び出しでステップを直接参照することです:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
// Define step with output schema
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com",
};
},
});
const processUserStep = new Step({
id: "processUser",
execute: async ({ context }) => {
// TypeScript will infer the correct type from fetchUserStep's outputSchema
const userData = context.getStepResult(fetchUserStep);
return {
processed: true,
userName: userData?.name,
};
},
});
const workflow = new Workflow({
name: "user-workflow",
});
workflow.step(fetchUserStep).then(processUserStep).commit();
変数マッピングの使用
変数マッピングは、ステップ間のデータフローを定義する明示的な方法です。
このアプローチは依存関係を明確にし、優れた型安全性を提供します。
ステップに注入されたデータはcontext.inputData
オブジェクトで利用可能であり、ステップのinputSchema
に基づいて型付けされます。
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com",
};
},
});
const sendEmailStep = new Step({
id: "sendEmail",
inputSchema: z.object({
recipientEmail: z.string(),
recipientName: z.string(),
}),
execute: async ({ context }) => {
const { recipientEmail, recipientName } = context.inputData;
// Send email logic here
return {
status: "sent",
to: recipientEmail,
};
},
});
const workflow = new Workflow({
name: "email-workflow",
});
workflow
.step(fetchUserStep)
.then(sendEmailStep, {
variables: {
// Map specific fields from fetchUser to sendEmail inputs
recipientEmail: { step: fetchUserStep, path: "email" },
recipientName: { step: fetchUserStep, path: "name" },
},
})
.commit();
変数マッピングの詳細については、ワークフロー変数によるデータマッピングのドキュメントをご覧ください。
Contextオブジェクトの利用
contextオブジェクトは、すべてのステップ結果とその出力に直接アクセスする手段を提供します。この方法はより柔軟ですが、型安全性を維持するためには注意が必要です。
ステップの結果には context.steps
オブジェクトを通じて直接アクセスできます:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const processOrderStep = new Step({
id: "processOrder",
execute: async ({ context }) => {
// Access data from a previous step
let userData: { name: string; userId: string };
if (context.steps["fetchUser"]?.status === "success") {
userData = context.steps.fetchUser.output;
} else {
throw new Error("User data not found");
}
return {
orderId: "order123",
userId: userData.userId,
status: "processing",
};
},
});
const workflow = new Workflow({
name: "order-workflow",
});
workflow.step(fetchUserStep).then(processOrderStep).commit();
ワークフロー全体の型安全性
ワークフロー全体で包括的な型安全性を確保するために、すべてのステップの型を定義し、それらをWorkflowに渡すことができます。 これにより、条件や最終的なワークフロー出力でcontextオブジェクトやステップ結果に対して型安全性を得ることができます。
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
// Create steps with typed outputs
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com",
};
},
});
const processOrderStep = new Step({
id: "processOrder",
execute: async ({ context }) => {
// TypeScript knows the shape of userData
const userData = context.getStepResult(fetchUserStep);
return {
orderId: "order123",
status: "processing",
};
},
});
const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
name: "typed-workflow",
});
workflow
.step(fetchUserStep)
.then(processOrderStep)
.until(async ({ context }) => {
// TypeScript knows the shape of userData here
const res = context.getStepResult("fetchUser");
return res?.userId === "123";
}, processOrderStep)
.commit();
トリガーデータへのアクセス
ステップ結果に加えて、ワークフローを開始した元のトリガーデータにもアクセスできます:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
// Define trigger schema
const triggerSchema = z.object({
customerId: z.string(),
orderItems: z.array(z.string()),
});
type TriggerType = z.infer<typeof triggerSchema>;
const processOrderStep = new Step({
id: "processOrder",
execute: async ({ context }) => {
// Access trigger data with type safety
const triggerData = context.getStepResult<TriggerType>("trigger");
return {
customerId: triggerData?.customerId,
itemCount: triggerData?.orderItems.length || 0,
status: "processing",
};
},
});
const workflow = new Workflow({
name: "order-workflow",
triggerSchema,
});
workflow.step(processOrderStep).commit();
レジュームデータへのアクセス
ステップに注入されたデータは context.inputData
オブジェクトで利用でき、ステップの inputSchema
に基づいて型付けされます。
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const processOrderStep = new Step({
id: "processOrder",
inputSchema: z.object({
orderId: z.string(),
}),
execute: async ({ context, suspend }) => {
const { orderId } = context.inputData;
if (!orderId) {
await suspend();
return;
}
return {
orderId,
status: "processed",
};
},
});
const workflow = new Workflow({
name: "order-workflow",
});
workflow.step(processOrderStep).commit();
const run = workflow.createRun();
const result = await run.start();
const resumedResult = await workflow.resume({
runId: result.runId,
stepId: "processOrder",
inputData: {
orderId: "123",
},
});
console.log({ resumedResult });
ワークフロー結果へのアクセス
Workflow
型パラメータにステップ型を注入することで、ワークフローの結果に型安全にアクセスできます。
import { Workflow } from "@mastra/core/workflows";
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com",
};
},
});
const processOrderStep = new Step({
id: "processOrder",
outputSchema: z.object({
orderId: z.string(),
status: z.string(),
}),
execute: async ({ context }) => {
const userData = context.getStepResult(fetchUserStep);
return {
orderId: "order123",
status: "processing",
};
},
});
const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
name: "typed-workflow",
});
workflow.step(fetchUserStep).then(processOrderStep).commit();
const run = workflow.createRun();
const result = await run.start();
// The result is a discriminated union of the step results
// So it needs to be narrowed down via status checks
if (result.results.processOrder.status === "success") {
// TypeScript will know the shape of the results
const orderId = result.results.processOrder.output.orderId;
console.log({ orderId });
}
if (result.results.fetchUser.status === "success") {
const userId = result.results.fetchUser.output.userId;
console.log({ userId });
}
データフローのベストプラクティス
-
型安全性のために Step 参照とともに getStepResult を使用する
- TypeScript が正しい型を推論できるようにする
- コンパイル時に型エラーを検出できる
-
*明示的な依存関係のために変数マッピングを使用する
- データフローが明確かつ保守しやすくなる
- ステップ間の依存関係を良いドキュメントとして提供できる
-
各ステップに出力スキーマを定義する
- 実行時にデータを検証できる
execute
関数の戻り値の型を検証できる- TypeScript での型推論が向上する
-
データが存在しない場合も適切に処理する
- ステップの結果にアクセスする前に必ず存在を確認する
- オプションデータにはフォールバック値を用意する
-
データ変換はシンプルに保つ
- 変数マッピング内ではなく、専用のステップでデータを変換する
- ワークフローのテストやデバッグが容易になる
データフロー手法の比較
手法 | 型安全性 | 明示性 | ユースケース |
---|---|---|---|
getStepResult | 最高 | 高い | 厳格な型付けが必要な複雑なワークフロー |
変数マッピング | 高い | 高い | 依存関係を明確かつ明示的にしたい場合 |
context.steps | 中程度 | 低い | シンプルなワークフローでステップデータに素早くアクセスしたい場合 |
ユースケースに合ったデータフロー手法を選択することで、型安全かつ保守性の高いワークフローを構築できます。