Control Flow in Workflows: Branching, Merging, and Conditions
When you create a multi-step process, you may need to run steps in parallel, chain them sequentially, or follow different paths based on outcomes. This page describes how you can manage branching, merging, and conditions to construct workflows that meet your logic requirements. The code snippets show the key patterns for structuring complex control flow.
Parallel Execution
You can run multiple steps at the same time if they don’t depend on each other. This approach can speed up your workflow when steps perform independent tasks. The code below shows how to add two steps in parallel:
myWorkflow.step(fetchUserData).step(fetchOrderData);
See the Parallel Steps example for more details.
Sequential Execution
Sometimes you need to run steps in strict order to ensure outputs from one step become inputs for the next. Use .then() to link dependent operations. The code below shows how to chain steps sequentially:
myWorkflow.step(fetchOrderData).then(validateData).then(processOrder);
See the Sequential Steps example for more details.
Branching and Merging Paths
When different outcomes require different paths, branching is helpful. You can also merge paths later once they complete. The code below shows how to branch after stepA and later converge on stepF:
myWorkflow
.step(stepA)
.then(stepB)
.then(stepD)
.after(stepA)
.step(stepC)
.then(stepE)
.after([stepD, stepE])
.step(stepF);
In this example:
- stepA leads to stepB, then to stepD.
- Separately, stepA also triggers stepC, which in turn leads to stepE.
- Separately, stepD also triggers stepF and stepE in parallel.
See the Branching Paths example for more details.
Merging Multiple Branches
Sometimes you need a step to execute only after multiple other steps have completed. Mastra provides a compound .after([])
syntax that allows you to specify multiple dependencies for a step.
myWorkflow
.step(fetchUserData)
.then(validateUserData)
.step(fetchProductData)
.then(validateProductData)
// This step will only run after BOTH validateUserData AND validateProductData have completed
.after([validateUserData, validateProductData])
.step(processOrder)
In this example:
fetchUserData
andfetchProductData
run in parallel branches- Each branch has its own validation step
- The
processOrder
step only executes after both validation steps have completed successfully
This pattern is particularly useful for:
- Joining parallel execution paths
- Implementing synchronization points in your workflow
- Ensuring all required data is available before proceeding
You can also create complex dependency patterns by combining multiple .after([])
calls:
myWorkflow
// First branch
.step(stepA)
.then(stepB)
.then(stepC)
// Second branch
.step(stepD)
.then(stepE)
// Third branch
.step(stepF)
.then(stepG)
// This step depends on the completion of multiple branches
.after([stepC, stepE, stepG])
.step(finalStep)
Cyclical Dependencies and Loops
Workflows often need to repeat steps until certain conditions are met. Mastra provides two powerful methods for creating loops: until
and while
. These methods offer an intuitive way to implement repetitive tasks.
Using Manual Cyclical Dependencies (Legacy Approach)
In earlier versions, you could create loops by manually defining cyclical dependencies with conditions:
myWorkflow
.step(fetchData)
.then(processData)
.after(processData)
.step(finalizeData, {
when: { "processData.status": "success" },
})
.step(fetchData, {
when: { "processData.status": "retry" },
});
While this approach still works, the newer until
and while
methods provide a cleaner and more maintainable way to create loops.
Using until
for Condition-Based Loops
The until
method repeats a step until a specified condition becomes true. It takes two arguments:
- A condition that determines when to stop looping
- The step to repeat
workflow
.step(incrementStep)
.until(async ({ context }) => {
// Stop when the value reaches or exceeds 10
const result = context.getStepResult(incrementStep);
return (result?.value ?? 0) >= 10;
}, incrementStep)
.then(finalStep);
You can also use a reference-based condition:
workflow
.step(incrementStep)
.until(
{
ref: { step: incrementStep, path: 'value' },
query: { $gte: 10 },
},
incrementStep
)
.then(finalStep);
Using while
for Condition-Based Loops
The while
method repeats a step as long as a specified condition remains true. It takes the same arguments as until
:
- A condition that determines when to continue looping
- The step to repeat
workflow
.step(incrementStep)
.while(async ({ context }) => {
// Continue as long as the value is less than 10
const result = context.getStepResult(incrementStep);
return (result?.value ?? 0) < 10;
}, incrementStep)
.then(finalStep);
You can also use a reference-based condition:
workflow
.step(incrementStep)
.while(
{
ref: { step: incrementStep, path: 'value' },
query: { $lt: 10 },
},
incrementStep
)
.then(finalStep);
Comparison Operators for Reference Conditions
When using reference-based conditions, you can use these comparison operators:
Operator | Description |
---|---|
$eq | Equal to |
$ne | Not equal to |
$gt | Greater than |
$gte | Greater than or equal to |
$lt | Less than |
$lte | Less than or equal to |
See the Loop Control example for more details.
Conditions
Use the when property to control whether a step runs based on data from previous steps. Below are three ways to specify conditions.
Option 1: Function
myWorkflow.step(
new Step({
id: "processData",
execute: async ({ context }) => {
// Action logic
},
}),
{
when: async ({ context }) => {
const fetchData = context?.getStepResult<{ status: string }>("fetchData");
return fetchData?.status === "success";
},
},
);
Option 2: Query Object
myWorkflow.step(
new Step({
id: "processData",
execute: async ({ context }) => {
// Action logic
},
}),
{
when: {
ref: {
step: {
id: "fetchData",
},
path: "status",
},
query: { $eq: "success" },
},
},
);
Option 3: Simple Path Comparison
myWorkflow.step(
new Step({
id: "processData",
execute: async ({ context }) => {
// Action logic
},
}),
{
when: {
"fetchData.status": "success",
},
},
);
Data Access Patterns
Mastra provides several ways to pass data between steps:
- Context Object - Access step results directly through the context object
- Variable Mapping - Explicitly map outputs from one step to inputs of another
- getStepResult Method - Type-safe method to retrieve step outputs
Each approach has its advantages depending on your use case and requirements for type safety.
Using getStepResult Method
The getStepResult
method provides a type-safe way to access step results. This is the recommended approach when working with TypeScript as it preserves type information.
Basic Usage
For better type safety, you can provide a type parameter to getStepResult
:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const fetchUserStep = new Step({
id: 'fetchUser',
outputSchema: z.object({
name: z.string(),
userId: z.string(),
}),
execute: async ({ context }) => {
return { name: 'John Doe', userId: '123' };
},
});
const analyzeDataStep = new Step({
id: "analyzeData",
execute: async ({ context }) => {
// Type-safe access to previous step result
const userData = context.getStepResult<{ name: string, userId: string }>("fetchUser");
if (!userData) {
return { status: "error", message: "User data not found" };
}
return {
analysis: `Analyzed data for user ${userData.name}`,
userId: userData.userId
};
},
});
Using Step References
The most type-safe approach is to reference the step directly in the getStepResult
call:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
// Define step with output schema
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const processUserStep = new Step({
id: "processUser",
execute: async ({ context }) => {
// TypeScript will infer the correct type from fetchUserStep's outputSchema
const userData = context.getStepResult(fetchUserStep);
return {
processed: true,
userName: userData?.name
};
},
});
const workflow = new Workflow({
name: "user-workflow",
});
workflow
.step(fetchUserStep)
.then(processUserStep)
.commit();
Using Variable Mapping
Variable mapping is an explicit way to define data flow between steps.
This approach makes dependencies clear and provides good type safety.
The data injected into the step is available in the context.inputData
object, and typed based on the inputSchema
of the step.
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const sendEmailStep = new Step({
id: "sendEmail",
inputSchema: z.object({
recipientEmail: z.string(),
recipientName: z.string(),
}),
execute: async ({ context }) => {
const { recipientEmail, recipientName } = context.inputData;
// Send email logic here
return {
status: "sent",
to: recipientEmail
};
},
});
const workflow = new Workflow({
name: "email-workflow",
});
workflow
.step(fetchUserStep)
.then(sendEmailStep, {
variables: {
// Map specific fields from fetchUser to sendEmail inputs
recipientEmail: { step: fetchUserStep, path: 'email' },
recipientName: { step: fetchUserStep, path: 'name' }
}
})
.commit();
For more details on variable mapping, see the Data Mapping with Workflow Variables documentation.
Using the Context Object
The context object provides direct access to all step results and their outputs. This approach is more flexible but requires careful handling to maintain type safety.
You can access step results directly through the context.steps
object:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const processOrderStep = new Step({
id: 'processOrder',
execute: async ({ context }) => {
// Access data from a previous step
let userData: { name: string, userId: string };
if (context.steps['fetchUser']?.status === 'success') {
userData = context.steps.fetchUser.output;
} else {
throw new Error('User data not found');
}
return {
orderId: 'order123',
userId: userData.userId,
status: 'processing',
};
},
});
const workflow = new Workflow({
name: "order-workflow",
});
workflow
.step(fetchUserStep)
.then(processOrderStep)
.commit();
Workflow-Level Type Safety
For comprehensive type safety across your entire workflow, you can define types for all steps and pass them to the Workflow This allows you to get type safety for the context object on conditions, and on step results in the final workflow output.
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
// Create steps with typed outputs
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const processOrderStep = new Step({
id: "processOrder",
execute: async ({ context }) => {
// TypeScript knows the shape of userData
const userData = context.getStepResult(fetchUserStep);
return {
orderId: "order123",
status: "processing"
};
},
});
const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
name: "typed-workflow",
});
workflow
.step(fetchUserStep)
.then(processOrderStep)
.until(async ({ context }) => {
// TypeScript knows the shape of userData here
const res = context.getStepResult('fetchUser');
return res?.userId === '123';
}, processOrderStep)
.commit();
Accessing Trigger Data
In addition to step results, you can access the original trigger data that started the workflow:
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
// Define trigger schema
const triggerSchema = z.object({
customerId: z.string(),
orderItems: z.array(z.string()),
});
type TriggerType = z.infer<typeof triggerSchema>;
const processOrderStep = new Step({
id: "processOrder",
execute: async ({ context }) => {
// Access trigger data with type safety
const triggerData = context.getStepResult<TriggerType>('trigger');
return {
customerId: triggerData?.customerId,
itemCount: triggerData?.orderItems.length || 0,
status: "processing"
};
},
});
const workflow = new Workflow({
name: "order-workflow",
triggerSchema,
});
workflow
.step(processOrderStep)
.commit();
Accessing Resume Data
The data injected into the step is available in the context.inputData
object, and typed based on the inputSchema
of the step.
import { Step, Workflow } from "@mastra/core/workflows";
import { z } from "zod";
const processOrderStep = new Step({
id: "processOrder",
inputSchema: z.object({
orderId: z.string(),
}),
execute: async ({ context, suspend }) => {
const { orderId } = context.inputData;
if (!orderId) {
await suspend();
return;
}
return {
orderId,
status: "processed"
};
},
});
const workflow = new Workflow({
name: "order-workflow",
});
workflow
.step(processOrderStep)
.commit();
const run = workflow.createRun();
const result = await run.start();
const resumedResult = await workflow.resume({
runId: result.runId,
stepId: 'processOrder',
inputData: {
orderId: '123',
},
});
console.log({resumedResult});
Accessing Workflow Results
You can get typed access to the results of a workflow by injecting the step types into the Workflow
type params:
import { Workflow } from "@mastra/core/workflows";
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const processOrderStep = new Step({
id: "processOrder",
outputSchema: z.object({
orderId: z.string(),
status: z.string(),
}),
execute: async ({ context }) => {
const userData = context.getStepResult(fetchUserStep);
return {
orderId: "order123",
status: "processing"
};
},
});
const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
name: "typed-workflow",
});
workflow
.step(fetchUserStep)
.then(processOrderStep)
.commit();
const run = workflow.createRun();
const result = await run.start();
// The result is a discriminated union of the step results
// So it needs to be narrowed down via status checks
if (result.results.processOrder.status === 'success') {
// TypeScript will know the shape of the results
const orderId = result.results.processOrder.output.orderId;
console.log({orderId});
}
if (result.results.fetchUser.status === 'success') {
const userId = result.results.fetchUser.output.userId;
console.log({userId});
}
Best Practices for Data Flow
-
Use getStepResult with Step References for Type Safety
- Ensures TypeScript can infer the correct types
- Catches type errors at compile time
-
*Use Variable Mapping for Explicit Dependencies
- Makes data flow clear and maintainable
- Provides good documentation of step dependencies
-
Define Output Schemas for Steps
- Validates data at runtime
- Validates return type of the
execute
function
- Validates return type of the
- Improves type inference in TypeScript
- Validates data at runtime
-
Handle Missing Data Gracefully
- Always check if step results exist before accessing properties
- Provide fallback values for optional data
-
Keep Data Transformations Simple
- Transform data in dedicated steps rather than in variable mappings
- Makes workflows easier to test and debug
Comparison of Data Flow Methods
Method | Type Safety | Explicitness | Use Case |
---|---|---|---|
getStepResult | Highest | High | Complex workflows with strict typing requirements |
Variable Mapping | High | High | When dependencies need to be clear and explicit |
context.steps | Medium | Low | Quick access to step data in simple workflows |
By choosing the right data flow method for your use case, you can create workflows that are both type-safe and maintainable.