Skip to Content
Examples › Workflows (Legacy) › Example: Suspend and Resume | Workflows (Legacy) | Mastra Docs

Workflow (Legacy) with Suspend and Resume

Workflow steps can be suspended and resumed at any point in the workflow execution. This example demonstrates how to suspend a workflow step and resume it later.

Basic Example

import { Mastra } from "@mastra/core";
import { LegacyStep, LegacyWorkflow } from "@mastra/core/workflows/legacy";
import { z } from "zod";

/**
 * First step of the basic example: reads `inputValue` from the workflow's
 * trigger data and returns it doubled.
 */
const stepOne = new LegacyStep({
  id: "stepOne",
  outputSchema: z.object({ doubledValue: z.number() }),
  execute: async ({ context }) => {
    const { inputValue } = context.triggerData;
    return { doubledValue: inputValue * 2 };
  },
});
/**
 * Second step of the basic example: adds the optional `secondValue` from the
 * step's input data (0 when absent) to stepOne's `doubledValue`.
 *
 * While the sum is below 100 the step suspends itself; otherwise it returns
 * the sum as `incrementedValue`.
 */
const stepTwo = new LegacyStep({
  id: "stepTwo",
  outputSchema: z.object({
    incrementedValue: z.number(),
  }),
  execute: async ({ context, suspend }) => {
    const secondValue = context.inputData?.secondValue ?? 0;
    const doubledValue = context.getStepResult(stepOne)?.doubledValue ?? 0;
    const incrementedValue = doubledValue + secondValue;

    if (incrementedValue < 100) {
      // Not enough yet: pause here until the workflow is resumed.
      await suspend();
      return { incrementedValue: 0 };
    }

    return { incrementedValue };
  },
});

// Build the workflow
const myWorkflow = new LegacyWorkflow({
  name: "my-workflow",
  triggerSchema: z.object({
    inputValue: z.number(),
  }),
});

// Run the steps sequentially: stepTwo executes after stepOne
myWorkflow.step(stepOne).then(stepTwo).commit();
// Register the workflow
export const mastra = new Mastra({
  legacy_workflows: { registeredWorkflow: myWorkflow },
});

// Get registered workflow from Mastra
const registeredWorkflow = mastra.legacy_getWorkflow("registeredWorkflow");
const { runId, start } = registeredWorkflow.createRun();

// Start watching the workflow before executing it.
// The suspended check runs once per watch event, so a single event can
// trigger at most one resume call.
registeredWorkflow.watch(async ({ context }) => {
  const stepTwoStatus = context.steps?.stepTwo?.status;
  if (stepTwoStatus !== "suspended") {
    return;
  }

  console.log("Workflow suspended, resuming with new value");

  // Resume the workflow with new context
  await registeredWorkflow.resume({
    runId,
    stepId: "stepTwo",
    context: { secondValue: 100 },
  });
});

// Start the workflow execution
await start({ triggerData: { inputValue: 45 } });

Advanced Example with Multiple Suspension Points Using the async/await Pattern and Suspend Payloads

This example demonstrates a more complex workflow with multiple suspension points using the async/await pattern. It simulates a content generation workflow that requires human intervention at different stages.

import { Mastra } from "@mastra/core";
import { LegacyStep, LegacyWorkflow } from "@mastra/core/workflows/legacy";
import { z } from "zod";

// Step 1: Get user input
/**
 * Exposes the `input` field of the workflow trigger data as `userInput`
 * so later steps can read it through `getStepResult`.
 */
const getUserInput = new LegacyStep({
  id: "getUserInput",
  outputSchema: z.object({ userInput: z.string() }),
  execute: async ({ context }) => {
    // In a real application, this might come from a form or API
    const userInput = context.triggerData.input;
    return { userInput };
  },
});
// Step 2: Generate content with AI (may suspend for human guidance)
/**
 * Produces a draft from the user input.
 *
 * - If the draft's confidence score is above 0.7, the draft is returned as-is.
 * - Otherwise, if no `guidance` is present in the step's input data, the step
 *   suspends and returns `undefined`.
 * - Otherwise, the draft is enhanced with the supplied guidance.
 */
