Sequential Flow
Chain steps to execute in sequence using .then():
// Chain steps to execute in sequence
myWorkflow
  .then(step1) // First step
  .then(step2) // Second step, receives output from step1
  .then(step3) // Third step, receives output from step2
  .commit();
The output of each step is automatically passed to the next step if the schemas match. If the schemas don't match, you can use the map function to transform the output into the expected schema.
Step chaining is type-safe and checked at compile time.
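The compile-time check can be illustrated without the library. The sketch below is plain TypeScript, not the workflow API: SketchStep, chainSteps, sumStep, doubleStep, and totalToValue are hypothetical names, and the mapping function is an ordinary function standing in for the role that map plays.

```typescript
// Hypothetical stand-in for a step: a typed function from input to output.
type SketchStep<I, O> = (inputData: I) => O;

// Compose two steps. The call only compiles when the first step's
// output type matches the second step's input type.
function chainSteps<A, B, C>(
  first: SketchStep<A, B>,
  second: SketchStep<B, C>,
): SketchStep<A, C> {
  return (inputData) => second(first(inputData));
}

const sumStep: SketchStep<{ numbers: number[] }, { total: number }> = ({ numbers }) => ({
  total: numbers.reduce((acc, n) => acc + n, 0),
});

const doubleStep: SketchStep<{ value: number }, { value: number }> = ({ value }) => ({
  value: value * 2,
});

// chainSteps(sumStep, doubleStep) is rejected by the compiler:
// { total: number } is not { value: number }.
// A mapping function bridges the two shapes.
const totalToValue: SketchStep<{ total: number }, { value: number }> = ({ total }) => ({
  value: total,
});

const sequentialSketch = chainSteps(chainSteps(sumStep, totalToValue), doubleStep);
const sequentialOutput = sequentialSketch({ numbers: [1, 2, 3] });
console.log(sequentialOutput); // { value: 12 }
```

The mismatch is caught before anything runs, which is the property the type-safety claim above refers to.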
Parallel Execution
Execute steps in parallel using .parallel():
// Execute step1 and step2 in parallel
myWorkflow
  .parallel([step1, step2])
  // Continue with step3 after both parallel steps complete
  .then(step3)
  .commit();
This executes all steps in the array concurrently, then continues to the next step after all parallel steps complete.
You can also execute entire workflows in parallel:
// Execute nestedWorkflow1 and nestedWorkflow2 in parallel
myWorkflow
  .parallel([nestedWorkflow1, nestedWorkflow2])
  .then(finalStep)
  .commit();
Parallel steps receive the previous step's result as input. Their outputs are passed to the next step as a single object in which each key is a step id and each value is that step's output. The example above, for instance, produces an object with two keys, nestedWorkflow1 and nestedWorkflow2, whose values are the outputs of the respective workflows.
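The shape of that combined output can be sketched in plain TypeScript. This is an illustration of the described behaviour, not the library's implementation: ParallelSketchStep, runParallelSketch, doubleIt, and squareIt are hypothetical names.

```typescript
// Hypothetical stand-in for a step: an id plus an async execute function.
type ParallelSketchStep<I, O> = {
  id: string;
  execute: (inputData: I) => Promise<O>;
};

// Run every step concurrently on the same input, then key each output by step id.
async function runParallelSketch<I, O>(
  steps: ParallelSketchStep<I, O>[],
  inputData: I,
): Promise<Record<string, O>> {
  const settled = await Promise.all(
    steps.map(async (step) => ({ id: step.id, output: await step.execute(inputData) })),
  );
  const outputsById: Record<string, O> = {};
  for (const entry of settled) {
    outputsById[entry.id] = entry.output;
  }
  return outputsById;
}

const doubleIt: ParallelSketchStep<{ value: number }, { value: number }> = {
  id: "doubleIt",
  execute: async ({ value }) => ({ value: value * 2 }),
};
const squareIt: ParallelSketchStep<{ value: number }, { value: number }> = {
  id: "squareIt",
  execute: async ({ value }) => ({ value: value * value }),
};

const parallelOutputs = runParallelSketch([doubleIt, squareIt], { value: 3 });
parallelOutputs.then((outputs) => console.log(outputs));
// { doubleIt: { value: 6 }, squareIt: { value: 9 } }
```

A step that follows the parallel block would therefore declare an input schema with one property per parallel step id.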
Conditional Branching
Create conditional branches using .branch():
myWorkflow
  .then(initialStep)
  .branch([
    // If value > 50, run highValueStep
    [async ({ inputData }) => inputData.value > 50, highValueStep],
    // If value is between 11 and 50, run lowValueStep
    [
      async ({ inputData }) => inputData.value > 10 && inputData.value <= 50,
      lowValueStep,
    ],
    // If value <= 10, run extremelyLowValueStep
    [async ({ inputData }) => inputData.value <= 10, extremelyLowValueStep],
  ])
  // After branching, continue with finalStep
  .then(finalStep)
  .commit();
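Branch conditions are evaluated sequentially, and all steps whose conditions match are executed in parallel. In the example above the three conditions are mutually exclusive, so exactly one step runs: if inputData.value is 5, only extremelyLowValueStep runs. If the conditions overlapped (for instance, if the second condition were just inputData.value <= 50), both lowValueStep and extremelyLowValueStep would run for that input.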
Each conditional step (like highValueStep or lowValueStep) receives as input the output of the previous step (initialStep in this case). The output of each matching conditional step is collected, and the next step after the branch (finalStep) receives an object containing the outputs of every step that ran in the branch. The keys of this object are the step IDs and the values are the outputs of those steps; if both lowValueStep and extremelyLowValueStep ran, for example, the object would be { lowValueStep: <output of lowValueStep>, extremelyLowValueStep: <output of extremelyLowValueStep> }.
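The evaluate-then-collect behaviour can be sketched in plain TypeScript. This is a model of the description above, not the library's code: BranchSketchStep, BranchSketchCase, runBranchSketch, and labelStep are hypothetical names, and the conditions are copied from the branch example.

```typescript
// Hypothetical stand-ins: a step, and a [condition, step] pair as passed to a branch.
type BranchSketchStep<I, O> = { id: string; execute: (inputData: I) => Promise<O> };
type BranchSketchCase<I, O> = [
  (ctx: { inputData: I }) => Promise<boolean>,
  BranchSketchStep<I, O>,
];

async function runBranchSketch<I, O>(
  cases: BranchSketchCase<I, O>[],
  inputData: I,
): Promise<Record<string, O>> {
  // Evaluate the conditions one after another, keeping the steps that match.
  const matched: BranchSketchStep<I, O>[] = [];
  for (const [condition, step] of cases) {
    if (await condition({ inputData })) matched.push(step);
  }
  // Run every matching step concurrently and key the outputs by step id.
  const settled = await Promise.all(
    matched.map(async (step) => ({ id: step.id, output: await step.execute(inputData) })),
  );
  const outputsById: Record<string, O> = {};
  for (const entry of settled) outputsById[entry.id] = entry.output;
  return outputsById;
}

// Build a trivial step that reports which id handled the value.
const labelStep = (id: string): BranchSketchStep<{ value: number }, string> => ({
  id,
  execute: async ({ value }) => `${id} handled ${value}`,
});

const branchCases: BranchSketchCase<{ value: number }, string>[] = [
  [async ({ inputData }) => inputData.value > 50, labelStep("highValueStep")],
  [
    async ({ inputData }) => inputData.value > 10 && inputData.value <= 50,
    labelStep("lowValueStep"),
  ],
  [async ({ inputData }) => inputData.value <= 10, labelStep("extremelyLowValueStep")],
];

const branchOutputs = runBranchSketch(branchCases, { value: 5 });
branchOutputs.then((outputs) => console.log(Object.keys(outputs)));
// [ 'extremelyLowValueStep' ]
```

Because the set of keys depends on which conditions matched, a step placed after the branch would typically treat each key as optional.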
Loops
vNext supports two types of loops. When looping a step (or a nested workflow or any other step-compatible construct), the initial inputData of the loop is the output of the previous step, and every subsequent inputData is the output of the loop step itself. The initial loop state should therefore either match the previous step's output or be derived from it using the map function.
Do-While Loop: Executes a step repeatedly while a condition is true.
// Repeat incrementStep while value is less than 10
myWorkflow
  .dowhile(incrementStep, async ({ inputData }) => inputData.value < 10)
  .then(finalStep)
  .commit();
Do-Until Loop: Executes a step repeatedly until a condition becomes true.
// Repeat incrementStep until value is at least 10
myWorkflow
  .dountil(incrementStep, async ({ inputData }) => inputData.value >= 10)
  .then(finalStep)
  .commit();
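The way loop state feeds back into the step can be sketched in plain TypeScript. The sketch assumes, as the names suggest, that the step runs once before the condition is first checked; the source does not state this explicitly. doWhileSketch, doUntilSketch, and incrementSketch are hypothetical names, and the conditions are synchronous here for brevity.

```typescript
// Run the step once, then repeat while the condition holds for its latest output.
// The step's input and output share one type, since each output becomes the next input.
function doWhileSketch<S>(
  step: (state: S) => S,
  condition: (state: S) => boolean,
  initial: S,
): S {
  let state = step(initial);
  while (condition(state)) {
    state = step(state);
  }
  return state;
}

// Run the step once, then repeat until the condition holds for its latest output.
function doUntilSketch<S>(
  step: (state: S) => S,
  condition: (state: S) => boolean,
  initial: S,
): S {
  return doWhileSketch(step, (state) => !condition(state), initial);
}

const incrementSketch = (state: { value: number }) => ({ value: state.value + 1 });

console.log(doWhileSketch(incrementSketch, (s) => s.value < 10, { value: 0 })); // { value: 10 }
console.log(doUntilSketch(incrementSketch, (s) => s.value >= 10, { value: 0 })); // { value: 10 }
// Under the run-once assumption, the step still executes when the
// condition is already false for the initial input.
console.log(doWhileSketch(incrementSketch, (s) => s.value < 10, { value: 20 })); // { value: 21 }
```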
Foreach
.foreach() executes a step once for each item of an array-typed input and passes the collected outputs, as an array, to the next step.
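// Imports assumed for this snippet (vNext entry point of @mastra/core)
import { createStep, createWorkflow } from "@mastra/core/workflows/vNext";
import { z } from "zod";

// Step that adds 11 to the current value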
const mapStep = createStep({
  id: "map",
  description: "Maps (+11) on the current value",
  inputSchema: z.object({
    value: z.number(),
  }),
  outputSchema: z.object({
    value: z.number(),
  }),
  execute: async ({ inputData }) => {
    return { value: inputData.value + 11 };
  },
});

// Final step that sums the mapped values
const finalStep = createStep({
  id: "final",
  description: "Final step that sums the mapped values",
  inputSchema: z.array(z.object({ value: z.number() })),
  outputSchema: z.object({
    finalValue: z.number(),
  }),
  execute: async ({ inputData }) => {
    return { finalValue: inputData.reduce((acc, curr) => acc + curr.value, 0) };
  },
});

const counterWorkflow = createWorkflow({
  steps: [mapStep, finalStep],
  id: "counter-workflow",
  inputSchema: z.array(z.object({ value: z.number() })),
  outputSchema: z.object({
    finalValue: z.number(),
  }),
});

// Apply mapStep to each item in the input array, then run finalStep
counterWorkflow.foreach(mapStep).then(finalStep).commit();

const run = counterWorkflow.createRun();
const result = await run.start({
  inputData: [{ value: 1 }, { value: 22 }, { value: 333 }],
});

if (result.status === "success") {
  console.log(result.result); // only exists if status is success
} else if (result.status === "failed") {
  console.error(result.error); // only exists if status is failed, this is an instance of Error
}
By default, the loop executes the step for each item in the input array sequentially, one at a time. The optional concurrency option lets you run these executions in parallel, with a limit on how many run at the same time.
counterWorkflow.foreach(mapStep, { concurrency: 2 }).then(finalStep).commit();
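One way to bound concurrency while preserving input order can be sketched in plain TypeScript. This is a worker-pool illustration of the idea, not the library's scheduler, which may work differently (for example in batches). forEachSketch, addElevenSketch, inFlight, and peakInFlight are hypothetical names, and the 5 ms delay only simulates asynchronous work.

```typescript
// Apply `execute` to every item with at most `concurrency` calls in flight,
// returning the outputs in input order.
async function forEachSketch<I, O>(
  items: readonly I[],
  execute: (item: I) => Promise<O>,
  concurrency = 1,
): Promise<O[]> {
  const outputs = new Array<O>(items.length);
  let nextIndex = 0;
  // Each worker repeatedly claims the next unprocessed index until none remain.
  const worker = async (): Promise<void> => {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      outputs[index] = await execute(items[index] as I);
    }
  };
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return outputs;
}

// Counters that record how many executions overlap.
let inFlight = 0;
let peakInFlight = 0;
const addElevenSketch = async (item: { value: number }): Promise<{ value: number }> => {
  inFlight += 1;
  peakInFlight = Math.max(peakInFlight, inFlight);
  await new Promise<void>((resolve) => setTimeout(resolve, 5)); // simulate async work
  inFlight -= 1;
  return { value: item.value + 11 };
};

const forEachOutputs = forEachSketch(
  [{ value: 1 }, { value: 22 }, { value: 333 }],
  addElevenSketch,
  2,
);
forEachOutputs.then((outputs) => console.log(outputs, peakInFlight));
// [ { value: 12 }, { value: 33 }, { value: 344 } ] 2
```

Summing these outputs, as finalStep does in the example above, gives 389.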
Nested Workflows
vNext supports composing workflows by nesting them:
const nestedWorkflow = createWorkflow({
  id: 'nested-workflow',
  inputSchema: z.object({...}),
  outputSchema: z.object({...}),
})
  .then(step1)
  .then(step2)
  .commit();

const mainWorkflow = createWorkflow({
  id: 'main-workflow',
  inputSchema: z.object({...}),
  outputSchema: z.object({...}),
})
  .then(initialStep) // Run initialStep first
  .then(nestedWorkflow) // Then run the nested workflow as a step
  .then(finalStep) // Finally, run finalStep
  .commit();
In the example above, nestedWorkflow is used as a step in mainWorkflow: the inputSchema of nestedWorkflow matches the outputSchema of initialStep, and the outputSchema of nestedWorkflow matches the inputSchema of finalStep.
Nested workflows are the main (and only) way to compose execution flows beyond simple sequential execution. When you compose with .branch() or .parallel(), running more than one step in a single branch or parallel slot requires wrapping those steps in a nested workflow, which in turn describes how they are to be executed.
// Define a workflow that plans both indoor and outdoor activities in parallel, then synthesizes the results
const planBothWorkflow = createWorkflow({
  id: "plan-both-workflow",
  inputSchema: forecastSchema,
  outputSchema: z.object({
    activities: z.string(),
  }),
  steps: [planActivities, planIndoorActivities, sythesizeStep],
})
  // Run planActivities and planIndoorActivities in parallel
  .parallel([planActivities, planIndoorActivities])
  .then(sythesizeStep)
  .commit();

// Define the main weather workflow
const weatherWorkflow = createWorkflow({
  id: "weather-workflow-step3-concurrency",
  inputSchema: z.object({
    city: z.string().describe("The city to get the weather for"),
  }),
  outputSchema: z.object({
    activities: z.string(),
  }),
  steps: [fetchWeather, planBothWorkflow, planActivities],
})
  // First, fetch the weather for the given city
  .then(fetchWeather)
  .branch([
    // If precipitationChance > 20, plan both indoor and outdoor activities (nested workflow)
    [
      async ({ inputData }) => {
        return inputData?.precipitationChance > 20;
      },
      planBothWorkflow,
    ],
    // If precipitationChance <= 20, only plan outdoor activities
    [
      async ({ inputData }) => {
        return inputData?.precipitationChance <= 20;
      },
      planActivities,
    ],
  ])
  .commit();
A nested workflow exposes only its final result (the output of its last step) as its step output.
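The effect of this rule can be sketched in plain TypeScript. The sketch is a model, not the library API: NestedSketchStep, nestSketch, parseStep, and countStep are hypothetical names. It shows a two-step chain wrapped so that, from the outside, it looks like a single step whose output is that of the last inner step.

```typescript
// Hypothetical stand-in for a step.
type NestedSketchStep<I, O> = { id: string; execute: (inputData: I) => O };

// Wrap two chained steps so that they look like one step to the outside.
// Only the second step's output is returned; the intermediate value stays internal.
function nestSketch<A, B, C>(
  id: string,
  first: NestedSketchStep<A, B>,
  second: NestedSketchStep<B, C>,
): NestedSketchStep<A, C> {
  return { id, execute: (inputData) => second.execute(first.execute(inputData)) };
}

const parseStep: NestedSketchStep<{ raw: string }, { words: string[] }> = {
  id: "parseStep",
  execute: ({ raw }) => ({ words: raw.split(" ") }),
};
const countStep: NestedSketchStep<{ words: string[] }, { count: number }> = {
  id: "countStep",
  execute: ({ words }) => ({ count: words.length }),
};

const nestedSketch = nestSketch("nested-workflow", parseStep, countStep);
const nestedOutput = nestedSketch.execute({ raw: "plan indoor activities" });
console.log(nestedOutput); // { count: 3 }
```

The words array produced by parseStep never reaches the caller, so a step that follows a nested workflow can rely only on the last inner step's output shape.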