Skip to Content
Examples > Workflows > Suspend and Resume

Workflow with Suspend and Resume

Workflow steps can be suspended and resumed at any point in the workflow execution. This example demonstrates how to suspend a workflow step and resume it later.

Basic Example

import { Mastra } from '@mastra/core';
import { Step, Workflow } from '@mastra/core/workflows';
import { z } from 'zod';

// Step 1: double the `inputValue` the workflow was triggered with.
const stepOne = new Step({
  id: 'stepOne',
  outputSchema: z.object({
    doubledValue: z.number(),
  }),
  execute: async ({ context }) => {
    const doubledValue = context.triggerData.inputValue * 2;
    return { doubledValue };
  },
});

// Step 2: add `secondValue` to the doubled value and suspend while the sum
// is still below 100.
const stepTwo = new Step({
  id: 'stepTwo',
  outputSchema: z.object({
    incrementedValue: z.number(),
  }),
  execute: async ({ context, suspend }) => {
    // Falls back to 0 when no `secondValue` is present, as on the first pass
    // before the resume call further down supplies one.
    const secondValue = context.inputData?.secondValue ?? 0;
    const doubledValue = context.getStepResult(stepOne)?.doubledValue ?? 0;

    const incrementedValue = doubledValue + secondValue;

    if (incrementedValue < 100) {
      await suspend();
      return { incrementedValue: 0 };
    }
    return { incrementedValue };
  },
});

// Build the workflow
const myWorkflow = new Workflow({
  name: 'my-workflow',
  triggerSchema: z.object({
    inputValue: z.number(),
  }),
});

// Run the steps sequentially: stepOne first, then stepTwo
myWorkflow
  .step(stepOne)
  .then(stepTwo)
  .commit();

// Register the workflow
export const mastra = new Mastra({
  workflows: { registeredWorkflow: myWorkflow },
});

// Get registered workflow from Mastra
const registeredWorkflow = mastra.getWorkflow('registeredWorkflow');
const { runId, start } = registeredWorkflow.createRun();

// Start watching the workflow before executing it, so the suspension of
// stepTwo is observed. The same workflow handle that created the run is used
// for watching and resuming.
registeredWorkflow.watch(async ({ context, activePaths }) => {
  for (const _path of activePaths) {
    const stepTwoStatus = context.steps?.stepTwo?.status;
    if (stepTwoStatus === 'suspended') {
      console.log("Workflow suspended, resuming with new value");

      // Resume the workflow with new context
      await registeredWorkflow.resume({
        runId,
        stepId: 'stepTwo',
        context: { secondValue: 100 },
      });
    }
  }
});

// Start the workflow execution.
// 45 doubles to 90, which is below 100, so stepTwo suspends on its first pass.
await start({ triggerData: { inputValue: 45 } });

Advanced Example with Multiple Suspension Points Using the async/await Pattern and Suspend Payloads

This example demonstrates a more complex workflow with multiple suspension points using the async/await pattern. It simulates a content generation workflow that requires human intervention at different stages.

import { Mastra } from '@mastra/core';
import { Step, Workflow } from '@mastra/core/workflows';
import { z } from 'zod';

// Step 1: Get user input
const getUserInput = new Step({
  id: 'getUserInput',
  outputSchema: z.object({ userInput: z.string() }),
  execute: async ({ context }) => {
    // In a real application, this might come from a form or API
    const userInput = context.triggerData.input;
    return { userInput };
  },
});

// Step 2: Generate content with AI (may suspend for human guidance)
const promptAgent = new Step({
  id: 'promptAgent',
  inputSchema: z.object({
    guidance: z.string(),
  }),
  outputSchema: z.object({ modelOutput: z.string() }).optional(),
  execute: async ({ context, suspend }) => {
    const topic = context.getStepResult(getUserInput)?.userInput;
    console.log(`Generating content based on: ${topic}`);

    const guidance = context.inputData?.guidance;

    // Simulate AI generating content
    const draft = generateInitialDraft(topic);

    // A confident draft is returned as-is, with no human involvement
    const isConfident = draft.confidenceScore > 0.7;
    if (isConfident) {
      return { modelOutput: draft.content };
    }

    console.log('Low confidence in generated content, suspending for human guidance', { guidance });

    if (guidance) {
      // Reached after a resume that supplied guidance
      console.log('Resumed with human guidance');

      // Fold the human guidance into the draft
      const enhanced = enhanceWithGuidance(draft.content, guidance);
      return { modelOutput: enhanced };
    }

    // Low confidence and no guidance yet: suspend
    await suspend();
    return undefined;
  },
});

