Event-Driven Workflows
Mastra provides built-in support for event-driven workflows through the afterEvent and resumeWithEvent methods. These methods allow you to create workflows that pause execution while waiting for specific events to occur, then resume with the event data when it’s available.
Overview
Event-driven workflows are useful for scenarios where:
- You need to wait for external systems to complete processing
- User approval or input is required at specific points
- Asynchronous operations need to be coordinated
- Long-running processes need to break up execution across different services
Defining Events
Before using event-driven methods, you must define the events your workflow will listen for in the workflow configuration:
import { Workflow } from '@mastra/core/workflows';
import { z } from 'zod';

const workflow = new Workflow({
  name: 'approval-workflow',
  triggerSchema: z.object({ requestId: z.string() }),
  events: {
    // Define events with their validation schemas
    approvalReceived: {
      schema: z.object({
        approved: z.boolean(),
        approverName: z.string(),
        comment: z.string().optional(),
      }),
    },
    documentUploaded: {
      schema: z.object({
        documentId: z.string(),
        documentType: z.enum(['invoice', 'receipt', 'contract']),
        metadata: z.record(z.string()).optional(),
      }),
    },
  },
});
Each event must have a name and a schema that defines the structure of data expected when the event occurs.
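To make the role of an event schema concrete, the following dependency-free sketch shows what checking a payload for a named event amounts to. It uses neither Zod nor Mastra: EventValidator, eventValidators, and validateEvent are hypothetical names introduced for illustration, and the hand-written checks merely stand in for the approvalReceived schema.

```typescript
// Hypothetical stand-in for a schema: returns the problems found in a payload
// (an empty list means the payload is acceptable).
type EventValidator = (payload: unknown) => string[];

const eventValidators: Record<string, EventValidator> = {
  // Mirrors the approvalReceived schema: approved (boolean),
  // approverName (string), comment (optional string).
  approvalReceived: (payload) => {
    if (typeof payload !== 'object' || payload === null) {
      return ['payload must be an object'];
    }
    const p = payload as Record<string, unknown>;
    const errors: string[] = [];
    if (typeof p['approved'] !== 'boolean') {
      errors.push('approved must be a boolean');
    }
    if (typeof p['approverName'] !== 'string') {
      errors.push('approverName must be a string');
    }
    if (p['comment'] !== undefined && typeof p['comment'] !== 'string') {
      errors.push('comment must be a string when present');
    }
    return errors;
  },
};

// Looks up the validator by event name; unknown event names are rejected.
function validateEvent(eventName: string, payload: unknown): string[] {
  const validator = eventValidators[eventName];
  if (!validator) {
    return [`unknown event: ${eventName}`];
  }
  return validator(payload);
}
```

In a real workflow the Zod schema plays the role of the validator, so a table like this is only useful as a mental model.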
afterEvent()
The afterEvent method creates a suspension point in your workflow that automatically waits for a specific event.
Syntax
workflow.afterEvent(eventName: string): Workflow
Parameters
- eventName: The name of the event to wait for (must be defined in the workflow’s events configuration)
Return Value
Returns the workflow instance for method chaining.
How It Works
When afterEvent is called, Mastra:
- Creates a special step with ID __eventName_event
- Configures this step to automatically suspend workflow execution
- Sets up the continuation point after the event is received
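The list above can be pictured as a small planning function. This is a sketch rather than Mastra's implementation: it assumes that eventName in the step ID pattern is a placeholder for the actual event name, and eventStepId, PlannedStep, and planSteps are hypothetical names.

```typescript
// Assumption: "eventName" in __eventName_event is replaced by the real event name.
function eventStepId(eventName: string): string {
  return `__${eventName}_event`;
}

// Hypothetical description of one entry in the execution plan.
type PlannedStep = { id: string; suspends: boolean };

// Turns a chain of regular steps and afterEvent() calls into a flat plan,
// inserting a suspending step wherever an event is awaited.
function planSteps(
  sequence: Array<{ step: string } | { afterEvent: string }>,
): PlannedStep[] {
  return sequence.map((entry): PlannedStep =>
    'afterEvent' in entry
      ? { id: eventStepId(entry.afterEvent), suspends: true }
      : { id: entry.step, suspends: false },
  );
}
```

Under that assumption, a chain of an initial step, afterEvent('approvalReceived'), and a follow-up step yields three planned entries, with the middle one marked as suspending.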
Usage Example
workflow
  .step(initialProcessStep)
  .afterEvent('approvalReceived') // Workflow suspends here
  .step(postApprovalStep) // This runs after event is received
  .then(finalStep)
  .commit();
resumeWithEvent()
The resumeWithEvent method resumes a suspended workflow by providing data for a specific event.
Syntax
run.resumeWithEvent(eventName: string, data: any): Promise<WorkflowRunResult>
Parameters
- eventName: The name of the event being triggered
- data: The event data (must conform to the schema defined for this event)
Return Value
Returns a Promise that resolves to the workflow execution results after resumption.
How It Works
When resumeWithEvent is called, Mastra:
- Validates the event data against the schema defined for that event
- Loads the workflow snapshot
- Updates the context with the event data
- Resumes execution from the event step
- Continues workflow execution with the subsequent steps
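The five steps above can be modelled with a toy in-memory store. This sketch is not Mastra's implementation: Snapshot, snapshotStore, and resumeSketch are hypothetical names, the isValid callback stands in for the event's schema, and subsequent steps are recorded rather than executed.

```typescript
// Hypothetical shape of a persisted, suspended run.
type Snapshot = {
  suspendedAt: string;               // ID of the event step the run is paused on
  context: Record<string, unknown>;  // data accumulated before suspension
  remainingSteps: string[];          // step IDs that follow the event step
};

// In-memory stand-in for wherever snapshots are persisted.
const snapshotStore = new Map<string, Snapshot>();

function resumeSketch(
  runId: string,
  eventName: string,
  data: unknown,
  isValid: (data: unknown) => boolean, // stand-in for the event's schema
): { executed: string[]; context: Record<string, unknown> } {
  // 1. Validate the event data against the event's schema.
  if (!isValid(data)) {
    throw new Error(`Invalid data for event "${eventName}"`);
  }
  // 2. Load the workflow snapshot.
  const snapshot = snapshotStore.get(runId);
  if (snapshot === undefined) {
    throw new Error(`No snapshot found for run "${runId}"`);
  }
  // 3. Update the context with the event data.
  const context = { ...snapshot.context, resumedEvent: data };
  // 4. Resume from the event step, which completes once the event arrives.
  const executed = [snapshot.suspendedAt];
  // 5. Continue with the subsequent steps (recorded here, not executed).
  executed.push(...snapshot.remainingSteps);
  snapshotStore.delete(runId);
  return { executed, context };
}
```

Validating before loading the snapshot means a malformed payload is rejected without touching stored state, which matches the order of the list.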
Usage Example
// Create a workflow run
const run = workflow.createRun();

// Start the workflow
await run.start({ triggerData: { requestId: 'req-123' } });

// Later, when the event occurs:
const result = await run.resumeWithEvent('approvalReceived', {
  approved: true,
  approverName: 'John Doe',
  comment: 'Looks good to me!',
});

console.log(result.results);
Accessing Event Data
When a workflow is resumed with event data, that data is available in the step context as context.inputData.resumedEvent:
import { Step } from '@mastra/core/workflows';

const processApprovalStep = new Step({
  id: 'processApproval',
  execute: async ({ context }) => {
    // Access the event data supplied to resumeWithEvent
    const eventData = context.inputData.resumedEvent;
    return {
      processingResult: `Processed approval from ${eventData.approverName}`,
      wasApproved: eventData.approved,
    };
  },
});
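If you prefer not to rely on the shape of resumedEvent implicitly, a type guard can narrow it before use. The sketch below is independent of Mastra: ApprovalEvent, isApprovalEvent, and readApprovalEvent are hypothetical names, and the inputData parameter only assumes an object that may carry a resumedEvent property.

```typescript
// Hypothetical type matching the approvalReceived payload.
type ApprovalEvent = { approved: boolean; approverName: string; comment?: string };

// Type guard: true only when the required fields have the expected types.
function isApprovalEvent(value: unknown): value is ApprovalEvent {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return typeof v['approved'] === 'boolean' && typeof v['approverName'] === 'string';
}

// Reads resumedEvent defensively, failing loudly if it is absent or malformed.
function readApprovalEvent(inputData: { resumedEvent?: unknown }): ApprovalEvent {
  const event = inputData.resumedEvent;
  if (!isApprovalEvent(event)) {
    throw new Error('resumedEvent is missing or is not an approval event');
  }
  return event;
}
```

Inside a step, a helper like this could be called with context.inputData so that the rest of the step works with a typed value.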
Multiple Events
You can create workflows that wait for multiple different events at various points:
workflow
  .step(createRequest)
  .afterEvent('approvalReceived')
  .step(processApproval)
  .afterEvent('documentUploaded')
  .step(processDocument)
  .commit();
When resuming a workflow with multiple event suspension points, you need to provide the correct event name and data for the current suspension point.
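One way to avoid sending the wrong event is for the caller to track which event the run is currently waiting on. The following is a caller-side sketch only: EventCursor, currentEvent, and acceptEvent are hypothetical names, and nothing here describes how Mastra itself reacts to an unexpected event.

```typescript
// Hypothetical cursor over the events a workflow awaits, in order.
type EventCursor = { expected: string[]; index: number };

// The event the run is waiting on now, or undefined once all have arrived.
function currentEvent(cursor: EventCursor): string | undefined {
  return cursor.expected[cursor.index];
}

// Advances the cursor only when the given event is the one currently awaited.
function acceptEvent(cursor: EventCursor, eventName: string): void {
  const expected = currentEvent(cursor);
  if (expected === undefined) {
    throw new Error('The workflow is not waiting for any event');
  }
  if (eventName !== expected) {
    throw new Error(`Expected "${expected}" but received "${eventName}"`);
  }
  cursor.index += 1;
}
```

A caller would check acceptEvent first and only then invoke resumeWithEvent with the same event name.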
Practical Example
This example shows a complete workflow that requires both approval and document upload:
import { Workflow, Step } from '@mastra/core/workflows';
import { z } from 'zod';

// Define steps
const createRequest = new Step({
  id: 'createRequest',
  execute: async () => ({ requestId: `req-${Date.now()}` }),
});

const processApproval = new Step({
  id: 'processApproval',
  execute: async ({ context }) => {
    const approvalData = context.inputData.resumedEvent;
    return {
      approved: approvalData.approved,
      approver: approvalData.approverName,
    };
  },
});

const processDocument = new Step({
  id: 'processDocument',
  execute: async ({ context }) => {
    const documentData = context.inputData.resumedEvent;
    return {
      documentId: documentData.documentId,
      processed: true,
      type: documentData.documentType,
    };
  },
});

const finalizeRequest = new Step({
  id: 'finalizeRequest',
  execute: async ({ context }) => {
    const requestId = context.steps.createRequest.output.requestId;
    const approved = context.steps.processApproval.output.approved;
    const documentId = context.steps.processDocument.output.documentId;
    return {
      finalized: true,
      summary: `Request ${requestId} was ${approved ? 'approved' : 'rejected'} with document ${documentId}`,
    };
  },
});

// Create workflow
const requestWorkflow = new Workflow({
  name: 'document-request-workflow',
  events: {
    approvalReceived: {
      schema: z.object({
        approved: z.boolean(),
        approverName: z.string(),
      }),
    },
    documentUploaded: {
      schema: z.object({
        documentId: z.string(),
        documentType: z.enum(['invoice', 'receipt', 'contract']),
      }),
    },
  },
});

// Build workflow
requestWorkflow
  .step(createRequest)
  .afterEvent('approvalReceived')
  .step(processApproval)
  .afterEvent('documentUploaded')
  .step(processDocument)
  .then(finalizeRequest)
  .commit();

// Export workflow
export { requestWorkflow };
Running the Example Workflow
import { requestWorkflow } from './workflows';
import { mastra } from './mastra';

async function runWorkflow() {
  // Get the workflow
  const workflow = mastra.getWorkflow('document-request-workflow');
  const run = workflow.createRun();

  // Start the workflow
  const initialResult = await run.start();
  console.log('Workflow started:', initialResult.results);

  // Simulate receiving approval
  const afterApprovalResult = await run.resumeWithEvent('approvalReceived', {
    approved: true,
    approverName: 'Jane Smith',
  });
  console.log('After approval:', afterApprovalResult.results);

  // Simulate document upload
  const finalResult = await run.resumeWithEvent('documentUploaded', {
    documentId: 'doc-456',
    documentType: 'invoice',
  });
  console.log('Final result:', finalResult.results);
}

runWorkflow().catch(console.error);
Best Practices
- Define Clear Event Schemas: Use Zod to create precise schemas for event data validation
- Use Descriptive Event Names: Choose event names that clearly communicate their purpose
- Handle Missing Events: Ensure your workflow can handle cases where events don’t occur or time out
- Include Monitoring: Use the watch method to monitor suspended workflows waiting for events
- Consider Timeouts: Implement timeout mechanisms for events that may never occur
- Document Events: Clearly document the events your workflow depends on for other developers
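For the timeout recommendation, one possible approach is a periodic sweep over the runs your application knows to be suspended. The sketch below assumes the application records when each run was suspended; SuspendedRun and findExpiredRuns are hypothetical names, and what to do with an expired run (cancel, notify, escalate) is left to the application.

```typescript
// Hypothetical bookkeeping record kept by the application, not by Mastra.
type SuspendedRun = {
  runId: string;
  waitingFor: string;   // name of the awaited event
  suspendedAt: number;  // epoch milliseconds when the run was suspended
};

// Returns the runs that have waited longer than timeoutMs as of `now`.
function findExpiredRuns(
  runs: SuspendedRun[],
  timeoutMs: number,
  now: number = Date.now(),
): SuspendedRun[] {
  return runs.filter((run) => now - run.suspendedAt > timeoutMs);
}
```

Passing now explicitly keeps the function deterministic, which makes the timeout policy easy to unit test.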