Skip to Content
ドキュメントワークフロー (vNext)分岐、マージ、条件 | ワークフロー (vNext) | Mastra ドキュメント

順次フロー

.then()を使用して、順番に実行するステップをチェーンします:

// Chain steps to execute in sequence myWorkflow .then(step1) // First step .then(step2) // Second step, receives output from step1 .then(step3) // Third step, receives output from step2 .commit();

各ステップの出力は、スキーマが一致する場合、自動的に次のステップに渡されます。スキーマが一致しない場合は、map関数を使用して出力を期待されるスキーマに変換できます。 ステップのチェーンはタイプセーフであり、コンパイル時にチェックされます。

並列実行

.parallel()を使用してステップを並列に実行します:

// step1とstep2を並列に実行 myWorkflow .parallel([step1, step2]) // 両方の並列ステップが完了した後にstep3を続行 .then(step3) .commit();

これにより配列内のすべてのステップが同時に実行され、すべての並列ステップが完了した後に次のステップに進みます。

ワークフロー全体を並列に実行することもできます:

// nestedWorkflow1とnestedWorkflow2を並列に実行 myWorkflow .parallel([nestedWorkflow1, nestedWorkflow2]) .then(finalStep) .commit();

並列ステップは前のステップの結果を入力として受け取ります。それらの出力は、キーがステップIDで値がステップ出力であるオブジェクトとして次のステップの入力に渡されます。例えば、上記の例ではnestedWorkflow1nestedWorkflow2の2つのキーを持つオブジェクトが出力され、それぞれのワークフローの出力が値として含まれます。

条件分岐

.branch()を使用して条件分岐を作成します:

myWorkflow .then(initialStep) .branch([ // If value > 50, run highValueStep [async ({ inputData }) => inputData.value > 50, highValueStep], // If value is between 11 and 50, run lowValueStep [ async ({ inputData }) => inputData.value > 10 && inputData.value <= 50, lowValueStep, ], // If value <= 10, run extremelyLowValueStep [async ({ inputData }) => inputData.value <= 10, extremelyLowValueStep], ]) // After branching, continue with finalStep .then(finalStep) .commit();

分岐条件は順番に評価され、一致する条件を持つすべてのステップは並行して実行されます。inputData.value5の場合、lowValueStepextremelyLowValueStepの両方が実行されます。

各条件付きステップ(highValueSteplowValueStepなど)は、前のステップ(この場合はinitialStep)の出力を入力として受け取ります。一致する条件付きステップの出力はすべて収集されます。分岐後の次のステップ(finalStep)は、分岐で実行されたすべてのステップの出力を含むオブジェクトを受け取ります。このオブジェクトのキーはステップIDであり、値はそれらのステップの出力です({ lowValueStep: <lowValueStepの出力>, extremelyLowValueStep: <extremelyLowValueStepの出力> })。

ループ

vNextは2種類のループをサポートしています。ステップ(またはネストされたワークフローやその他のステップ互換の構造)をループする場合、ループのinputDataは最初は前のステップの出力ですが、その後のinputDataはループステップ自体の出力になります。したがってループでは、初期ループ状態は前のステップの出力と一致するか、map関数を使用して導出される必要があります。

Do-Whileループ: 条件が真である間、ステップを繰り返し実行します。

// Repeat incrementStep while value is less than 10 myWorkflow .dowhile(incrementStep, async ({ inputData }) => inputData.value < 10) .then(finalStep) .commit();

Do-Untilループ: 条件が真になるまで、ステップを繰り返し実行します。

// Repeat incrementStep while value is more than 10 myWorkflow .dountil(incrementStep, async ({ inputData }) => inputData.value >= 10) .then(finalStep) .commit();

Foreach

Foreachは配列型の入力の各項目に対してステップを実行するステップです。

// Step that adds 11 to the current value const mapStep = createStep({ id: "map", description: "Maps (+11) on the current value", inputSchema: z.object({ value: z.number(), }), outputSchema: z.object({ value: z.number(), }), execute: async ({ inputData }) => { return { value: inputData.value + 11 }; }, }); // final step that prints the result const finalStep = createStep({ id: "final", description: "Final step that prints the result", inputSchema: z.array(z.object({ value: z.number() })), outputSchema: z.object({ finalValue: z.number(), }), execute: async ({ inputData }) => { return { finalValue: inputData.reduce((acc, curr) => acc + curr.value, 0) }; }, }); const counterWorkflow = createWorkflow({ steps: [mapStep, finalStep], id: "counter-workflow", inputSchema: z.array(z.object({ value: z.number() })), outputSchema: z.object({ finalValue: z.number(), }), }); // Apply mapStep to each item in the input array, then run finalStep counterWorkflow.foreach(mapStep).then(finalStep).commit(); const run = counterWorkflow.createRun(); const result = await run.start({ inputData: [{ value: 1 }, { value: 22 }, { value: 333 }], }); if (result.status === "success") { console.log(result.result); // only exists if status is success } else if (result.status === "failed") { console.error(result.error); // only exists if status is failed, this is an instance of Error }

ループは入力配列の各項目に対して、一度に1つずつ順番にステップを実行します。オプションのconcurrencyを使用すると、同時実行数の制限付きで並列にステップを実行することができます。

counterWorkflow.foreach(mapStep, { concurrency: 2 }).then(finalStep).commit();

ネストされたワークフロー

vNextではワークフローをネストして組み合わせることができます:

const nestedWorkflow = createWorkflow({ id: 'nested-workflow', inputSchema: z.object({...}), outputSchema: z.object({...}), }) .then(step1) .then(step2) .commit(); const mainWorkflow = createWorkflow({ id: 'main-workflow', inputSchema: z.object({...}), outputSchema: z.object({...}), }) .then(initialStep) // 最初にinitialStepを実行 .then(nestedWorkflow) // 次にネストされたワークフローをステップとして実行 .then(finalStep) // 最後に、finalStepを実行 .commit();

上記の例では、nestedWorkflowmainWorkflowのステップとして使用されています。ここで、nestedWorkflowinputSchemainitialStepoutputSchemaと一致し、nestedWorkflowoutputSchemafinalStepinputSchemaと一致します。

ネストされたワークフローは、単純な順次実行を超えた実行フローを構成するための主要な(そして唯一の)方法です。.branch().parallel()を使用して実行フローを構成する場合、1つ以上のステップを実行するにはネストされたワークフローが必要であり、その副産物として、これらのステップがどのように実行されるかの説明が必要です。

// 屋内と屋外のアクティビティを並行して計画し、その結果を統合するワークフローを定義 const planBothWorkflow = createWorkflow({ id: "plan-both-workflow", inputSchema: forecastSchema, outputSchema: z.object({ activities: z.string(), }), steps: [planActivities, planIndoorActivities, sythesizeStep], }) // planActivitiesとplanIndoorActivitiesを並行して実行 .parallel([planActivities, planIndoorActivities]) .then(sythesizeStep) .commit(); // メインの天気ワークフローを定義 const weatherWorkflow = createWorkflow({ id: "weather-workflow-step3-concurrency", inputSchema: z.object({ city: z.string().describe("天気を取得する都市"), }), outputSchema: z.object({ activities: z.string(), }), steps: [fetchWeather, planBothWorkflow, planActivities], }) // まず、指定された都市の天気を取得 .then(fetchWeather) .branch([ // 降水確率が20%を超える場合、屋内と屋外の両方のアクティビティを計画(ネストされたワークフロー) [ async ({ inputData }) => { return inputData?.precipitationChance > 20; }, planBothWorkflow, ], // 降水確率が20%以下の場合、屋外アクティビティのみを計画 [ async ({ inputData }) => { return inputData?.precipitationChance <= 20; }, planActivities, ], ]);

ネストされたワークフローは、最終結果(最後のステップの結果)のみをステップ出力として持ちます。