Skip to Content
ドキュメントワークフロー分岐、マージ、条件分岐 | ワークフロー | Mastra ドキュメント

ワークフローにおける制御フロー:分岐、マージ、条件

複数のステップからなるプロセスを作成する際、ステップを並列で実行したり、順番に連鎖させたり、結果に応じて異なる経路をたどる必要がある場合があります。このページでは、分岐、マージ、条件を管理して、ロジック要件を満たすワークフローを構築する方法について説明します。コードスニペットでは、複雑な制御フローを構成するための主要なパターンを示しています。

並列実行

互いに依存しない複数のステップを同時に実行することができます。この方法を使うことで、各ステップが独立したタスクを実行する場合、ワークフローの処理速度を向上させることができます。以下のコードは、2つのステップを並列で追加する方法を示しています。

myWorkflow.step(fetchUserData).step(fetchOrderData);

詳細については、Parallel Steps の例をご覧ください。

順次実行

ある処理の出力を次の処理の入力として利用するために、手順を厳密な順序で実行する必要がある場合があります。依存する操作をつなげるには、.then() を使用します。以下のコードは、手順を順番に連結する方法を示しています。

myWorkflow.step(fetchOrderData).then(validateData).then(processOrder);

詳細については、順次ステップ の例をご覧ください。

分岐と合流パス

異なる結果に対して異なるパスが必要な場合、分岐が役立ちます。また、完了後にパスを後で合流させることもできます。以下のコードは、stepAの後に分岐し、後でstepFで収束する方法を示しています:

myWorkflow .step(stepA) .then(stepB) .then(stepD) .after(stepA) .step(stepC) .then(stepE) .after([stepD, stepE]) .step(stepF);

この例では:

  • stepAはstepBに進み、その後stepDに進みます。
  • 別途、stepAはstepCもトリガーし、それがstepEにつながります。
  • 別途、stepFはstepDとstepEの両方が完了したときにトリガーされます。

詳細については、分岐パスの例を参照してください。

複数のブランチのマージ

複数の他のステップが完了した後にのみ実行されるステップが必要な場合があります。Mastra では、複数の依存関係をステップに指定できる複合的な .after([]) 構文が用意されています。

myWorkflow .step(fetchUserData) .then(validateUserData) .step(fetchProductData) .then(validateProductData) // This step will only run after BOTH validateUserData AND validateProductData have completed .after([validateUserData, validateProductData]) .step(processOrder);

この例では:

  • fetchUserDatafetchProductData は並列のブランチで実行されます
  • 各ブランチには独自のバリデーションステップがあります
  • processOrder ステップは、両方のバリデーションステップが正常に完了した後にのみ実行されます

このパターンは特に以下の場合に便利です:

  • 並列実行パスの結合
  • ワークフロー内での同期ポイントの実装
  • すべての必要なデータが揃ってから次に進むことの保証

複数の .after([]) 呼び出しを組み合わせることで、より複雑な依存関係パターンを作成することもできます:

myWorkflow // First branch .step(stepA) .then(stepB) .then(stepC) // Second branch .step(stepD) .then(stepE) // Third branch .step(stepF) .then(stepG) // This step depends on the completion of multiple branches .after([stepC, stepE, stepG]) .step(finalStep);

循環依存関係とループ

ワークフローでは、特定の条件が満たされるまでステップを繰り返す必要がよくあります。Mastra では、ループを作成するための強力な2つの方法、untilwhile を提供しています。これらのメソッドは、繰り返しタスクを直感的に実装する方法を提供します。

手動による循環依存関係の利用(従来のアプローチ)

以前のバージョンでは、条件付きで循環依存関係を手動で定義することでループを作成できました。

myWorkflow .step(fetchData) .then(processData) .after(processData) .step(finalizeData, { when: { "processData.status": "success" }, }) .step(fetchData, { when: { "processData.status": "retry" }, });

この方法も引き続き利用できますが、より新しい untilwhile メソッドを使うことで、よりシンプルで保守しやすいループを作成できます。

until を使った条件付きループ

until メソッドは、指定した条件が真になるまでステップを繰り返します。引数は以下の通りです。

  1. ループを終了する条件
  2. 繰り返すステップ
  3. 繰り返しステップに渡すオプションの変数