// Step 3: Evaluate the content quality
const evaluateTone = new Step({
  id: 'evaluateToneConsistency',
  outputSchema: z.object({
    toneScore: z.any(),
    completenessScore: z.any(),
  }),
  execute: async ({ context }) => {
    const generated = context.getStepResult(promptAgent)?.modelOutput;

    // Simulate evaluation
    const tone = calculateToneScore(generated);
    const completeness = calculateCompletenessScore(generated);

    return {
      toneScore: { score: tone },
      completenessScore: { score: completeness },
    };
  },
});
// Step 4: Improve response if needed (may suspend)
const improveResponse = new Step({
  id: 'improveResponse',
  inputSchema: z.object({
    improvedContent: z.string(),
    resumeAttempts: z.number(),
  }),
  execute: async ({ context, suspend }) => {
    // Scores strictly above this value count as good enough
    const threshold = 0.8;

    const content = context.getStepResult(promptAgent)?.modelOutput;
    const toneScore =
      context.getStepResult(evaluateTone)?.toneScore.score ?? 0;
    const completenessScore =
      context.getStepResult(evaluateTone)?.completenessScore.score ?? 0;

    // Optional chaining matches how the other steps read `inputData` and
    // avoids a TypeError if it is absent.
    const improvedContent = context.inputData?.improvedContent;
    const resumeAttempts = context.inputData?.resumeAttempts ?? 0;

    const toneIsGood = toneScore > threshold;
    const completenessIsGood = completenessScore > threshold;

    // If scores are above threshold, make minor improvements
    if (toneIsGood && completenessIsGood) {
      return { improvedOutput: makeMinorImprovements(content) };
    }

    console.log('Content quality below threshold, suspending for human intervention', { improvedContent, resumeAttempts });

    if (!improvedContent) {
      // Suspend with payload containing content and resume attempts
      await suspend({
        content,
        scores: {
          tone: toneScore,
          completeness: completenessScore,
        },
        // Uses the same comparison as the pass check above, so a tone score
        // sitting exactly on the threshold is reported as a tone problem.
        needsImprovement: toneIsGood ? 'completeness' : 'tone',
        resumeAttempts: resumeAttempts + 1,
      });
      return { improvedOutput: content ?? '' };
    }

    console.log('Resumed with human improvements', improvedContent);
    return { improvedOutput: improvedContent ?? content ?? '' };
  },
  outputSchema: z.object({ improvedOutput: z.string() }).optional(),
});

// Step 5: Final evaluation
const evaluateImproved = new Step({
  id: 'evaluateImprovedResponse',
  execute: async ({ context }) => {
    const improvedContent = context.getStepResult(improveResponse)?.improvedOutput;

    // Simulate final evaluation
    return {
      toneScore: { score: calculateToneScore(improvedContent) },
      completenessScore: { score: calculateCompletenessScore(improvedContent) },
    };
  },
  outputSchema: z.object({
    toneScore: z.any(),
    completenessScore: z.any(),
  }),
});

// Build the workflow
const contentWorkflow = new Workflow({
  name: 'content-generation-workflow',
  triggerSchema: z.object({ input: z.string() }),
});

contentWorkflow
  .step(getUserInput)
  .then(promptAgent)
  .then(evaluateTone)
  .then(improveResponse)
  .then(evaluateImproved)
  .commit();

// Register the workflow
const mastra = new Mastra({
  workflows: { contentWorkflow },
});

// Helper functions (simulated)

/** Simulates an AI draft; the fixed confidence of 0.6 is below the 0.7 cut-off used by promptAgent. */
function generateInitialDraft(input: string = '') {
  // Simulate AI generating content
  return {
    content: `Generated content based on: ${input}`,
    confidenceScore: 0.6, // Simulate low confidence to trigger suspension
  };
}

/** Appends the supplied guidance to the content as a parenthesised note. */
function enhanceWithGuidance(content: string = '', guidance: string = '') {
  return `${content} (Enhanced with guidance: ${guidance})`;
}

/** Appends a "(with minor improvements)" marker to the content. */
function makeMinorImprovements(content: string = '') {
  return `${content} (with minor improvements)`;
}

/** Simulated tone score; always 0.7, regardless of input. */
function calculateToneScore(_: string = '') {
  return 0.7; // Simulate a score that will trigger suspension
}

/** Simulated completeness score; always 0.9, regardless of input. */
function calculateCompletenessScore(_: string = '') {
  return 0.9;
}

// Usage example

/**
 * Starts the content workflow and resumes it at each suspension point:
 * once at `promptAgent` with guidance, then repeatedly at `improveResponse`
 * until that step is no longer suspended.
 *
 * @returns The most recent run result. This is the initial result when
 *   `promptAgent` never suspends, otherwise the result of the last resume.
 */
async function runWorkflow() {
  const workflow = mastra.getWorkflow('contentWorkflow');
  const { runId, start } = workflow.createRun();

  // Start the workflow
  const initialResult = await start({
    triggerData: { input: 'Create content about sustainable energy' },
  });
  console.log('Initial workflow state:', initialResult.results);

  const promptAgentStepResult = initialResult.activePaths.get('promptAgent');

  // Nothing to resume: return the initial result instead of `undefined`
  if (promptAgentStepResult?.status !== 'suspended') {
    return initialResult;
  }

  console.log('Workflow suspended at promptAgent step');
  console.log('Suspension payload:', promptAgentStepResult?.suspendPayload);

  // Resume with human guidance
  let latestResult = await workflow.resume({
    runId,
    stepId: 'promptAgent',
    context: {
      guidance: 'Focus more on solar and wind energy technologies',
    },
  });
  console.log('Workflow resumed and continued to next steps');

  let improveResponseResumeAttempts = 0;
  let improveResponseState = latestResult?.activePaths.get('improveResponse');

  // Keep resuming for as long as improveResponse is suspended
  while (improveResponseState?.status === 'suspended') {
    console.log('Workflow suspended at improveResponse step');
    // Log the payload of the latest suspension, not the first one
    console.log('Suspension payload:', improveResponseState.suspendPayload);

    // Send no improved content for the first three attempts, so the step
    // suspends again; after that, supply the revised content.
    const improvedContent =
      improveResponseResumeAttempts < 3
        ? undefined
        : 'Completely revised content about sustainable energy focusing on solar and wind technologies';

    // Resume with human improvements
    latestResult = await workflow.resume({
      runId,
      stepId: 'improveResponse',
      context: {
        improvedContent,
        resumeAttempts: improveResponseResumeAttempts,
      },
    });

    improveResponseState = latestResult?.activePaths.get('improveResponse');
    improveResponseResumeAttempts =
      improveResponseState?.suspendPayload?.resumeAttempts ?? 0;

    console.log('Improved response result:', latestResult?.results);
  }

  return latestResult;
}

// Run the workflow
const result = await runWorkflow();
console.log('Workflow completed');
console.log('Final workflow result:', result);





View Example on GitHub