Sequential Flow
Chain steps to execute in sequence using .then()
:
myWorkflow.then(step1).then(step2).then(step3).commit();
The output from each step is automatically passed to the next step if schemas match. If the schemas don’t match, you can use the map
function to transform the output to the expected schema.
Step chaining is type-safe and checked at compile time.
Parallel Execution
Execute steps in parallel using .parallel()
:
myWorkflow.parallel([step1, step2]).then(step3).commit();
This executes all steps in the array concurrently, then continues to the next step after all parallel steps complete.
You can also execute entire workflows in parallel:
myWorkflow
.parallel([nestedWorkflow1, nestedWorkflow2])
.then(finalStep)
.commit();
Parallel steps receive previous step results as input. Their outputs are passed into the next step input as an object where the key is the step id and the value is the step output, for example the above example outputs an object with two keys nestedWorkflow1
and nestedWorkflow2
with the outputs of the respective workflows as values.
Conditional Branching
Create conditional branches using .branch()
:
myWorkflow
.then(initialStep)
.branch([
[async ({ inputData }) => inputData.value > 50, highValueStep],
[async ({ inputData }) => inputData.value <= 50, lowValueStep],
])
.then(finalStep)
.commit();
Branch conditions are evaluated in order, and the first matching condition’s step is executed.
Conditional steps receive previous step results as input. Their outputs are passed into the next step input as an object where the key is the step id and the value is the step output, for example the above example outputs an object with two keys highValueStep
and lowValueStep
with the outputs of the respective workflows as values.
When multiple conditions are true, all matching steps are executed in parallel.
Loops
vNext supports two types of loops. When looping a step (or nested workflow or any other step-compatible construct), the inputData
of the loop is the output of the previous step initially, but any subsequent inputData
is the output of the loop step itself. Thus for looping, the initial loop state should either match the previous step output or be derived using the map
function.
Do-While Loop: Executes a step repeatedly while a condition is true.
myWorkflow
.dowhile(incrementStep, async ({ inputData }) => inputData.value < 10)
.then(finalStep)
.commit();
Do-Until Loop: Executes a step repeatedly until a condition becomes true.
myWorkflow
.dountil(incrementStep, async ({ inputData }) => inputData.value >= 10)
.then(finalStep)
.commit();
const workflow = createWorkflow({
id: "increment-workflow",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
})
.dountil(incrementStep, async ({ inputData }) => inputData.value >= 10)
.then(finalStep);
Foreach
Foreach is a step that executes a step for each item in an array type input.
const mapStep = createStep({
id: "map",
description: "Maps (+11) on the current value",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
execute: async ({ inputData }) => {
return { value: inputData.value + 11 };
},
});
const finalStep = createStep({
id: "final",
description: "Final step that prints the result",
inputSchema: z.array(z.object({ value: z.number() })),
outputSchema: z.object({
finalValue: z.number(),
}),
execute: async ({ inputData }) => {
return { finalValue: inputData.reduce((acc, curr) => acc + curr.value, 0) };
},
});
const counterWorkflow = createWorkflow({
steps: [mapStep, finalStep],
id: "counter-workflow",
inputSchema: z.array(z.object({ value: z.number() })),
outputSchema: z.object({
finalValue: z.number(),
}),
});
counterWorkflow.foreach(mapStep).then(finalStep).commit();
const run = counterWorkflow.createRun();
const result = await run.start({
inputData: [{ value: 1 }, { value: 22 }, { value: 333 }],
});
if (result.status === "success") {
console.log(result.result); // only exists if status is success
} else if (result.status === "failed") {
console.error(result.error); // only exists if status is failed, this is an instance of Error
}
The loop executes the step for each item in the input array in sequence one at a time. The optional concurrency
option allows you to execute steps in parallel with a limit on the number of concurrent executions.
counterWorkflow.foreach(mapStep, { concurrency: 2 }).then(finalStep).commit();
Nested Workflows
vNext supports composing workflows by nesting them:
const nestedWorkflow = createWorkflow({
id: 'nested-workflow',
inputSchema: z.object({...}),
outputSchema: z.object({...}),
})
.then(step1)
.then(step2)
.commit();
const mainWorkflow = createWorkflow({
id: 'main-workflow',
inputSchema: z.object({...}),
outputSchema: z.object({...}),
})
.then(initialStep)
.then(nestedWorkflow)
.then(finalStep)
.commit();
In the above example, the nestedWorkflow
is used as a step in the mainWorkflow
, where the inputSchema
of nestedWorkflow
matches the outputSchema
of initialStep
, and the outputSchema
of nestedWorkflow
matches the inputSchema
of finalStep
.
Nested workflows are the main (and only) way compose execution flows beyond simple sequential execution. When using .branch()
or .parallel()
to compose execution flows, executing more than just one step necessarily requires a nested workflow, and as a byproduct, a description of how these steps are to be executed.
const planBothWorkflow = createWorkflow({
id: "plan-both-workflow",
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
steps: [planActivities, planIndoorActivities, sythesizeStep],
})
.parallel([planActivities, planIndoorActivities])
.then(sythesizeStep)
.commit();
const weatherWorkflow = createWorkflow({
id: "weather-workflow-step3-concurrency",
inputSchema: z.object({
city: z.string().describe("The city to get the weather for"),
}),
outputSchema: z.object({
activities: z.string(),
}),
steps: [fetchWeather, planBothWorkflow, planActivities],
})
.then(fetchWeather)
.branch([
[
async ({ inputData }) => {
return inputData?.precipitationChance > 20;
},
planBothWorkflow,
],
[
async ({ inputData }) => {
return inputData?.precipitationChance <= 20;
},
planActivities,
],
]);
Nested workflows only have their final result (result of the last step) as their step output.