Skip to Content
ドキュメントWorkflows Vnext分岐、マージ、条件 | ワークフロー (vNext) | Mastra ドキュメント

順次フロー

.then()を使用して、順番に実行するステップをチェーンします:

myWorkflow.then(step1).then(step2).then(step3).commit();

各ステップの出力は、スキーマが一致する場合、自動的に次のステップに渡されます。スキーマが一致しない場合は、map関数を使用して出力を期待されるスキーマに変換できます。 ステップのチェーンはタイプセーフであり、コンパイル時にチェックされます。

並列実行

.parallel()を使用してステップを並列に実行します:

myWorkflow.parallel([step1, step2]).then(step3).commit();

これにより配列内のすべてのステップが同時に実行され、すべての並列ステップが完了した後に次のステップに進みます。

ワークフロー全体を並列に実行することもできます:

myWorkflow .parallel([nestedWorkflow1, nestedWorkflow2]) .then(finalStep) .commit();

並列ステップは前のステップの結果を入力として受け取ります。それらの出力は、キーがステップIDで値がステップ出力であるオブジェクトとして次のステップの入力に渡されます。例えば、上記の例ではnestedWorkflow1nestedWorkflow2の2つのキーを持つオブジェクトが出力され、それぞれのワークフローの出力が値として含まれます。

条件分岐

.branch() を使って条件分岐を作成します:

myWorkflow .then(initialStep) .branch([ [async ({ inputData }) => inputData.value > 50, highValueStep], [ async ({ inputData }) => inputData.value > 10 && inputData.value <= 50, lowValueStep, ], [async ({ inputData }) => inputData.value <= 10, extremelyLowValueStep], ]) .then(finalStep) .commit();

分岐条件は順番に評価され、一致するすべての条件のステップが並列で実行されます。もし inputData.value5 の場合、lowValueStepextremelyLowValueStep の両方が実行されます。

各条件付きステップ(highValueSteplowValueStep など)は、前のステップ(この場合は initialStep)の出力を入力として受け取ります。一致した各条件付きステップの出力が収集されます。分岐の後の次のステップ(finalStep)は、分岐内で実行されたすべてのステップの出力を含むオブジェクトを受け取ります。このオブジェクトのキーはステップIDであり、値はそれぞれのステップの出力です({ lowValueStep: <output of lowValueStep>, extremelyLowValueStep: <output of extremelyLowValueStep> })。

ループ

vNextは2種類のループをサポートしています。ステップ(またはネストされたワークフローやその他のステップ互換の構造)をループする場合、ループのinputDataは最初は前のステップの出力ですが、その後のinputDataはループステップ自体の出力になります。したがってループでは、初期ループ状態は前のステップの出力と一致するか、map関数を使用して導出される必要があります。

Do-Whileループ: 条件が真である間、ステップを繰り返し実行します。

myWorkflow .dowhile(incrementStep, async ({ inputData }) => inputData.value < 10) .then(finalStep) .commit();

Do-Untilループ: 条件が真になるまで、ステップを繰り返し実行します。

myWorkflow .dountil(incrementStep, async ({ inputData }) => inputData.value >= 10) .then(finalStep) .commit();
const workflow = createWorkflow({ id: "increment-workflow", inputSchema: z.object({ value: z.number(), }), outputSchema: z.object({ value: z.number(), }), }) .dountil(incrementStep, async ({ inputData }) => inputData.value >= 10) .then(finalStep);

Foreach

Foreachは配列型の入力の各項目に対してステップを実行するステップです。

const mapStep = createStep({ id: "map", description: "Maps (+11) on the current value", inputSchema: z.object({ value: z.number(), }), outputSchema: z.object({ value: z.number(), }), execute: async ({ inputData }) => { return { value: inputData.value + 11 }; }, }); const finalStep = createStep({ id: "final", description: "Final step that prints the result", inputSchema: z.array(z.object({ value: z.number() })), outputSchema: z.object({ finalValue: z.number(), }), execute: async ({ inputData }) => { return { finalValue: inputData.reduce((acc, curr) => acc + curr.value, 0) }; }, }); const counterWorkflow = createWorkflow({ steps: [mapStep, finalStep], id: "counter-workflow", inputSchema: z.array(z.object({ value: z.number() })), outputSchema: z.object({ finalValue: z.number(), }), }); counterWorkflow.foreach(mapStep).then(finalStep).commit(); const run = counterWorkflow.createRun(); const result = await run.start({ inputData: [{ value: 1 }, { value: 22 }, { value: 333 }], }); if (result.status === "success") { console.log(result.result); // only exists if status is success } else if (result.status === "failed") { console.error(result.error); // only exists if status is failed, this is an instance of Error }

ループは入力配列の各項目に対して、一度に1つずつ順番にステップを実行します。オプションのconcurrencyを使用すると、並行実行の数に制限を設けながらステップを並列に実行することができます。

counterWorkflow.foreach(mapStep, { concurrency: 2 }).then(finalStep).commit();

ネストされたワークフロー

vNextではワークフローをネストして組み合わせることができます:

const nestedWorkflow = createWorkflow({ id: 'nested-workflow', inputSchema: z.object({...}), outputSchema: z.object({...}), }) .then(step1) .then(step2) .commit(); const mainWorkflow = createWorkflow({ id: 'main-workflow', inputSchema: z.object({...}), outputSchema: z.object({...}), }) .then(initialStep) .then(nestedWorkflow) .then(finalStep) .commit();

上記の例では、nestedWorkflowmainWorkflowのステップとして使用されています。ここで、nestedWorkflowinputSchemainitialStepoutputSchemaと一致し、nestedWorkflowoutputSchemafinalStepinputSchemaと一致します。

ネストされたワークフローは、単純な順次実行を超えた実行フローを構成するための主要な(そして唯一の)方法です。.branch().parallel()を使用して実行フローを構成する場合、1つ以上のステップを実行するにはネストされたワークフローが必要であり、その副産物として、これらのステップがどのように実行されるかの説明が必要です。

const planBothWorkflow = createWorkflow({ id: "plan-both-workflow", inputSchema: forecastSchema, outputSchema: z.object({ activities: z.string(), }), steps: [planActivities, planIndoorActivities, sythesizeStep], }) .parallel([planActivities, planIndoorActivities]) .then(sythesizeStep) .commit(); const weatherWorkflow = createWorkflow({ id: "weather-workflow-step3-concurrency", inputSchema: z.object({ city: z.string().describe("The city to get the weather for"), }), outputSchema: z.object({ activities: z.string(), }), steps: [fetchWeather, planBothWorkflow, planActivities], }) .then(fetchWeather) .branch([ [ async ({ inputData }) => { return inputData?.precipitationChance > 20; }, planBothWorkflow, ], [ async ({ inputData }) => { return inputData?.precipitationChance <= 20; }, planActivities, ], ]);

ネストされたワークフローは、最終結果(最後のステップの結果)のみをステップ出力として持ちます。