const promptAgent = new LegacyStep({
  id: "promptAgent",
  inputSchema: z.object({
    guidance: z.string(),
  }),
  execute: async ({ context, suspend }) => {
    const userInput = context.getStepResult(getUserInput)?.userInput;
    console.log(`Generating content based on: ${userInput}`);

    const guidance = context.inputData?.guidance;

    // Simulate AI generating content
    const initialDraft = generateInitialDraft(userInput);

    // If confidence is high, return the generated content directly
    if (initialDraft.confidenceScore > 0.7) {
      return { modelOutput: initialDraft.content };
    }

    // If confidence is low, suspend for human guidance
    if (!guidance) {
      // Log only on the path that actually suspends, so a resumed run does
      // not claim to be suspending again.
      console.log(
        "Low confidence in generated content, suspending for human guidance",
      );
      await suspend();
      return undefined;
    }

    // This code runs after resume with human guidance
    console.log("Resumed with human guidance", { guidance });

    // Use the human guidance to improve the output
    return {
      modelOutput: enhanceWithGuidance(initialDraft.content, guidance),
    };
  },
  outputSchema: z.object({ modelOutput: z.string() }).optional(),
});
// Step 3: Evaluate the content quality
/**
 * Scores the `modelOutput` produced by `promptAgent` for tone and
 * completeness, wrapping each score in a `{ score }` object.
 */
const evaluateTone = new LegacyStep({
  id: "evaluateToneConsistency",
  outputSchema: z.object({
    toneScore: z.any(),
    completenessScore: z.any(),
  }),
  execute: async ({ context }) => {
    const draft = context.getStepResult(promptAgent)?.modelOutput;

    // Simulate evaluation
    const toneScore = { score: calculateToneScore(draft) };
    const completenessScore = { score: calculateCompletenessScore(draft) };

    return { toneScore, completenessScore };
  },
});
// Step 4: Improve response if needed (may suspend)
/**
 * Improves the generated content based on the evaluation scores.
 *
 * - If both scores exceed the quality threshold, minor improvements are
 *   applied automatically.
 * - Otherwise, if no `improvedContent` is present in the step's input data,
 *   the step suspends with a payload describing the content, its scores,
 *   which score needs work, and the incremented resume-attempt counter.
 * - Otherwise, the human-supplied `improvedContent` is returned.
 */
const improveResponse = new LegacyStep({
  id: "improveResponse",
  inputSchema: z.object({
    improvedContent: z.string(),
    resumeAttempts: z.number(),
  }),
  execute: async ({ context, suspend }) => {
    // A score must be strictly above this value to count as passing.
    const qualityThreshold = 0.8;

    const content = context.getStepResult(promptAgent)?.modelOutput;
    const evaluation = context.getStepResult(evaluateTone);
    const toneScore = evaluation?.toneScore.score ?? 0;
    const completenessScore = evaluation?.completenessScore.score ?? 0;

    // Guard the input-data access the same way promptAgent does, so a run
    // without input data cannot throw here.
    const improvedContent = context.inputData?.improvedContent;
    const resumeAttempts = context.inputData?.resumeAttempts ?? 0;

    // If scores are above threshold, make minor improvements
    if (toneScore > qualityThreshold && completenessScore > qualityThreshold) {
      return { improvedOutput: makeMinorImprovements(content) };
    }

    if (!improvedContent) {
      console.log(
        "Content quality below threshold, suspending for human intervention",
        { improvedContent, resumeAttempts },
      );

      // Suspend with payload containing content and resume attempts
      await suspend({
        content,
        scores: { tone: toneScore, completeness: completenessScore },
        // Use the same comparison as the pass check above, so a tone score
        // exactly at the threshold is reported as the failing dimension.
        needsImprovement: toneScore > qualityThreshold ? "completeness" : "tone",
        resumeAttempts: resumeAttempts + 1,
      });
      return { improvedOutput: content ?? "" };
    }

    console.log("Resumed with human improvements", improvedContent);
    return { improvedOutput: improvedContent };
  },
  outputSchema: z.object({ improvedOutput: z.string() }).optional(),
});
// Step 5: Final evaluation
/**
 * Re-scores the `improvedOutput` produced by `improveResponse` for tone and
 * completeness, wrapping each score in a `{ score }` object.
 */
const evaluateImproved = new LegacyStep({
  id: "evaluateImprovedResponse",
  outputSchema: z.object({
    toneScore: z.any(),
    completenessScore: z.any(),
  }),
  execute: async ({ context }) => {
    const revised = context.getStepResult(improveResponse)?.improvedOutput;

    // Simulate final evaluation
    const toneScore = { score: calculateToneScore(revised) };
    const completenessScore = { score: calculateCompletenessScore(revised) };

    return { toneScore, completenessScore };
  },
});

