Skip to Content
ドキュメントワークフロー分岐、マージ、条件分岐 | ワークフロー | Mastra ドキュメント

Control Flow

ワークフローを構築する際、通常は操作をより小さなタスクに分解し、それらをリンクして再利用できるようにします。Stepsは、入力、出力、実行ロジックを定義することで、これらのタスクを管理するための構造化された方法を提供します。

  • スキーマが一致する場合、各ステップのoutputSchemaは自動的に次のステップのinputSchemaに渡されます。
  • スキーマが一致しない場合は、Input data mappingを使用してoutputSchemaを期待されるinputSchemaに変換します。

Sequential

.then()を使用してステップを順次実行するように連鎖させます:

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const step1 = createStep({...}); const step2 = createStep({...}); export const testWorkflow = createWorkflow({...}) .then(step1) .then(step2) .commit();

並列

.parallel() を使ってステップを並列に実行します:

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const step1 = createStep({...}); const step2 = createStep({...}); export const testWorkflow = createWorkflow({...}) .parallel([step1, step2]) .commit();

この方法では、配列内のすべてのステップが同時に実行され、すべての並列ステップが完了した後に次のステップへ進みます。

詳細は ステップによる並列実行 をご覧ください。

分岐

.branch() を使って条件分岐を作成します:

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const lessThanStep = createStep({...}); const greaterThanStep = createStep({...}); export const testWorkflow = createWorkflow({...}) .branch([ [async ({ inputData: { some_value } }) => some_value <= 9, lessThanStep], [async ({ inputData: { some_value } }) => some_value >= 10, greaterThanStep] ]) .commit();

分岐条件は順番に評価されますが、条件に一致したステップは並列で実行されます。

詳細は Workflow with Conditional Branching をご覧ください。

ループ

ワークフローは2種類のループをサポートしています。ステップ、またはネストされたワークフローのようなステップ互換の構造をループする場合、初期のinputDataは前のステップの出力から取得されます。

互換性を確保するため、ループの初期入力は以下のいずれかである必要があります:

  • 前のステップの出力の形状と一致する、または
  • map関数を使用して明示的に変換される。

Dowhile

条件が真である間、ステップを繰り返し実行します。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const counterStep = createStep({...}); export const testWorkflow = createWorkflow({...}) .dowhile(counterStep, async ({ inputData: { number } }) => number < 10) .commit();

Dountil

条件が真になるまで、ステップを繰り返し実行します。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const counterStep = createStep({...}); export const testWorkflow = createWorkflow({...}) .dountil(counterStep, async ({ inputData: { number } }) => number > 10) .commit();

Foreach

inputSchemaの各アイテムに対して、同じステップを順次実行します。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const mapStep = createStep({...}); export const testWorkflow = createWorkflow({...}) .foreach(mapStep) .commit();

早期終了

ステップ内でbail()を呼び出すことで、ワークフローの実行を正常に中断できます。これはbail()関数に渡されたペイロードをワークフローの結果として返します。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const step1 = createStep({ id: 'step1', execute: async ({ bail, inputData }) => { return bail({ result: 'bailed' }); }, inputSchema: z.object({ value: z.string() }), outputSchema: z.object({ result: z.string() }), }); export const testWorkflow = createWorkflow({...}) .then(step1) .commit();

失敗した中断は、ステップ内でエラーをスローすることで発生します。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const step1 = createStep({ id: 'step1', execute: async ({ bail, inputData }) => { throw new Error('bailed'); }, inputSchema: z.object({ value: z.string() }), outputSchema: z.object({ result: z.string() }), }); export const testWorkflow = createWorkflow({...}) .then(step1) .commit();

実行インスタンスの例

以下の例は、複数の入力でランを開始する方法を示しています。各入力はmapStepを順次通過します。

src/test-workflow.ts
import { mastra } from "./mastra"; const run = await mastra.getWorkflow("testWorkflow").createRunAsync(); const result = await run.start({ inputData: [{ number: 10 }, { number: 100 }, { number: 200 }] });

ターミナルからこのランを実行するには:

npx tsx src/test-workflow.ts

並行性

オプションで、concurrencyを使用すると、同時実行数に制限を設けてステップを並列実行できます。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const mapStep = createStep({...}) export const testWorkflow = createWorkflow({...}) .foreach(mapStep, { concurrency: 2 }) .commit();

並列ワークフロー

ワークフロー自体も並列で実行することができます。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const workflow1 = createWorkflow({...}); const workflow2 = createWorkflow({...}); export const testWorkflow = createWorkflow({...}) .parallel([workflow1, workflow2]) .commit();

並列ステップは前のステップの結果を入力として受け取ります。それらの出力は、ステップのidをキーとし、ステップのoutputを値とするオブジェクトとして次のステップの入力に渡されます。

ネストされたワークフロー

以下の例では、nestedWorkflowtestWorkflow内のステップとして使用されています。testWorkflowstep1を使用し、一方でnestedWorkflowstep2step3を構成しています。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; export const nestedWorkflow = createWorkflow({...}) export const testWorkflow = createWorkflow({...}) .then(nestedWorkflow) .commit();

.branch().parallel()を使用してより複雑な制御フローを構築する場合、複数のステップを実行するには、それらのステップをネストされたワークフローでラップし、どのように実行すべきかを明確に定義する必要があります。

クローンされたワークフロー

以下の例では、clonedWorkflowworkflow1のクローンであり、testWorkflow内のステップとして使用されています。clonedWorkflowstep1の後に順次実行されます。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep, cloneWorkflow } from "@mastra/core/workflows"; import { z } from "zod"; const step1 = createStep({...}); const clonedWorkflow = cloneWorkflow(step1, { id: "cloned-workflow" }); export const testWorkflow = createWorkflow({...}) .then(step1) .then(clonedWorkflow) .commit();