Skip to Content
ドキュメントワークフロー分岐、マージ、条件 | ワークフロー | Mastra ドキュメント

順次フロー

.then()を使用して、順番に実行するステップをチェーンします:

// Chain steps to execute in sequence myWorkflow .then(step1) // First step .then(step2) // Second step, receives output from step1 .then(step3) // Third step, receives output from step2 .commit();

各ステップの出力は、スキーマが一致する場合、自動的に次のステップに渡されます。スキーマが一致しない場合は、map関数を使用して出力を期待されるスキーマに変換できます。 ステップのチェーンは型安全であり、コンパイル時にチェックされます。

並列実行

.parallel()を使用してステップを並列に実行します:

// step1とstep2を並列に実行 myWorkflow .parallel([step1, step2]) // 両方の並列ステップが完了した後にstep3を続行 .then(step3) .commit();

これにより配列内のすべてのステップが同時に実行され、すべての並列ステップが完了した後に次のステップに進みます。

ワークフロー全体を並列に実行することもできます:

// nestedWorkflow1とnestedWorkflow2を並列に実行 myWorkflow .parallel([nestedWorkflow1, nestedWorkflow2]) .then(finalStep) .commit();

並列ステップは前のステップの結果を入力として受け取ります。それらの出力は、キーがステップIDで値がステップ出力であるオブジェクトとして次のステップの入力に渡されます。例えば、上記の例ではnestedWorkflow1nestedWorkflow2の2つのキーを持つオブジェクトが出力され、それぞれのワークフローの出力が値として含まれます。

条件分岐

.branch()を使用して条件分岐を作成します:

