Parallel Steps
When building AI applications, you often need to process multiple independent tasks simultaneously to improve efficiency.
Control Flow Diagram
This example shows how to structure a workflow that executes steps in parallel, with each branch handling its own data flow and dependencies.
Here’s the control flow diagram:
Creating the Steps
Let’s start by creating the steps and initializing the workflow.
import { Step, Workflow } from "@mastra/core";
import { z } from "zod";
const stepOne = new Step({
id: "stepOne",
execute: async ({ context: { machineContext } }) => ({
doubledValue: machineContext.triggerData.inputValue * 2,
}),
});
const stepTwo = new Step({
id: "stepTwo",
execute: async ({ context: { machineContext } }) => ({
incrementedValue:
machineContext.stepResults.stepOne.payload.doubledValue + 1,
}),
});
const stepThree = new Step({
id: "stepThree",
execute: async ({ context: { machineContext } }) => ({
tripledValue: machineContext.triggerData.inputValue * 3,
}),
});
const stepFour = new Step({
id: "stepFour",
execute: async ({ context: { machineContext } }) => ({
isEven: machineContext.stepResults.stepThree.payload.tripledValue % 2 === 0,
}),
});
const myWorkflow = new Workflow({
name: "my-workflow",
triggerSchema: z.object({
inputValue: z.number(),
}),
});
Chaining and Parallelizing Steps
Now we can add the steps to the workflow. Note the .then()
method is used to chain the steps, but the .step()
method is used to add the steps to the workflow.
myWorkflow
.step(stepOne)
.then(stepTwo) // chain one
.step(stepThree)
.then(stepFour) // chain two
.commit();
const result = await myWorkflow.execute({ triggerData: { inputValue: 3 } });
View Example on GitHub