Workflow
The Workflow class enables you to create state machines for complex sequences of operations with conditional branching and data validation.
import { Workflow } from '@mastra/core';
const workflow = new Workflow('my-workflow');
API Reference
Constructor
- name (string): Identifier for the workflow
- logger? (Logger<WorkflowLogMessage>): Optional logger instance for workflow execution details
Core Methods
addStep()
Adds a Step to the workflow, including transitions to other steps. Returns the workflow instance for chaining.
setTriggerSchema()
Sets a schema for validating initial workflow data.
commit()
Validates and finalizes the workflow configuration. Must be called after adding all steps.
executeWorkflow()
Executes the workflow with optional trigger data.
Trigger Schemas
Trigger schemas validate the initial data passed to a workflow using Zod.
import { z } from 'zod';

const workflow = new Workflow('order-process')
  .setTriggerSchema(
    z.object({
      orderId: z.string(),
      customer: z.object({
        id: z.string(),
        email: z.string().email()
      })
    })
  );
The schema:
- Validates data passed to executeWorkflow()
- Provides TypeScript types for your workflow input
Variables & Data Flow
Variables allow steps to access data from:
- Previous steps’ outputs
- Trigger data
- Static payloads
workflow
  .addStep('createOrder', {
    // Access trigger data
    variables: {
      orderId: { stepId: 'trigger', path: 'orderId' }
    },
    action: async (data) => ({
      status: 'created',
      total: 100
    })
  })
  .addStep('processPayment', {
    variables: {
      // Access previous step's data
      orderStatus: { stepId: 'createOrder', path: 'status' },
      amount: { stepId: 'createOrder', path: 'total' }
    },
    // Add static values
    payload: {
      currency: 'USD'
    },
    action: async (data) => {
      // data contains:
      // - orderStatus and amount from createOrder step
      // - currency from payload
      return { status: 'paid' };
    }
  });
Variable Resolution
- Variables are resolved in order of step execution
- Each step can access outputs of all previous steps
- Paths use dot notation for nested data
- Missing or invalid paths throw errors during execution
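The resolution rules above can be illustrated with a small, self-contained sketch. This is not Mastra's implementation: `getPath`, `resolveVariables`, and the `VariableRef` type are hypothetical names, used only to show how dot-notation paths could be resolved against earlier step outputs and how a missing path could surface as an error. The sketch also assumes that variables take precedence over payload keys with the same name, which the text above does not specify.

```typescript
// Hypothetical types for illustration; not part of @mastra/core.
type VariableRef = { stepId: string; path: string };
type StepOutputs = Record<string, unknown>;

// Walk a dot-notation path ('customer.email') through nested data.
// Throws when any segment is missing, mirroring the "missing paths throw" rule.
function getPath(source: unknown, path: string): unknown {
  let current: unknown = source;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object' || !(key in current)) {
      throw new Error(`Path "${path}" not found at segment "${key}"`);
    }
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Build a step's input from variable references plus a static payload.
// Assumption: resolved variables overwrite payload keys of the same name.
function resolveVariables(
  variables: Record<string, VariableRef>,
  outputs: StepOutputs,
  payload: Record<string, unknown> = {},
): Record<string, unknown> {
  const resolved: Record<string, unknown> = { ...payload };
  for (const [name, ref] of Object.entries(variables)) {
    if (!(ref.stepId in outputs)) {
      throw new Error(`Step "${ref.stepId}" has not produced output yet`);
    }
    resolved[name] = getPath(outputs[ref.stepId], ref.path);
  }
  return resolved;
}

// Outputs collected so far, keyed by step ID ('trigger' holds the trigger data).
const outputs: StepOutputs = {
  trigger: { orderId: '123', customer: { email: 'a@example.com' } },
  createOrder: { status: 'created', total: 100 },
};

const input = resolveVariables(
  {
    email: { stepId: 'trigger', path: 'customer.email' },
    amount: { stepId: 'createOrder', path: 'total' },
  },
  outputs,
  { currency: 'USD' },
);
console.log(input);
```

Because only outputs already present in `outputs` can be referenced, a step in this sketch can read from the trigger and from any step that ran before it, but not from a step that has yet to run.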
Example
import { Workflow } from '@mastra/core';
import { z } from 'zod';

const workflow = new Workflow('process-data')
  .setTriggerSchema(z.object({
    items: z.array(z.object({
      id: z.number(),
      value: z.number()
    }))
  }))
  .addStep('filter', {
    action: async (data) => ({
      filtered: data.items.filter(item => item.value > 50)
    }),
    variables: {
      // Read the items array from the trigger data
      items: { stepId: 'trigger', path: 'items' }
    },
    transitions: {
      // Move on to 'process' only if at least one item passed the filter
      process: {
        condition: {
          ref: { stepId: 'filter', path: 'filtered' },
          query: { $where: (value) => value.length > 0 }
        }
      }
    }
  })
  .addStep('process', {
    action: async ({ items }) => ({
      processed: items.map(item => ({
        id: item.id,
        doubled: item.value * 2
      }))
    }),
    variables: {
      items: { stepId: 'filter', path: 'filtered' }
    }
  })
  .commit();
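The transition in the example pairs a `ref` (which value to inspect) with a `$where` predicate (what must hold for the transition to fire). A minimal sketch of how such a condition might be evaluated follows. `Condition` and `shouldTransition` are hypothetical names, the lookup handles only single-level paths, and Mastra's actual evaluation logic is not shown in this section.

```typescript
// Hypothetical shape mirroring the example's transition condition.
type Condition = {
  ref: { stepId: string; path: string };
  query: { $where: (value: any) => boolean };
};

// Decide whether a transition should fire, given the outputs produced so far.
function shouldTransition(
  condition: Condition,
  outputs: Record<string, Record<string, unknown>>,
): boolean {
  // Look up the referenced value (single-level path only, to keep the sketch short).
  const value = outputs[condition.ref.stepId]?.[condition.ref.path];
  return condition.query.$where(value);
}

const condition: Condition = {
  ref: { stepId: 'filter', path: 'filtered' },
  query: { $where: (value) => value.length > 0 },
};

// One item survived the filter, so the transition to 'process' would fire.
const withItems = shouldTransition(condition, {
  filter: { filtered: [{ id: 1, value: 80 }] },
});

// Nothing survived the filter, so the transition would be skipped.
const withoutItems = shouldTransition(condition, { filter: { filtered: [] } });

console.log(withItems, withoutItems);
```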
Validation
Workflow validation happens at two key times:
1. At Commit Time
When you call .commit(), the workflow validates:
workflow
  .addStep('step1', {...})
  .addStep('step2', {...})
  .commit(); // Validates workflow structure
- Circular dependencies between steps
- Terminal paths (every path must end)
- Unreachable steps
- Variable references to non-existent steps
- Duplicate step IDs
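To make these structural checks concrete, here is a rough sketch of how a few of them could be run over a simple step graph. It is an illustration under stated assumptions, not the library's validator: `StepDef`, `Issue`, and `validateSteps` are hypothetical, the issue names `duplicate_step_id` and `missing_step_reference` are invented for the sketch, and the terminal-path check is omitted for brevity.

```typescript
// Hypothetical step description used only for this sketch.
type StepDef = {
  id: string;
  transitions: string[];     // IDs of steps this step can move to
  variableSources: string[]; // step IDs referenced by this step's variables
};

type Issue = { type: string; stepId: string };

function validateSteps(steps: StepDef[], entryId: string): Issue[] {
  const issues: Issue[] = [];
  const byId = new Map<string, StepDef>();

  // Duplicate step IDs.
  for (const step of steps) {
    if (byId.has(step.id)) issues.push({ type: 'duplicate_step_id', stepId: step.id });
    else byId.set(step.id, step);
  }

  // Variable references to steps that do not exist ('trigger' is always available).
  for (const step of byId.values()) {
    for (const source of step.variableSources) {
      if (source !== 'trigger' && !byId.has(source)) {
        issues.push({ type: 'missing_step_reference', stepId: step.id });
      }
    }
  }

  // Depth-first search from the entry step: detects cycles and records reachability.
  const visited = new Set<string>();
  const onStack = new Set<string>();
  const visit = (id: string): void => {
    if (onStack.has(id)) {
      issues.push({ type: 'circular_dependency', stepId: id });
      return;
    }
    if (visited.has(id)) return;
    visited.add(id);
    onStack.add(id);
    for (const next of byId.get(id)?.transitions ?? []) visit(next);
    onStack.delete(id);
  };
  visit(entryId);

  // Steps never reached from the entry step.
  for (const id of byId.keys()) {
    if (!visited.has(id)) issues.push({ type: 'unreachable_step', stepId: id });
  }
  return issues;
}

// 'a' and 'b' form a cycle; 'c' is unreachable and references a missing step.
const issues = validateSteps(
  [
    { id: 'a', transitions: ['b'], variableSources: ['trigger'] },
    { id: 'b', transitions: ['a'], variableSources: ['a'] },
    { id: 'c', transitions: [], variableSources: ['missing'] },
  ],
  'a',
);
console.log(issues);
```

Running all checks and collecting every issue, rather than stopping at the first one, is a design choice of this sketch; the section above does not say whether .commit() reports one problem or several.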
2. During Execution
When you call executeWorkflow(), it validates:
// Validates trigger data against schema
await workflow.executeWorkflow({
  orderId: '123',
  customer: {
    id: 'cust_123',
    email: 'invalid-email' // Will fail validation
  }
});
- Trigger data against trigger schema
- Each step’s input data against its inputSchema
- Variable paths exist in referenced step outputs
- Required variables are present
Error Handling
try {
  await workflow.executeWorkflow(data);
} catch (error) {
  if (error instanceof ValidationError) {
    // Handle validation errors
    console.log(error.type); // 'circular_dependency' | 'no_terminal_path' | 'unreachable_step'
    console.log(error.details); // { stepId?: string, path?: string[] }
  }
}
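Since `error.type` is a union of string literals, a switch statement is one way to branch on it. The sketch below defines a local stand-in `ValidationError` class with the `type` and `details` fields shown above so that it runs on its own. The constructor signature and the `describeError` helper are assumptions made for illustration; the real class's constructor and import path are not documented in this section.

```typescript
// Local stand-in for illustration; field names follow the comments above.
type ValidationErrorType = 'circular_dependency' | 'no_terminal_path' | 'unreachable_step';
type ValidationDetails = { stepId?: string; path?: string[] };

class ValidationError extends Error {
  readonly type: ValidationErrorType;
  readonly details: ValidationDetails;

  // Assumed constructor signature (hypothetical).
  constructor(type: ValidationErrorType, details: ValidationDetails = {}) {
    super(`Workflow validation failed: ${type}`);
    this.name = 'ValidationError';
    this.type = type;
    this.details = details;
  }
}

// Turn a caught error into a readable message, branching on error.type.
function describeError(error: unknown): string {
  if (!(error instanceof ValidationError)) return 'Unexpected error';
  const step = error.details.stepId ?? 'unknown step';
  switch (error.type) {
    case 'circular_dependency':
      return `Cycle detected: ${(error.details.path ?? []).join(' -> ')}`;
    case 'no_terminal_path':
      return `No path from ${step} reaches an end`;
    case 'unreachable_step':
      return `Step ${step} can never run`;
  }
}

const cycleMessage = describeError(
  new ValidationError('circular_dependency', { path: ['a', 'b', 'a'] }),
);
const unreachableMessage = describeError(
  new ValidationError('unreachable_step', { stepId: 'c' }),
);
const otherMessage = describeError(new Error('network down'));

console.log(cycleMessage);
console.log(unreachableMessage);
console.log(otherMessage);
```

With an exhaustive switch, the TypeScript compiler flags any newly added error type that is left unhandled, which is the main reason to prefer it over a chain of string comparisons here.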