// 目標値に達するまでカウンターをインクリメントするステップ const incrementStep = new Step({ id: "increment", inputSchema: z.object({ // 現在のカウンター値 counter: z.number().optional(), }), outputSchema: z.object({ // 更新されたカウンター値 updatedCounter: z.number(), }), execute: async ({ context }) => { const { counter = 0 } = context.inputData; return { updatedCounter: counter + 1 }; }, }); workflow .step(incrementStep) .until( async ({ context }) => { // カウンターが10に達したら停止 const result = context.getStepResult(incrementStep); return (result?.updatedCounter ?? 0) >= 10; }, incrementStep, { // 現在のカウンターを次のイテレーションに渡す counter: { step: incrementStep, path: "updatedCounter", }, }, ) .then(finalStep);

参照ベースの条件も利用できます。

workflow .step(incrementStep) .until( { ref: { step: incrementStep, path: "updatedCounter" }, query: { $gte: 10 }, }, incrementStep, { counter: { step: incrementStep, path: "updatedCounter", }, }, ) .then(finalStep);

while を使った条件付きループ

while メソッドは、指定した条件が真である間ステップを繰り返します。引数は until と同じです。

  1. ループを継続する条件
  2. 繰り返すステップ
  3. 繰り返しステップに渡すオプションの変数
// 目標値未満の間カウンターをインクリメントするステップ const incrementStep = new Step({ id: "increment", inputSchema: z.object({ // 現在のカウンター値 counter: z.number().optional(), }), outputSchema: z.object({ // 更新されたカウンター値 updatedCounter: z.number(), }), execute: async ({ context }) => { const { counter = 0 } = context.inputData; return { updatedCounter: counter + 1 }; }, }); workflow .step(incrementStep) .while( async ({ context }) => { // カウンターが10未満の間継続 const result = context.getStepResult(incrementStep); return (result?.updatedCounter ?? 0) < 10; }, incrementStep, { // 現在のカウンターを次のイテレーションに渡す counter: { step: incrementStep, path: "updatedCounter", }, }, ) .then(finalStep);

参照ベースの条件も利用できます。

workflow .step(incrementStep) .while( { ref: { step: incrementStep, path: "updatedCounter" }, query: { $lt: 10 }, }, incrementStep, { counter: { step: incrementStep, path: "updatedCounter", }, }, ) .then(finalStep);

参照条件で使える比較演算子

参照ベースの条件を使う場合、以下の比較演算子が利用できます。

演算子説明
$eq等しい
$ne等しくない
$gtより大きい
$gte以上
$ltより小さい
$lte以下

条件

前のステップからのデータに基づいてステップを実行するかどうかを制御するには、when プロパティを使用します。以下は条件を指定する3つの方法です。

オプション1:関数