myWorkflow .then(initialStep) .branch([ // If value > 50, run highValueStep [async ({ inputData }) => inputData.value > 50, highValueStep], // If value is between 4 and 50, run lowValueStep [ async ({ inputData }) => inputData.value > 4 && inputData.value <= 50, lowValueStep, ], // If value <= 10, run extremelyLowValueStep [async ({ inputData }) => inputData.value <= 10, extremelyLowValueStep], ]) // After branching, continue with finalStep .then(finalStep) .commit();

分岐条件は順次評価され、条件に一致するすべてのステップが並列で実行されます。inputData.value5の場合、lowValueStepextremelyLowValueStepの両方が実行されます。

各条件ステップ(highValueSteplowValueStepなど)は、前のステップ(この場合はinitialStep)の出力を入力として受け取ります。一致した各条件ステップの出力が収集されます。分岐後の次のステップ(finalStep)は、分岐で実行されたすべてのステップの出力を含むオブジェクトを受け取ります。このオブジェクトのキーはステップIDで、値はそれらのステップの出力です({ lowValueStep: <lowValueStepの出力>, extremelyLowValueStep: <extremelyLowValueStepの出力> })。

ループ

Workflowsは2種類のループをサポートしています。ステップ(またはネストされたワークフローやその他のステップ互換の構造)をループする場合、ループのinputDataは最初は前のステップの出力ですが、その後のinputDataはループステップ自体の出力になります。したがってループでは、初期ループ状態は前のステップの出力と一致するか、map関数を使用して導出される必要があります。

Do-Whileループ: 条件が真である間、ステップを繰り返し実行します。

// Repeat incrementStep while value is less than 10 myWorkflow .dowhile(incrementStep, async ({ inputData }) => inputData.value < 10) .then(finalStep) .commit();

Do-Untilループ: 条件が真になるまで、ステップを繰り返し実行します。

// Repeat incrementStep while value is more than 10 myWorkflow .dountil(incrementStep, async ({ inputData }) => inputData.value >= 10) .then(finalStep) .commit();

Foreach

Foreachは配列型の入力の各項目に対してステップを実行するステップです。

// Step that adds 11 to the current value const mapStep = createStep({ id: "map", description: "Maps (+11) on the current value", inputSchema: z.object({ value: z.number(), }), outputSchema: z.object({ value: z.number(), }), execute: async ({ inputData }) => { return { value: inputData.value + 11 }; }, }); // final step that prints the result const finalStep = createStep({ id: "final", description: "Final step that prints the result", inputSchema: z.array(z.object({ value: z.number() })), outputSchema: z.object({ finalValue: z.number(), }), execute: async ({ inputData }) => { return { finalValue: inputData.reduce((acc, curr) => acc + curr.value, 0) }; }, }); const counterWorkflow = createWorkflow({ steps: [mapStep, finalStep], id: "counter-workflow", inputSchema: z.array(z.object({ value: z.number() })), outputSchema: z.object({ finalValue: z.number(), }), }); // Apply mapStep to each item in the input array, then run finalStep counterWorkflow.foreach(mapStep).then(finalStep).commit(); const run = counterWorkflow.createRun(); const result = await run.start({ inputData: [{ value: 1 }, { value: 22 }, { value: 333 }], }); if (result.status === "success") { console.log(result.result); // only exists if status is success } else if (result.status === "failed") { console.error(result.error); // only exists if status is failed, this is an instance of Error }

ループは入力配列の各項目に対して、一度に1つずつ順番にステップを実行します。オプションのconcurrencyを使用すると、同時実行数の制限付きで並列にステップを実行することができます。

counterWorkflow.foreach(mapStep, { concurrency: 2 }).then(finalStep).commit();

ネストされたワークフロー

ワークフローをネストすることで、ワークフローを組み合わせることができます:

const nestedWorkflow = createWorkflow({ id: 'nested-workflow', inputSchema: z.object({...}), outputSchema: z.object({...}), }) .then(step1) .then(step2) .commit(); const mainWorkflow = createWorkflow({ id: 'main-workflow', inputSchema: z.object({...}), outputSchema: z.object({...}), }) .then(initialStep) // Run initialStep first .then(nestedWorkflow) // Then run the nested workflow as a step .then(finalStep) // Finally, run finalStep .commit();

上記の例では、nestedWorkflowmainWorkflowのステップとして使用されており、nestedWorkflowinputSchemainitialStepoutputSchemaと一致し、nestedWorkflowoutputSchemafinalStepinputSchemaと一致します。

ネストされたワークフローは、単純な順次実行を超えて実行フローを構成する主要な(そして唯一の)方法です。.branch().parallel()を使用して実行フローを構成する場合、単一のステップ以上を実行するには必然的にネストされたワークフローが必要となり、副産物として、これらのステップがどのように実行されるかの記述が必要になります。

// Define a workflow that plans both indoor and outdoor activities in parallel, then synthesizes the results const planBothWorkflow = createWorkflow({ id: "plan-both-workflow", inputSchema: forecastSchema, outputSchema: z.object({ activities: z.string(), }), steps: [planActivities, planIndoorActivities, sythesizeStep], }) // Run planActivities and planIndoorActivities in parallel .parallel([planActivities, planIndoorActivities]) .then(sythesizeStep) .commit(); // Define the main weather workflow const weatherWorkflow = createWorkflow({ id: "weather-workflow-step3-concurrency", inputSchema: z.object({ city: z.string().describe("The city to get the weather for"), }), outputSchema: z.object({ activities: z.string(), }), steps: [fetchWeather, planBothWorkflow, planActivities], }) // First, fetch the weather for the given city .then(fetchWeather) .branch([ // If precipitationChance > 20, plan both indoor and outdoor activities (nested workflow) [ async ({ inputData }) => { return inputData?.precipitationChance > 20; }, planBothWorkflow, ], // If precipitationChance <= 20, only plan outdoor activities [ async ({ inputData }) => { return inputData?.precipitationChance <= 20; }, planActivities, ], ]);

ネストされたワークフローは、最終結果(最後のステップの結果)のみをステップ出力として持ちます。