Sequential Flow

Chain steps to execute in sequence using .then():

    // Chain steps to execute in sequence
    myWorkflow
      .then(step1) // First step
      .then(step2) // Second step, receives output from step1
      .then(step3) // Third step, receives output from step2
      .commit();

The output from each step is automatically passed to the next step if schemas match. If the schemas don’t match, you can use the map function to transform the output to the expected schema. Step chaining is type-safe and checked at compile time.
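To make the schema-matching rule concrete, here is a minimal, self-contained sketch that does not use the workflow API itself. `Step`, `chain`, and the three sample steps are hypothetical names introduced only for illustration. The sketch shows that each step's output type has to equal the next step's input type, and that a small mapping function can bridge a mismatch.

```typescript
// Hypothetical stand-in for a workflow step: a typed function from input to output.
type Step<I, O> = (input: I) => O;

// Compose two steps. The compiler rejects the call unless the first step's
// output type matches the second step's input type.
function chain<A, B, C>(first: Step<A, B>, second: Step<B, C>): Step<A, C> {
  return (input) => second(first(input));
}

const parseStep: Step<{ raw: string }, { value: number }> = ({ raw }) => ({
  value: Number(raw),
});

// Expects { amount }, not { value }, so it cannot follow parseStep directly.
const doubleStep: Step<{ amount: number }, { amount: number }> = ({ amount }) => ({
  amount: amount * 2,
});

// A mapping function (assumed here to play the role the text assigns to map)
// reshapes one step's output into the next step's expected input.
const valueToAmount: Step<{ value: number }, { amount: number }> = ({ value }) => ({
  amount: value,
});

// chain(parseStep, doubleStep) would be a compile-time error here.
const pipeline = chain(chain(parseStep, valueToAmount), doubleStep);

const chained = pipeline({ raw: "21" });
console.log(chained); // { amount: 42 }
```

Removing `valueToAmount` from the pipeline makes the mismatch surface as a type error rather than a runtime failure, which is the behaviour the paragraph above describes for step chaining.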

Parallel Execution

Execute steps in parallel using .parallel():

    // Execute step1 and step2 in parallel
    myWorkflow
      .parallel([step1, step2])
      // Continue with step3 after both parallel steps complete
      .then(step3)
      .commit();

This executes all steps in the array concurrently, then continues to the next step after all parallel steps complete.

You can also execute entire workflows in parallel:

    // Execute nestedWorkflow1 and nestedWorkflow2 in parallel
    myWorkflow
      .parallel([nestedWorkflow1, nestedWorkflow2])
      .then(finalStep)
      .commit();

Parallel steps receive the previous step's result as input. Their outputs are passed to the next step as a single object in which each key is a step id and each value is that step's output. In the example above, the object has two keys, nestedWorkflow1 and nestedWorkflow2, holding the outputs of the respective workflows.
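The shape of the combined output can be sketched without the workflow API. In the following standalone example, `SketchStep`, `runParallel`, and the two sample steps are hypothetical. It assumes only what the text states: all steps get the same input, run concurrently, and their outputs are collected under their step ids.

```typescript
// Hypothetical stand-in for a step: an id plus an async function.
interface SketchStep<I, O> {
  id: string;
  execute: (input: I) => Promise<O>;
}

// Run every step concurrently on the same input, then key each output by step id.
async function runParallel<I>(
  steps: SketchStep<I, unknown>[],
  input: I,
): Promise<Record<string, unknown>> {
  const outputs = await Promise.all(steps.map((step) => step.execute(input)));
  const result: Record<string, unknown> = {};
  steps.forEach((step, index) => {
    result[step.id] = outputs[index];
  });
  return result;
}

const indoorPlanner: SketchStep<{ city: string }, string> = {
  id: "nestedWorkflow1",
  execute: async ({ city }) => `indoor plan for ${city}`,
};

const outdoorPlanner: SketchStep<{ city: string }, string> = {
  id: "nestedWorkflow2",
  execute: async ({ city }) => `outdoor plan for ${city}`,
};

const parallelResult = runParallel([indoorPlanner, outdoorPlanner], { city: "Oslo" });

// The next step would receive this object as its input.
parallelResult.then((out) => console.log(out));
```

A step that follows a parallel block therefore needs an input schema describing an object keyed by step id, not the output shape of any single parallel step.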

Conditional Branching

Create conditional branches using .branch():

    myWorkflow
      .then(initialStep)
      .branch([
        // If value > 50, run highValueStep
        [async ({ inputData }) => inputData.value > 50, highValueStep],
        // If value is between 11 and 50, run lowValueStep
        [
          async ({ inputData }) => inputData.value > 10 && inputData.value <= 50,
          lowValueStep,
        ],
        // If value <= 10, run extremelyLowValueStep
        [async ({ inputData }) => inputData.value <= 10, extremelyLowValueStep],
      ])
      // After branching, continue with finalStep
      .then(finalStep)
      .commit();

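Branch conditions are evaluated sequentially, and every step whose condition matches is executed in parallel. The conditions above are mutually exclusive, so an inputData.value of 5 runs only extremelyLowValueStep. Conditions may overlap, however: if the second condition were simply inputData.value <= 50, a value of 5 would match both lowValueStep and extremelyLowValueStep, and both would run.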

Each conditional step (like highValueStep or lowValueStep) receives as input the output of the previous step (initialStep in this case). The output of each matching conditional step is collected, and the next step after the branch (finalStep) receives an object containing the outputs of all the steps that ran in the branch. The keys of this object are the step IDs, and the values are the outputs of those steps. For example, if lowValueStep and extremelyLowValueStep both ran, finalStep receives { lowValueStep: <output of lowValueStep>, extremelyLowValueStep: <output of extremelyLowValueStep> }.
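The matching rule can be modelled in a few lines of plain TypeScript. This is a sketch under assumptions, not the library's implementation: `BranchStep`, `Branch`, `runBranch`, and `label` are hypothetical, and matching steps are run one after another for simplicity instead of in parallel. It contrasts the non-overlapping conditions from the example with a variant whose middle range is widened.

```typescript
// Hypothetical stand-ins: a branch is a [condition, step] pair.
interface BranchStep<I, O> {
  id: string;
  execute: (input: I) => O;
}
type Branch<I, O> = [(input: I) => boolean, BranchStep<I, O>];

// Check every condition, run each step whose condition matched,
// and collect the outputs keyed by step id.
function runBranch<I, O>(branches: Branch<I, O>[], input: I): Record<string, O> {
  const result: Record<string, O> = {};
  for (const [condition, step] of branches) {
    if (condition(input)) {
      result[step.id] = step.execute(input);
    }
  }
  return result;
}

type ValueInput = { value: number };

// Helper that builds a trivial step reporting which id handled the value.
const label = (id: string): BranchStep<ValueInput, string> => ({
  id,
  execute: ({ value }) => `${id} handled ${value}`,
});

// Same conditions as the example above: the three ranges do not overlap.
const exclusive: Branch<ValueInput, string>[] = [
  [({ value }) => value > 50, label("highValueStep")],
  [({ value }) => value > 10 && value <= 50, label("lowValueStep")],
  [({ value }) => value <= 10, label("extremelyLowValueStep")],
];

// Variation for illustration only: the middle range now overlaps the last one.
const overlapping: Branch<ValueInput, string>[] = [
  [({ value }) => value > 50, label("highValueStep")],
  [({ value }) => value <= 50, label("lowValueStep")],
  [({ value }) => value <= 10, label("extremelyLowValueStep")],
];

const exclusiveKeys = Object.keys(runBranch(exclusive, { value: 5 }));
const overlappingKeys = Object.keys(runBranch(overlapping, { value: 5 }));
console.log(exclusiveKeys); // [ 'extremelyLowValueStep' ]
console.log(overlappingKeys); // [ 'lowValueStep', 'extremelyLowValueStep' ]
```

Because the set of keys depends on which conditions matched, a step placed after a branch should treat every key in its input as potentially absent.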

Loops

vNext supports two types of loops. When looping a step (or a nested workflow or any other step-compatible construct), the loop's initial inputData is the output of the previous step; on every subsequent iteration, inputData is the output of the loop step itself. The loop step's input should therefore either match the previous step's output or be derived from it using the map function.

Do-While Loop: Executes a step repeatedly while a condition is true.

    // Repeat incrementStep while value is less than 10
    myWorkflow
      .dowhile(incrementStep, async ({ inputData }) => inputData.value < 10)
      .then(finalStep)
      .commit();

Do-Until Loop: Executes a step repeatedly until a condition becomes true.

    // Repeat incrementStep until value is at least 10
    myWorkflow
      .dountil(incrementStep, async ({ inputData }) => inputData.value >= 10)
      .then(finalStep)
      .commit();
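The relationship between the two loop forms, and the way loop state is threaded through iterations, can be sketched with plain functions. `doWhile`, `doUntil`, and `increment` below are hypothetical helpers, not the library's methods. The sketch assumes that the step always runs at least once and that the condition is checked against the step's latest output, as the "do" prefix suggests.

```typescript
// Hypothetical helpers modelling the two loop forms as plain functions.
type LoopStep<T> = (input: T) => T;
type Condition<T> = (output: T) => boolean;

// Run the step once, then repeat while the condition holds for its latest output.
function doWhile<T>(step: LoopStep<T>, condition: Condition<T>, initial: T): T {
  let state = step(initial); // first iteration consumes the previous step's output
  while (condition(state)) {
    state = step(state); // later iterations consume the loop step's own output
  }
  return state;
}

// Do-until is do-while with the condition negated.
function doUntil<T>(step: LoopStep<T>, condition: Condition<T>, initial: T): T {
  return doWhile(step, (output) => !condition(output), initial);
}

const increment: LoopStep<{ value: number }> = ({ value }) => ({ value: value + 1 });

const whileResult = doWhile(increment, ({ value }) => value < 10, { value: 7 });
const untilResult = doUntil(increment, ({ value }) => value >= 10, { value: 7 });
console.log(whileResult, untilResult); // { value: 10 } { value: 10 }

// Under the run-at-least-once assumption, the step runs even when the
// condition is already false for the starting value.
const ranOnce = doWhile(increment, ({ value }) => value < 10, { value: 20 });
console.log(ranOnce); // { value: 21 }
```

Since the step's output becomes its next input, its input and output types coincide in this model, which mirrors the requirement that the loop state match the previous step's output.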

Foreach

Foreach runs a given step once for each item of an array-typed input.

    // Step that adds 11 to the current value
    const mapStep = createStep({
      id: "map",
      description: "Maps (+11) on the current value",
      inputSchema: z.object({
        value: z.number(),
      }),
      outputSchema: z.object({
        value: z.number(),
      }),
      execute: async ({ inputData }) => {
        return { value: inputData.value + 11 };
      },
    });

    // final step that prints the result
    const finalStep = createStep({
      id: "final",
      description: "Final step that prints the result",
      inputSchema: z.array(z.object({ value: z.number() })),
      outputSchema: z.object({
        finalValue: z.number(),
      }),
      execute: async ({ inputData }) => {
        return { finalValue: inputData.reduce((acc, curr) => acc + curr.value, 0) };
      },
    });

    const counterWorkflow = createWorkflow({
      steps: [mapStep, finalStep],
      id: "counter-workflow",
      inputSchema: z.array(z.object({ value: z.number() })),
      outputSchema: z.object({
        finalValue: z.number(),
      }),
    });

    // Apply mapStep to each item in the input array, then run finalStep
    counterWorkflow.foreach(mapStep).then(finalStep).commit();

    const run = counterWorkflow.createRun();
    const result = await run.start({
      inputData: [{ value: 1 }, { value: 22 }, { value: 333 }],
    });

    if (result.status === "success") {
      console.log(result.result); // only exists if status is success
    } else if (result.status === "failed") {
      console.error(result.error); // only exists if status is failed, this is an instance of Error
    }

By default, the loop executes the step for each item in the input array sequentially, one at a time. The optional concurrency option runs iterations in parallel, capped at the given number of concurrent executions.

    counterWorkflow
      .foreach(mapStep, { concurrency: 2 })
      .then(finalStep)
      .commit();
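One common way to implement a concurrency cap is a small pool of workers that pull items from a shared index. The sketch below illustrates that general technique and is not taken from the library; `forEachWithLimit`, `addEleven`, and the in-flight counters are hypothetical names used to make the limit observable.

```typescript
// Hypothetical helper: apply an async step to every item with at most
// `concurrency` calls in flight at any time.
async function forEachWithLimit<I, O>(
  items: I[],
  step: (item: I) => Promise<O>,
  concurrency = 1,
): Promise<O[]> {
  const results: O[] = new Array(items.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unprocessed index until none remain.
  async function worker(): Promise<void> {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await step(items[index]);
    }
  }

  const workerCount = Math.min(Math.max(concurrency, 1), items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results; // outputs stay in input order
}

// Track how many calls overlap so the limit can be observed.
let inFlight = 0;
let peakInFlight = 0;

const addEleven = async ({ value }: { value: number }) => {
  inFlight++;
  peakInFlight = Math.max(peakInFlight, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 5)); // simulate async work
  inFlight--;
  return { value: value + 11 };
};

const foreachResult = forEachWithLimit(
  [{ value: 1 }, { value: 22 }, { value: 333 }],
  addEleven,
  2,
);

foreachResult.then((out) => console.log(out, "peak in flight:", peakInFlight));
```

With a limit of 2 and three items, two calls start immediately and the third starts as soon as either finishes, while the results array keeps the order of the input.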

Nested Workflows

vNext supports composing workflows by nesting them:

    const nestedWorkflow = createWorkflow({
      id: 'nested-workflow',
      inputSchema: z.object({...}),
      outputSchema: z.object({...}),
    })
      .then(step1)
      .then(step2)
      .commit();

    const mainWorkflow = createWorkflow({
      id: 'main-workflow',
      inputSchema: z.object({...}),
      outputSchema: z.object({...}),
    })
      .then(initialStep) // Run initialStep first
      .then(nestedWorkflow) // Then run the nested workflow as a step
      .then(finalStep) // Finally, run finalStep
      .commit();

In the above example, the nestedWorkflow is used as a step in the mainWorkflow, where the inputSchema of nestedWorkflow matches the outputSchema of initialStep, and the outputSchema of nestedWorkflow matches the inputSchema of finalStep.

Nested workflows are the main (and only) way to compose execution flows beyond simple sequential execution. Each entry passed to .branch() or .parallel() is a single step, so running more than one step in a branch or in a parallel slot requires wrapping those steps in a nested workflow, which also describes how they are to be executed.

    // Define a workflow that plans both indoor and outdoor activities in parallel,
    // then synthesizes the results
    const planBothWorkflow = createWorkflow({
      id: "plan-both-workflow",
      inputSchema: forecastSchema,
      outputSchema: z.object({
        activities: z.string(),
      }),
      steps: [planActivities, planIndoorActivities, sythesizeStep],
    })
      // Run planActivities and planIndoorActivities in parallel
      .parallel([planActivities, planIndoorActivities])
      .then(sythesizeStep)
      .commit();

    // Define the main weather workflow
    const weatherWorkflow = createWorkflow({
      id: "weather-workflow-step3-concurrency",
      inputSchema: z.object({
        city: z.string().describe("The city to get the weather for"),
      }),
      outputSchema: z.object({
        activities: z.string(),
      }),
      steps: [fetchWeather, planBothWorkflow, planActivities],
    })
      // First, fetch the weather for the given city
      .then(fetchWeather)
      .branch([
        // If precipitationChance > 20, plan both indoor and outdoor activities (nested workflow)
        [
          async ({ inputData }) => {
            return inputData?.precipitationChance > 20;
          },
          planBothWorkflow,
        ],
        // If precipitationChance <= 20, only plan outdoor activities
        [
          async ({ inputData }) => {
            return inputData?.precipitationChance <= 20;
          },
          planActivities,
        ],
      ])
      // Finalize the workflow definition, as in the other examples
      .commit();

Nested workflows only have their final result (result of the last step) as their step output.
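To illustrate what "only the final result" means for the outer workflow, the following standalone sketch records each step's output under its id. `TracedStep`, `runSequence`, and `asNestedStep` are hypothetical and operate on plain numbers; they model the behaviour described here and do not reproduce the library's internals.

```typescript
// Hypothetical model of a step: an id plus a function on numbers.
interface TracedStep {
  id: string;
  execute: (input: number) => number;
}

// Run steps in order, recording every step's output under its id.
function runSequence(steps: TracedStep[], input: number) {
  const trace: Record<string, number> = {};
  let current = input;
  for (const step of steps) {
    current = step.execute(current);
    trace[step.id] = current;
  }
  return { output: current, trace };
}

// Wrap a sequence as a single step. The inner trace is dropped, so the
// outer workflow sees only the final result of the nested sequence.
function asNestedStep(id: string, steps: TracedStep[]): TracedStep {
  return { id, execute: (input) => runSequence(steps, input).output };
}

const nested = asNestedStep("nested-workflow", [
  { id: "step1", execute: (x) => x * 2 },
  { id: "step2", execute: (x) => x - 3 },
]);

const mainRun = runSequence(
  [
    { id: "initialStep", execute: (x) => x + 1 },
    nested,
    { id: "finalStep", execute: (x) => x * 10 },
  ],
  4,
);

console.log(mainRun.trace);
// { initialStep: 5, 'nested-workflow': 7, finalStep: 70 }
```

The outer trace has no entries for step1 or step2. If a later step needs an intermediate value, the nested sequence has to include it in its final output.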