Skip to Content
ドキュメントワークフロー分岐、マージ、条件分岐 | ワークフロー | Mastra ドキュメント

Control Flow

ワークフローを構築する際、通常は操作をより小さなタスクに分解し、それらをリンクして再利用できるようにします。Stepsは、入力、出力、実行ロジックを定義することで、これらのタスクを管理するための構造化された方法を提供します。

  • スキーマが一致する場合、各ステップのoutputSchemaは自動的に次のステップのinputSchemaに渡されます。
  • スキーマが一致しない場合は、Input data mappingを使用してoutputSchemaを期待されるinputSchemaに変換します。

.then()を使ったステップのチェーン

.then()を使用してステップを順次実行するようにチェーンします:

Chaining steps with .then()

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const step1 = createStep({...}); const step2 = createStep({...}); export const testWorkflow = createWorkflow({...}) .then(step1) .then(step2) .commit();

これは期待通りの動作をします:step1を実行し、次にstep2を実行します。

.parallel()を使用した同時実行ステップ

.parallel()を使用してステップを同時実行します:

.parallel()を使用した並行ステップ

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const step1 = createStep({...}); const step2 = createStep({...}); const step3 = createStep({...}); export const testWorkflow = createWorkflow({...}) .parallel([step1, step2]) .then(step3) .commit();

これはstep1step2を並行して実行し、両方が完了した後にstep3に続きます。

詳細については、ステップでの並列実行を参照してください。

.branch()を使った条件分岐

.branch()を使用してステップを条件付きで実行します:

.branch()を使った条件分岐

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const lessThanStep = createStep({...}); const greaterThanStep = createStep({...}); export const testWorkflow = createWorkflow({...}) .branch([ [async ({ inputData: { value } }) => (value < 9), lessThanStep], [async ({ inputData: { value } }) => (value >= 9), greaterThanStep] ]) .commit();

分岐条件は順次評価されますが、条件に一致するステップは並列で実行されます。

詳細については、条件分岐を使ったワークフローを参照してください。

ステップのループ

ワークフローは2種類のループをサポートしています。ステップやネストされたワークフローなどのステップ互換構造をループする場合、初期のinputDataは前のステップの出力から取得されます。

互換性を確保するため、ループの初期入力は前のステップの出力の形状と一致するか、map関数を使用して明示的に変換される必要があります。

  • 前のステップの出力の形状と一致する、または
  • map関数を使用して明示的に変換される。

.dowhile()による繰り返し

条件が真である間、ステップを繰り返し実行します。

Repeating with .dowhile()

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const counterStep = createStep({...}); export const testWorkflow = createWorkflow({...}) .dowhile(counterStep, async ({ inputData: { number } }) => number < 10) .commit();

.dountil()による繰り返し

条件が真になるまでステップを繰り返し実行します。

Repeating with .dountil()

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const counterStep = createStep({...}); export const testWorkflow = createWorkflow({...}) .dountil(counterStep, async ({ inputData: { number } }) => number > 10) .commit();

.foreach()による繰り返し

inputSchemaの各アイテムに対して同じステップを順次実行します。

Repeating with .foreach()

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const mapStep = createStep({...}); export const testWorkflow = createWorkflow({...}) .foreach(mapStep) .commit();

並行性制限の設定

concurrencyを使用して、同時実行数に制限を設けてステップを並列実行します。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const mapStep = createStep({...}) export const testWorkflow = createWorkflow({...}) .foreach(mapStep, { concurrency: 2 }) .commit();

ネストされたワークフローの使用

ネストされたワークフローを.then()に渡すことで、ステップとして使用します。これにより、親ワークフローの一部として、その各ステップが順番に実行されます。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; export const nestedWorkflow = createWorkflow({...}) export const testWorkflow = createWorkflow({...}) .then(nestedWorkflow) .commit();

ワークフローのクローン

cloneWorkflowを使用して既存のワークフローを複製します。これにより、idなどのパラメータを上書きしながら、その構造を再利用できます。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep, cloneWorkflow } from "@mastra/core/workflows"; import { z } from "zod"; const step1 = createStep({...}); const parentWorkflow = createWorkflow({...}) const clonedWorkflow = cloneWorkflow(parentWorkflow, { id: "cloned-workflow" }); export const testWorkflow = createWorkflow({...}) .then(step1) .then(clonedWorkflow) .commit();

bail()を使用した早期終了

ステップでbail()を使用して、成功した結果で早期終了します。これは提供されたペイロードをステップの出力として返し、ワークフローの実行を終了します。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const step1 = createStep({ id: 'step1', execute: async ({ bail }) => { return bail({ result: 'bailed' }); }, inputSchema: z.object({ value: z.string() }), outputSchema: z.object({ result: z.string() }), }); export const testWorkflow = createWorkflow({...}) .then(step1) .commit();

Error()を使用した早期終了

ステップでthrow new Error()を使用してエラーで終了します。

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows"; import { z } from "zod"; const step1 = createStep({ id: 'step1', execute: async () => { throw new Error('bailed'); }, inputSchema: z.object({ value: z.string() }), outputSchema: z.object({ result: z.string() }), }); export const testWorkflow = createWorkflow({...}) .then(step1) .commit();

これによりステップからエラーがスローされ、ワークフローの実行が停止し、結果としてエラーが返されます。

実行インスタンスの例

以下の例では、複数の入力を使用してランを開始する方法を示しています。各入力はmapStepを順次通過します。

src/test-workflow.ts
import { mastra } from "./mastra"; const run = await mastra.getWorkflow("testWorkflow").createRunAsync(); const result = await run.start({ inputData: [{ number: 10 }, { number: 100 }, { number: 200 }] });

ターミナルからこのランを実行するには:

npx tsx src/test-workflow.ts