myWorkflow.step( new Step({ id: "processData", execute: async ({ context }) => { // Action logic }, }), { when: async ({ context }) => { const fetchData = context?.getStepResult<{ status: string }>("fetchData"); return fetchData?.status === "success"; }, }, );

オプション2:クエリオブジェクト

myWorkflow.step( new Step({ id: "processData", execute: async ({ context }) => { // Action logic }, }), { when: { ref: { step: { id: "fetchData", }, path: "status", }, query: { $eq: "success" }, }, }, );

オプション3:シンプルなパス比較

myWorkflow.step( new Step({ id: "processData", execute: async ({ context }) => { // Action logic }, }), { when: { "fetchData.status": "success", }, }, );

データアクセスパターン

Mastraはステップ間でデータを受け渡すためのいくつかの方法を提供しています:

  1. コンテキストオブジェクト - コンテキストオブジェクトを通じてステップの結果に直接アクセスする
  2. 変数マッピング - あるステップの出力を別のステップの入力に明示的にマッピングする
  3. getStepResultメソッド - ステップの出力を取得するための型安全なメソッド

各アプローチは、ユースケースや型安全性の要件に応じて、それぞれ利点があります。

getStepResultメソッドの使用

getStepResultメソッドは、ステップの結果にアクセスするための型安全な方法を提供します。TypeScriptを使用する場合は、型情報を保持するためにこのアプローチが推奨されます。

基本的な使用法

より良い型安全性のために、getStepResultに型パラメータを提供できます:

src/mastra/workflows/get-step-result.ts
import { Step, Workflow } from "@mastra/core/workflows"; import { z } from "zod"; const fetchUserStep = new Step({ id: "fetchUser", outputSchema: z.object({ name: z.string(), userId: z.string(), }), execute: async ({ context }) => { return { name: "John Doe", userId: "123" }; }, }); const analyzeDataStep = new Step({ id: "analyzeData", execute: async ({ context }) => { // Type-safe access to previous step result const userData = context.getStepResult<{ name: string; userId: string }>( "fetchUser", ); if (!userData) { return { status: "error", message: "User data not found" }; } return { analysis: `Analyzed data for user ${userData.name}`, userId: userData.userId, }; }, });

ステップ参照の使用

最も型安全なアプローチは、getStepResult呼び出しでステップを直接参照することです:

src/mastra/workflows/step-reference.ts
import { Step, Workflow } from "@mastra/core/workflows"; import { z } from "zod"; // Define step with output schema const fetchUserStep = new Step({ id: "fetchUser", outputSchema: z.object({ userId: z.string(), name: z.string(), email: z.string(), }), execute: async () => { return { userId: "user123", name: "John Doe", email: "john@example.com", }; }, }); const processUserStep = new Step({ id: "processUser", execute: async ({ context }) => { // TypeScript will infer the correct type from fetchUserStep's outputSchema const userData = context.getStepResult(fetchUserStep); return { processed: true, userName: userData?.name, }; }, }); const workflow = new Workflow({ name: "user-workflow", }); workflow.step(fetchUserStep).then(processUserStep).commit();

変数マッピングの使用

変数マッピングは、ステップ間のデータフローを定義する明示的な方法です。 このアプローチは依存関係を明確にし、優れた型安全性を提供します。 ステップに注入されたデータはcontext.inputDataオブジェクトで利用可能であり、ステップのinputSchemaに基づいて型付けされます。

src/mastra/workflows/variable-mapping.ts
import { Step, Workflow } from "@mastra/core/workflows"; import { z } from "zod"; const fetchUserStep = new Step({ id: "fetchUser", outputSchema: z.object({ userId: z.string(), name: z.string(), email: z.string(), }), execute: async () => { return { userId: "user123", name: "John Doe", email: "john@example.com", }; }, }); const sendEmailStep = new Step({ id: "sendEmail", inputSchema: z.object({ recipientEmail: z.string(), recipientName: z.string(), }), execute: async ({ context }) => { const { recipientEmail, recipientName } = context.inputData; // Send email logic here return { status: "sent", to: recipientEmail, }; }, }); const workflow = new Workflow({ name: "email-workflow", }); workflow .step(fetchUserStep) .then(sendEmailStep, { variables: { // Map specific fields from fetchUser to sendEmail inputs recipientEmail: { step: fetchUserStep, path: "email" }, recipientName: { step: fetchUserStep, path: "name" }, }, }) .commit();

変数マッピングの詳細については、ワークフロー変数によるデータマッピングのドキュメントをご覧ください。

Contextオブジェクトの利用

contextオブジェクトは、すべてのステップ結果とその出力に直接アクセスする手段を提供します。この方法はより柔軟ですが、型安全性を維持するためには注意が必要です。 ステップの結果には context.steps オブジェクトを通じて直接アクセスできます:

src/mastra/workflows/context-access.ts
import { Step, Workflow } from "@mastra/core/workflows"; import { z } from "zod"; const processOrderStep = new Step({ id: "processOrder", execute: async ({ context }) => { // Access data from a previous step let userData: { name: string; userId: string }; if (context.steps["fetchUser"]?.status === "success") { userData = context.steps.fetchUser.output; } else { throw new Error("User data not found"); } return { orderId: "order123", userId: userData.userId, status: "processing", }; }, }); const workflow = new Workflow({ name: "order-workflow", }); workflow.step(fetchUserStep).then(processOrderStep).commit();

ワークフロー全体の型安全性

ワークフロー全体で包括的な型安全性を確保するために、すべてのステップの型を定義し、それらをWorkflowに渡すことができます。 これにより、条件や最終的なワークフロー出力でcontextオブジェクトやステップ結果に対して型安全性を得ることができます。

src/mastra/workflows/workflow-typing.ts
import { Step, Workflow } from "@mastra/core/workflows"; import { z } from "zod"; // Create steps with typed outputs const fetchUserStep = new Step({ id: "fetchUser", outputSchema: z.object({ userId: z.string(), name: z.string(), email: z.string(), }), execute: async () => { return { userId: "user123", name: "John Doe", email: "john@example.com", }; }, }); const processOrderStep = new Step({ id: "processOrder", execute: async ({ context }) => { // TypeScript knows the shape of userData const userData = context.getStepResult(fetchUserStep); return { orderId: "order123", status: "processing", }; }, }); const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({ name: "typed-workflow", }); workflow .step(fetchUserStep) .then(processOrderStep) .until(async ({ context }) => { // TypeScript knows the shape of userData here const res = context.getStepResult("fetchUser"); return res?.userId === "123"; }, processOrderStep) .commit();

トリガーデータへのアクセス

ステップ結果に加えて、ワークフローを開始した元のトリガーデータにもアクセスできます:

src/mastra/workflows/trigger-data.ts
import { Step, Workflow } from "@mastra/core/workflows"; import { z } from "zod"; // Define trigger schema const triggerSchema = z.object({ customerId: z.string(), orderItems: z.array(z.string()), }); type TriggerType = z.infer<typeof triggerSchema>; const processOrderStep = new Step({ id: "processOrder", execute: async ({ context }) => { // Access trigger data with type safety const triggerData = context.getStepResult<TriggerType>("trigger"); return { customerId: triggerData?.customerId, itemCount: triggerData?.orderItems.length || 0, status: "processing", }; }, }); const workflow = new Workflow({ name: "order-workflow", triggerSchema, }); workflow.step(processOrderStep).commit();

レジュームデータへのアクセス

ステップに注入されたデータは context.inputData オブジェクトで利用でき、ステップの inputSchema に基づいて型付けされます。

src/mastra/workflows/resume-data.ts
import { Step, Workflow } from "@mastra/core/workflows"; import { z } from "zod"; const processOrderStep = new Step({ id: "processOrder", inputSchema: z.object({ orderId: z.string(), }), execute: async ({ context, suspend }) => { const { orderId } = context.inputData; if (!orderId) { await suspend(); return; } return { orderId, status: "processed", }; }, }); const workflow = new Workflow({ name: "order-workflow", }); workflow.step(processOrderStep).commit(); const run = workflow.createRun(); const result = await run.start(); const resumedResult = await workflow.resume({ runId: result.runId, stepId: "processOrder", inputData: { orderId: "123", }, }); console.log({ resumedResult });

ワークフロー結果へのアクセス

Workflow 型パラメータにステップ型を注入することで、ワークフローの結果に型安全にアクセスできます。

src/mastra/workflows/get-results.ts
import { Workflow } from "@mastra/core/workflows"; const fetchUserStep = new Step({ id: "fetchUser", outputSchema: z.object({ userId: z.string(), name: z.string(), email: z.string(), }), execute: async () => { return { userId: "user123", name: "John Doe", email: "john@example.com", }; }, }); const processOrderStep = new Step({ id: "processOrder", outputSchema: z.object({ orderId: z.string(), status: z.string(), }), execute: async ({ context }) => { const userData = context.getStepResult(fetchUserStep); return { orderId: "order123", status: "processing", }; }, }); const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({ name: "typed-workflow", }); workflow.step(fetchUserStep).then(processOrderStep).commit(); const run = workflow.createRun(); const result = await run.start(); // The result is a discriminated union of the step results // So it needs to be narrowed down via status checks if (result.results.processOrder.status === "success") { // TypeScript will know the shape of the results const orderId = result.results.processOrder.output.orderId; console.log({ orderId }); } if (result.results.fetchUser.status === "success") { const userId = result.results.fetchUser.output.userId; console.log({ userId }); }

データフローのベストプラクティス

  1. 型安全性のために Step 参照とともに getStepResult を使用する

    • TypeScript が正しい型を推論できるようにする
    • コンパイル時に型エラーを検出できる
  2. *明示的な依存関係のために変数マッピングを使用する

    • データフローが明確かつ保守しやすくなる
    • ステップ間の依存関係を良いドキュメントとして提供できる
  3. 各ステップに出力スキーマを定義する

    • 実行時にデータを検証できる
    • execute 関数の戻り値の型を検証できる
    • TypeScript での型推論が向上する
  4. データが存在しない場合も適切に処理する

    • ステップの結果にアクセスする前に必ず存在を確認する
    • オプションデータにはフォールバック値を用意する
  5. データ変換はシンプルに保つ

    • 変数マッピング内ではなく、専用のステップでデータを変換する
    • ワークフローのテストやデバッグが容易になる

データフロー手法の比較

手法型安全性明示性ユースケース
getStepResult最高高い厳格な型付けが必要な複雑なワークフロー
変数マッピング高い高い依存関係を明確かつ明示的にしたい場合
context.steps中程度低いシンプルなワークフローでステップデータに素早くアクセスしたい場合

ユースケースに合ったデータフロー手法を選択することで、型安全かつ保守性の高いワークフローを構築できます。