Control Flow
ワークフローを構築する際、通常は操作をより小さなタスクに分解し、それらをリンクして再利用できるようにします。Stepsは、入力、出力、実行ロジックを定義することで、これらのタスクを管理するための構造化された方法を提供します。
- スキーマが一致する場合、各ステップの
outputSchema
は自動的に次のステップのinputSchema
に渡されます。 - スキーマが一致しない場合は、Input data mappingを使用して
outputSchema
を期待されるinputSchema
に変換します。
Sequential
.then()
を使用してステップを順次実行するように連鎖させます:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({...});
export const testWorkflow = createWorkflow({...})
.then(step1)
.then(step2)
.commit();
並列
.parallel()
を使ってステップを並列に実行します:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({...});
export const testWorkflow = createWorkflow({...})
.parallel([step1, step2])
.commit();
この方法では、配列内のすべてのステップが同時に実行され、すべての並列ステップが完了した後に次のステップへ進みます。
詳細は ステップによる並列実行 をご覧ください。
分岐
.branch()
を使って条件分岐を作成します:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const lessThanStep = createStep({...});
const greaterThanStep = createStep({...});
export const testWorkflow = createWorkflow({...})
.branch([
[async ({ inputData: { some_value } }) => some_value <= 9, lessThanStep],
[async ({ inputData: { some_value } }) => some_value >= 10, greaterThanStep]
])
.commit();
分岐条件は順番に評価されますが、条件に一致したステップは並列で実行されます。
詳細は Workflow with Conditional Branching をご覧ください。
ループ
ワークフローは2種類のループをサポートしています。ステップ、またはネストされたワークフローのようなステップ互換の構造をループする場合、初期のinputData
は前のステップの出力から取得されます。
互換性を確保するため、ループの初期入力は以下のいずれかである必要があります:
- 前のステップの出力の形状と一致する、または
map
関数を使用して明示的に変換される。
Dowhile
条件が真である間、ステップを繰り返し実行します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const counterStep = createStep({...});
export const testWorkflow = createWorkflow({...})
.dowhile(counterStep, async ({ inputData: { number } }) => number < 10)
.commit();
Dountil
条件が真になるまで、ステップを繰り返し実行します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const counterStep = createStep({...});
export const testWorkflow = createWorkflow({...})
.dountil(counterStep, async ({ inputData: { number } }) => number > 10)
.commit();
Foreach
inputSchema
の各アイテムに対して、同じステップを順次実行します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const mapStep = createStep({...});
export const testWorkflow = createWorkflow({...})
.foreach(mapStep)
.commit();
早期終了
ステップ内でbail()
を呼び出すことで、ワークフローの実行を正常に中断できます。これはbail()
関数に渡されたペイロードをワークフローの結果として返します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
id: 'step1',
execute: async ({ bail, inputData }) => {
return bail({ result: 'bailed' });
},
inputSchema: z.object({ value: z.string() }),
outputSchema: z.object({ result: z.string() }),
});
export const testWorkflow = createWorkflow({...})
.then(step1)
.commit();
失敗した中断は、ステップ内でエラーをスローすることで発生します。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
id: 'step1',
execute: async ({ bail, inputData }) => {
throw new Error('bailed');
},
inputSchema: z.object({ value: z.string() }),
outputSchema: z.object({ result: z.string() }),
});
export const testWorkflow = createWorkflow({...})
.then(step1)
.commit();
実行インスタンスの例
以下の例は、複数の入力でランを開始する方法を示しています。各入力はmapStep
を順次通過します。
import { mastra } from "./mastra";
const run = await mastra.getWorkflow("testWorkflow").createRunAsync();
const result = await run.start({
inputData: [{ number: 10 }, { number: 100 }, { number: 200 }]
});
ターミナルからこのランを実行するには:
npx tsx src/test-workflow.ts
並行性
オプションで、concurrency
を使用すると、同時実行数に制限を設けてステップを並列実行できます。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const mapStep = createStep({...})
export const testWorkflow = createWorkflow({...})
.foreach(mapStep, { concurrency: 2 })
.commit();
並列ワークフロー
ワークフロー自体も並列で実行することができます。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const workflow1 = createWorkflow({...});
const workflow2 = createWorkflow({...});
export const testWorkflow = createWorkflow({...})
.parallel([workflow1, workflow2])
.commit();
並列ステップは前のステップの結果を入力として受け取ります。それらの出力は、ステップのid
をキーとし、ステップのoutput
を値とするオブジェクトとして次のステップの入力に渡されます。
ネストされたワークフロー
以下の例では、nestedWorkflow
がtestWorkflow
内のステップとして使用されています。testWorkflow
はstep1
を使用し、一方でnestedWorkflow
はstep2
とstep3
を構成しています。
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
export const nestedWorkflow = createWorkflow({...})
export const testWorkflow = createWorkflow({...})
.then(nestedWorkflow)
.commit();
.branch()
や.parallel()
を使用してより複雑な制御フローを構築する場合、複数のステップを実行するには、それらのステップをネストされたワークフローでラップし、どのように実行すべきかを明確に定義する必要があります。
クローンされたワークフロー
以下の例では、clonedWorkflow
はworkflow1
のクローンであり、testWorkflow
内のステップとして使用されています。clonedWorkflow
はstep1
の後に順次実行されます。
import { createWorkflow, createStep, cloneWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const clonedWorkflow = cloneWorkflow(step1, { id: "cloned-workflow" });
export const testWorkflow = createWorkflow({...})
.then(step1)
.then(clonedWorkflow)
.commit();