// Build the workflow
const contentWorkflow = new LegacyWorkflow({
  name: "content-generation-workflow",
  triggerSchema: z.object({ input: z.string() }),
});

// Chain the five steps one after another, then finalize the definition.
contentWorkflow
  .step(getUserInput)
  .then(promptAgent)
  .then(evaluateTone)
  .then(improveResponse)
  .then(evaluateImproved)
  .commit();
// Register the workflow
const mastra = new Mastra({
  legacy_workflows: { contentWorkflow },
});

// Helper functions (simulated)

/** Simulates an AI draft; the fixed 0.6 confidence forces a suspension. */
function generateInitialDraft(input: string = "") {
  // Simulate AI generating content
  return {
    content: `Generated content based on: ${input}`,
    confidenceScore: 0.6, // Simulate low confidence to trigger suspension
  };
}

/** Appends the human guidance to the draft content. */
function enhanceWithGuidance(content: string = "", guidance: string = "") {
  return `${content} (Enhanced with guidance: ${guidance})`;
}

/** Appends a marker indicating that minor improvements were applied. */
function makeMinorImprovements(content: string = "") {
  return `${content} (with minor improvements)`;
}

/** Simulated tone score; always 0.7. */
function calculateToneScore(_: string = "") {
  return 0.7; // Simulate a score that will trigger suspension
}

/** Simulated completeness score; always 0.9. */
function calculateCompletenessScore(_: string = "") {
  return 0.9;
}

// Usage example
/**
 * Starts the content workflow and resumes it at each suspension point.
 *
 * Resumes `promptAgent` with guidance if it is suspended, then keeps resuming
 * `improveResponse` for as long as it stays suspended. Improved content is
 * only supplied once the reported resume-attempt counter reaches 3.
 *
 * @returns The most recent run result: the initial result if nothing
 *   suspended, otherwise the result of the last `resume` call.
 */
async function runWorkflow() {
  const workflow = mastra.legacy_getWorkflow("contentWorkflow");
  const { runId, start } = workflow.createRun();

  // Start the workflow
  const initialResult = await start({
    triggerData: { input: "Create content about sustainable energy" },
  });
  console.log("Initial workflow state:", initialResult.results);

  // Seeded with the initial result so the function never returns undefined
  // when the workflow runs through without suspending.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- result type is owned by the library
  let finalResult: any = initialResult;

  const promptAgentStepResult = initialResult.activePaths.get("promptAgent");

  // Check if promptAgent step is suspended
  if (promptAgentStepResult?.status !== "suspended") {
    return finalResult;
  }

  console.log("Workflow suspended at promptAgent step");
  console.log("Suspension payload:", promptAgentStepResult?.suspendPayload);

  // Resume with human guidance
  finalResult = await workflow.resume({
    runId,
    stepId: "promptAgent",
    context: {
      guidance: "Focus more on solar and wind energy technologies",
    },
  });
  console.log("Workflow resumed and continued to next steps");

  let improveResponseResumeAttempts = 0;
  // Always read from the latest result so the logged payload is current.
  let improveResponseState = finalResult?.activePaths.get("improveResponse");

  // Check if improveResponse step is suspended
  while (improveResponseState?.status === "suspended") {
    console.log("Workflow suspended at improveResponse step");
    console.log("Suspension payload:", improveResponseState.suspendPayload);

    const improvedContent =
      improveResponseResumeAttempts < 3
        ? undefined
        : "Completely revised content about sustainable energy focusing on solar and wind technologies";

    // Resume with human improvements
    finalResult = await workflow.resume({
      runId,
      stepId: "improveResponse",
      context: {
        improvedContent,
        resumeAttempts: improveResponseResumeAttempts,
      },
    });

    improveResponseState = finalResult?.activePaths.get("improveResponse");
    // Fall back to a local increment if the payload carries no counter, so
    // the loop always makes progress towards supplying improved content.
    improveResponseResumeAttempts =
      improveResponseState?.suspendPayload?.resumeAttempts ??
      improveResponseResumeAttempts + 1;

    console.log("Improved response result:", finalResult?.results);
  }

  return finalResult;
}

// Run the workflow
const result = await runWorkflow();
console.log("Workflow completed");
console.log("Final workflow result:", result);