制御フロー
ワークフローを構築する際は、通常、処理を再利用可能な小さなタスクに分割して連結します。ステップは、入力・出力・実行ロジックを定義することで、これらのタスクを体系的に管理するための仕組みを提供します。
- スキーマが一致する場合、各ステップの
outputSchema
は自動的に次のステップのinputSchema
に渡されます。 - スキーマが一致しない場合は、入力データマッピング を使用して、
outputSchema
を想定されるinputSchema
に変換します。
.then()
を使ったステップの連結
.then()
を使ってステップを順に実行するように連結します:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({...});
export const testWorkflow = createWorkflow({...})
.then(step1)
.then(step2)
.commit();
これは想定どおりに動作します。まず step1
を実行し、続いて step2
を実行します。
.parallel()
を使った並行実行ステップ
.parallel()
を使ってステップを並行実行します:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({...});
const step3 = createStep({...});
export const testWorkflow = createWorkflow({...})
.parallel([step1, step2])
.then(step3)
.commit();
これは step1
と step2
を並行して実行し、両方の完了後に step3
に進みます。
くわしくは Parallel Execution with Steps を参照してください。
📹 視聴: ステップを並列実行して Mastra ワークフローを最適化する方法 → YouTube(3分)
.branch()
を使った条件ロジック
.branch()
を使って条件付きでステップを実行します:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const lessThanStep = createStep({...});
const greaterThanStep = createStep({...});
export const testWorkflow = createWorkflow({...})
.branch([
[async ({ inputData: { value } }) => value <= 10, lessThanStep],
[async ({ inputData: { value } }) => value > 10, greaterThanStep]
])
.commit();
ブランチ条件は順に評価されますが、条件に一致したステップは並列に実行されます。
詳細は Workflow with Conditional Branching を参照してください。
ループ処理ステップ
Workflows は2種類のループをサポートします。ステップやネストされたワークフローなどのステップ互換の構成要素をループ実行する場合、初期の inputData
は直前のステップの出力が元になります。
互換性を保つため、ループの初期入力は直前のステップの出力と同じ形(shape)である必要があるか、map
関数で明示的に変換しておく必要があります。
- 直前のステップの出力と同じ形に合わせる、または
map
関数で明示的に変換する
.dowhile()
で繰り返す
条件が true のあいだ、ステップを繰り返し実行します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const counterStep = createStep({...});
export const testWorkflow = createWorkflow({...})
.dowhile(counterStep, async ({ inputData: { number } }) => number < 10)
.commit();
.dountil()
で繰り返す
条件が true になるまで、ステップを繰り返し実行します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const counterStep = createStep({...});
export const testWorkflow = createWorkflow({...})
.dountil(counterStep, async ({ inputData: { number } }) => number > 10)
.commit();
.foreach()
で繰り返す
inputSchema
の各アイテムに対して、同じステップを順次実行します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const mapStep = createStep({...});
export const testWorkflow = createWorkflow({...})
.foreach(mapStep)
.commit();
同時実行数の制限を設定する
concurrency
を使うと、同時実行の上限を設けつつステップを並列実行できます。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const mapStep = createStep({...})
export const testWorkflow = createWorkflow({...})
.foreach(mapStep, { concurrency: 2 })
.commit();
ネストされたワークフローの使用
.then()
に渡すことで、ネストされたワークフローをステップとして利用できます。これにより、親ワークフローの一部として、その各ステップが順番に実行されます。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
export const nestedWorkflow = createWorkflow({...})
export const testWorkflow = createWorkflow({...})
.then(nestedWorkflow)
.commit();
ワークフローのクローン作成
既存のワークフローを複製するには cloneWorkflow
を使用します。これにより、id
などのパラメーターを上書きしつつ、構造を再利用できます。
import { createWorkflow, createStep, cloneWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const parentWorkflow = createWorkflow({...})
const clonedWorkflow = cloneWorkflow(parentWorkflow, { id: "cloned-workflow" });
export const testWorkflow = createWorkflow({...})
.then(step1)
.then(clonedWorkflow)
.commit();
実行インスタンスの例
次の例は、複数の入力で実行を開始する方法を示します。各入力は順番に mapStep
を通過します。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: [{ number: 10 }, { number: 100 }, { number: 200 }]
});
この実行をターミナルから実行するには:
npx tsx src/test-workflow.ts