Nested Workflows
Mastra allows you to use workflows as steps within other workflows, so you can build modular, reusable workflow components. This helps you organize complex workflows into smaller, manageable pieces and promotes code reuse.
A parent workflow is also easier to read when each nested workflow appears in it as a single step.
Basic Usage
You can use a workflow as a step directly in another workflow using the step() method:
// Create a nested workflow
const nestedWorkflow = new Workflow({ name: "nested-workflow" })
.step(stepA)
.then(stepB)
.commit();
// Use the nested workflow in a parent workflow
const parentWorkflow = new Workflow({ name: "parent-workflow" })
.step(nestedWorkflow, {
variables: {
city: {
step: "trigger",
path: "myTriggerInput",
},
},
})
.then(stepC)
.commit();
When a workflow is used as a step:
- It is automatically converted to a step using the workflow’s name as the step ID
- The workflow’s results are available in the parent workflow’s context
- The nested workflow’s steps are executed in their defined order
Accessing Results
Results from a nested workflow are available in the parent workflow’s context under the nested workflow’s name. The results include all step outputs from the nested workflow:
const { results } = await parentWorkflow.start();
// Access nested workflow results
const nestedWorkflowResult = results["nested-workflow"];
if (nestedWorkflowResult.status === "success") {
const nestedResults = nestedWorkflowResult.output.results;
}
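The status check above can be wrapped in a small helper so callers never read output from a workflow that did not succeed. This is a minimal sketch in plain TypeScript: the StepResult type and the getNestedResults helper are hypothetical names that model the result shape described in this section, not Mastra's own type definitions.

```typescript
// Hypothetical model of the result shape described above (not Mastra's own types).
// Only the statuses mentioned in this document are listed.
type StepResult =
  | { status: "success"; output: unknown }
  | { status: "suspended" | "skipped" };

// Returns the nested workflow's step results, or undefined when the
// nested workflow is missing from the results or did not succeed.
function getNestedResults(
  results: Record<string, StepResult>,
  workflowName: string,
): Record<string, StepResult> | undefined {
  const entry = results[workflowName];
  if (!entry || entry.status !== "success") return undefined;
  const output = entry.output as
    | { results?: Record<string, StepResult> }
    | undefined;
  return output?.results;
}

// Hand-written sample data standing in for a real run's results.
const sampleResults: Record<string, StepResult> = {
  "nested-workflow": {
    status: "success",
    output: { results: { stepA: { status: "success", output: { value: 1 } } } },
  },
  stepC: { status: "skipped" },
};

const nestedSteps = getNestedResults(sampleResults, "nested-workflow");
console.log(nestedSteps?.["stepA"]?.status);
```

Returning undefined instead of throwing keeps the caller in control of how a non-successful nested workflow is handled.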
Control Flow with Nested Workflows
Nested workflows support all the control flow features available to regular steps:
Parallel Execution
Multiple nested workflows can be executed in parallel:
parentWorkflow
.step(nestedWorkflowA)
.step(nestedWorkflowB)
.after([nestedWorkflowA, nestedWorkflowB])
.step(finalStep);
Or using step() with an array of workflows:
parentWorkflow.step([nestedWorkflowA, nestedWorkflowB]).then(finalStep);
In this case, then() will implicitly wait for all the workflows to finish before executing the final step.
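As an analogy only, this implicit wait resembles joining promises with Promise.all in plain TypeScript. The sketch below does not use Mastra; runNestedA, runNestedB and runParent are hypothetical stand-ins for two nested workflows and the parent flow.

```typescript
// Stand-ins for two nested workflows; each resolves with its own result.
async function runNestedA(): Promise<{ a: number }> {
  return { a: 1 };
}

async function runNestedB(): Promise<{ b: number }> {
  return { b: 2 };
}

// Stand-in for the parent flow: both branches start together, and the
// final computation runs only after both have finished.
async function runParent(): Promise<number> {
  const [resultA, resultB] = await Promise.all([runNestedA(), runNestedB()]);
  return resultA.a + resultB.b; // plays the role of finalStep
}

runParent().then((total) => console.log("final step input:", total));
```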
If-Else Branching
Nested workflows can be used in if-else branches using the new syntax that accepts both branches as arguments:
// Create nested workflows for different paths
const workflowA = new Workflow({ name: "workflow-a" })
.step(stepA1)
.then(stepA2)
.commit();
const workflowB = new Workflow({ name: "workflow-b" })
.step(stepB1)
.then(stepB2)
.commit();
// Use the new if-else syntax with nested workflows
parentWorkflow
.step(initialStep)
.if(
async ({ context }) => {
// Your condition here
return someCondition;
},
workflowA, // if branch
workflowB, // else branch
)
.then(finalStep)
.commit();
The new syntax is more concise and clearer when working with nested workflows. When the condition is:
- true: The first workflow (if branch) is executed
- false: The second workflow (else branch) is executed
The skipped workflow will have a status of skipped in the results.
The .then(finalStep) call following the if-else block will merge the if and else branches back into a single execution path.
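Because the branch that did not run is reported with a status of skipped, the executed branch can be recovered from the results afterwards. The following is a minimal sketch: executedBranch is a hypothetical helper, and the sample results are hand-written data rather than output from a real run.

```typescript
// Minimal shape needed for this check; real results carry more fields.
type BranchEntry = { status: string };

// Returns the name of the branch that was not skipped, or undefined
// when neither branch appears in the results as executed.
function executedBranch(
  results: Record<string, BranchEntry | undefined>,
  ifBranch: string,
  elseBranch: string,
): string | undefined {
  for (const name of [ifBranch, elseBranch]) {
    const entry = results[name];
    if (entry && entry.status !== "skipped") return name;
  }
  return undefined;
}

// Hand-written sample: the condition was false, so workflow-a was skipped.
const branchResults: Record<string, BranchEntry> = {
  "workflow-a": { status: "skipped" },
  "workflow-b": { status: "success" },
};

console.log(executedBranch(branchResults, "workflow-a", "workflow-b"));
```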
Looping
Nested workflows can use .until() and .while() loops in the same way as any other step. One useful pattern is to pass a workflow directly as the loop-back argument, so that the nested workflow keeps executing based on a condition over its own results:
parentWorkflow
.step(firstStep)
.while(
({ context }) =>
context.getStepResult("nested-workflow").output.results.someField ===
"someValue",
nestedWorkflow,
)
.step(finalStep)
.commit();
Watching Nested Workflows
You can watch the state changes of nested workflows using the watch method on the parent workflow. This is useful for monitoring the progress and state transitions of complex workflows:
const parentWorkflow = new Workflow({ name: "parent-workflow" })
.step([nestedWorkflowA, nestedWorkflowB])
.then(finalStep)
.commit();
const run = parentWorkflow.createRun();
const unwatch = parentWorkflow.watch((state) => {
console.log("Current state:", state.value);
// Access nested workflow states in state.context
});
await run.start();
unwatch(); // Stop watching when done
Suspending and Resuming
Nested workflows support suspension and resumption, allowing you to pause and continue workflow execution at specific points. You can suspend either the entire nested workflow or specific steps within it:
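// Tracks whether the step has already suspended once (flag for this example)
let wasSuspended = false;
// Define a step that may need to suspend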
const suspendableStep = new Step({
id: "other",
description: "Step that may need to suspend",
execute: async ({ context, suspend }) => {
if (!wasSuspended) {
wasSuspended = true;
await suspend();
}
return { other: 26 };
},
});
// Create a nested workflow with suspendable steps
const nestedWorkflow = new Workflow({ name: "nested-workflow-a" })
.step(startStep)
.then(suspendableStep)
.then(finalStep)
.commit();
// Use in parent workflow
const parentWorkflow = new Workflow({ name: "parent-workflow" })
.step(beginStep)
.then(nestedWorkflow)
.then(lastStep)
.commit();
// Start the workflow
const run = parentWorkflow.createRun();
const { runId, results } = await run.start({ triggerData: { startValue: 1 } });
// Check if a specific step in the nested workflow is suspended
if (results["nested-workflow-a"].output.results.other.status === "suspended") {
// Resume the specific suspended step using dot notation
const resumedResults = await run.resume({
stepId: "nested-workflow-a.other",
context: { startValue: 1 },
});
// The resumed results will contain the completed nested workflow
// (expect() here is a test-framework assertion, e.g. from Jest or Vitest)
expect(resumedResults.results["nested-workflow-a"].output.results).toEqual({
start: { output: { newValue: 1 }, status: "success" },
other: { output: { other: 26 }, status: "success" },
final: { output: { finalValue: 27 }, status: "success" },
});
}
When resuming a nested workflow:
- Use the nested workflow’s name as the stepId when calling resume() to resume the entire workflow
- Use dot notation (nested-workflow.step-name) to resume a specific step within the nested workflow
- The nested workflow will continue from the suspended step with the provided context
- You can check the status of specific steps in the nested workflow’s results using results["nested-workflow"].output.results
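The dot-notation IDs described above can be derived from the results rather than hard-coded. The sketch below is a hypothetical helper, suspendedStepIds, that scans a nested workflow's step results and builds a stepId for every suspended step. The sample data is hand-written, and the parent-level status it uses is an assumption.

```typescript
// Minimal shape needed here; real results carry more fields.
type StepEntry = { status: string; output?: unknown };

// Lists resumable step IDs in "nested-workflow.step-name" dot notation.
function suspendedStepIds(
  results: Record<string, StepEntry>,
  nestedName: string,
): string[] {
  const nested = results[nestedName];
  const output = nested?.output as
    | { results?: Record<string, StepEntry> }
    | undefined;
  const inner = output?.results ?? {};
  return Object.entries(inner)
    .filter(([, entry]) => entry.status === "suspended")
    .map(([stepId]) => `${nestedName}.${stepId}`);
}

// Hand-written sample mirroring the example above.
const sampleRunResults: Record<string, StepEntry> = {
  "nested-workflow-a": {
    status: "suspended", // assumption: status reported while a child step is suspended
    output: {
      results: {
        start: { status: "success", output: { newValue: 1 } },
        other: { status: "suspended" },
      },
    },
  },
};

// Each returned ID could then be passed as stepId to run.resume().
console.log(suspendedStepIds(sampleRunResults, "nested-workflow-a"));
```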
Result Schemas and Mapping
Nested workflows can define their result schema and mapping, which helps with type safety and data transformation. This is particularly useful when you want to ensure the nested workflow’s output matches a specific structure or when you need to transform the results before they’re used in the parent workflow.
// Create a nested workflow with result schema and mapping
const nestedWorkflow = new Workflow({
name: "nested-workflow",
result: {
schema: z.object({
total: z.number(),
items: z.array(
z.object({
id: z.string(),
value: z.number(),
}),
),
}),
mapping: {
// Map values from step results using variables syntax
total: { step: "step-a", path: "count" },
items: { step: "step-b", path: "items" },
},
},
})
.step(stepA)
.then(stepB)
.commit();
// Use in parent workflow with type-safe results
const parentWorkflow = new Workflow({ name: "parent-workflow" })
.step(nestedWorkflow)
.then(async ({ context }) => {
const result = context.getStepResult("nested-workflow");
// TypeScript knows the structure of result
console.log(result.total); // number
console.log(result.items); // Array<{ id: string, value: number }>
return { success: true };
})
.commit();
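To illustrate what a mapping of { step, path } pairs expresses, here is a sketch in plain TypeScript that resolves each field from a table of step outputs. It assumes paths are dot-separated keys; getByPath and applyMapping are hypothetical helpers written for this illustration and do not describe how Mastra resolves mappings internally.

```typescript
type Mapping = Record<string, { step: string; path: string }>;

// Walks a dot-separated path such as "result.activities" through nested objects.
// Returns undefined as soon as a segment cannot be followed.
function getByPath(source: unknown, path: string): unknown {
  return path.split(".").reduce<unknown>((current, key) => {
    if (current !== null && typeof current === "object") {
      return (current as Record<string, unknown>)[key];
    }
    return undefined;
  }, source);
}

// Builds the result object by looking up each mapped field in the step outputs.
function applyMapping(
  mapping: Mapping,
  stepOutputs: Record<string, unknown>,
): Record<string, unknown> {
  const result: Record<string, unknown> = {};
  for (const [field, { step, path }] of Object.entries(mapping)) {
    result[field] = getByPath(stepOutputs[step], path);
  }
  return result;
}

// Hand-written step outputs standing in for step-a and step-b.
const mapped = applyMapping(
  {
    total: { step: "step-a", path: "count" },
    items: { step: "step-b", path: "items" },
  },
  {
    "step-a": { count: 2 },
    "step-b": { items: [{ id: "x", value: 1 }, { id: "y", value: 1 }] },
  },
);

console.log(mapped);
```

The mapped object could then be validated against the declared schema, which is the role result.schema plays in the workflow definition above.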
Best Practices
- Modularity: Use nested workflows to encapsulate related steps and create reusable workflow components.
- Naming: Give nested workflows descriptive names as they will be used as step IDs in the parent workflow.
- Error Handling: Nested workflows propagate their errors to the parent workflow, so handle errors appropriately.
- State Management: Each nested workflow maintains its own state but can access the parent workflow’s context.
- Suspension: When using suspension in nested workflows, consider the entire workflow’s state and handle resumption appropriately.
Example
Here’s a complete example showing various features of nested workflows:
const workflowA = new Workflow({
name: "workflow-a",
result: {
schema: z.object({
activities: z.string(),
}),
mapping: {
activities: {
step: planActivities,
path: "activities",
},
},
},
})
.step(fetchWeather)
.then(planActivities)
.commit();
const workflowB = new Workflow({
name: "workflow-b",
result: {
schema: z.object({
activities: z.string(),
}),
mapping: {
activities: {
step: planActivities,
path: "activities",
},
},
},
})
.step(fetchWeather)
.then(planActivities)
.commit();
const weatherWorkflow = new Workflow({
name: "weather-workflow",
triggerSchema: z.object({
cityA: z.string().describe("The city to get the weather for"),
cityB: z.string().describe("The city to get the weather for"),
}),
result: {
schema: z.object({
activitiesA: z.string(),
activitiesB: z.string(),
}),
mapping: {
activitiesA: {
step: workflowA,
path: "result.activities",
},
activitiesB: {
step: workflowB,
path: "result.activities",
},
},
},
})
.step(workflowA, {
variables: {
city: {
step: "trigger",
path: "cityA",
},
},
})
.step(workflowB, {
variables: {
city: {
step: "trigger",
path: "cityB",
},
},
});
weatherWorkflow.commit();
In this example:
- We define schemas for type safety across all workflows
- The steps fetchWeather and planActivities are assumed to be defined elsewhere with their own input and output schemas
- The nested workflows define their own result schemas and mappings, while the parent workflow defines the trigger schema
- Data is passed through using variables syntax in the .step() calls
- The main workflow combines data from both nested workflows