Workflow (Legacy) with Suspend and Resume
Workflow steps can be suspended and resumed at any point in the workflow execution. This example demonstrates how to suspend a workflow step and resume it later.
Basic Example
import { Mastra } from "@mastra/core";
import { LegacyStep, LegacyWorkflow } from "@mastra/core/workflows/legacy";
import { z } from "zod";
// First step: reads `inputValue` from the workflow trigger data and emits it doubled.
const stepOne = new LegacyStep({
  id: "stepOne",
  outputSchema: z.object({
    doubledValue: z.number(),
  }),
  execute: async ({ context }) => {
    const { inputValue } = context.triggerData;
    return { doubledValue: inputValue * 2 };
  },
});
// Second step: adds the optional `secondValue` from the step's input data to
// stepOne's doubled value. While the sum is below 100 the step suspends and
// reports 0; otherwise it returns the sum.
const stepTwo = new LegacyStep({
  id: "stepTwo",
  outputSchema: z.object({
    incrementedValue: z.number(),
  }),
  execute: async ({ context, suspend }) => {
    // Both operands fall back to 0 when they are not available
    const addend = context.inputData?.secondValue ?? 0;
    const base = context.getStepResult(stepOne)?.doubledValue ?? 0;
    const incrementedValue = base + addend;

    const belowThreshold = incrementedValue < 100;
    if (!belowThreshold) {
      return { incrementedValue };
    }

    // Sum is still too small: suspend and report a placeholder value
    await suspend();
    return { incrementedValue: 0 };
  },
});
// Build the workflow: it is triggered with an object holding a numeric `inputValue`
const myWorkflow = new LegacyWorkflow({
name: "my-workflow",
triggerSchema: z.object({
inputValue: z.number(),
}),
});
// Chain the steps (stepOne, then stepTwo, which reads stepOne's result) and commit the workflow
myWorkflow.step(stepOne).then(stepTwo).commit();
// Register the workflow with a Mastra instance under the key "registeredWorkflow";
// that key is what legacy_getWorkflow is called with to look the workflow up again.
export const mastra = new Mastra({
legacy_workflows: { registeredWorkflow: myWorkflow },
});
// Get registered workflow from Mastra and create a run. `runId` is passed to the
// resume call below and `start` begins execution.
const registeredWorkflow = mastra.legacy_getWorkflow("registeredWorkflow");
const { runId, start } = registeredWorkflow.createRun();
// Start watching the workflow before executing it
// NOTE(review): watch/resume are called on `myWorkflow` rather than on the
// `registeredWorkflow` retrieved above — presumably the same instance; confirm.
myWorkflow.watch(async ({ context, activePaths }) => {
// The loop variable is unused: the stepTwo status check runs once per entry in activePaths
for (const _path of activePaths) {
const stepTwoStatus = context.steps?.stepTwo?.status;
if (stepTwoStatus === "suspended") {
console.log("Workflow suspended, resuming with new value");
// Resume the workflow with new context; stepTwo reads `secondValue` from its input data
await myWorkflow.resume({
runId,
stepId: "stepTwo",
context: { secondValue: 100 },
});
}
}
});
// Start the workflow execution; 45 doubles to 90, which is below stepTwo's threshold of 100
await start({ triggerData: { inputValue: 45 } });
Advanced Example with Multiple Suspension Points Using the async/await Pattern and Suspend Payloads
This example demonstrates a more complex workflow with multiple suspension points using the async/await pattern. It simulates a content generation workflow that requires human intervention at different stages.
import { Mastra } from "@mastra/core";
import { LegacyStep, LegacyWorkflow } from "@mastra/core/workflows/legacy";
import { z } from "zod";
// Step 1: Expose the trigger's `input` string to later steps as `userInput`
const getUserInput = new LegacyStep({
  id: "getUserInput",
  execute: async ({ context }) => {
    // In a real application, this might come from a form or API
    const { input } = context.triggerData;
    return { userInput: input };
  },
  outputSchema: z.object({ userInput: z.string() }),
});
// Step 2: Draft content from the user input. A confident draft is returned
// directly; otherwise the step either enhances the draft with the supplied
// guidance or, when no guidance is present, suspends without output.
const promptAgent = new LegacyStep({
  id: "promptAgent",
  inputSchema: z.object({
    guidance: z.string(),
  }),
  execute: async ({ context, suspend }) => {
    const userInput = context.getStepResult(getUserInput)?.userInput;
    console.log(`Generating content based on: ${userInput}`);
    const guidance = context.inputData?.guidance;

    // Simulated AI draft together with its confidence score
    const { content: draftContent, confidenceScore } =
      generateInitialDraft(userInput);

    // High confidence: hand back the draft unchanged
    const isConfident = confidenceScore > 0.7;
    if (isConfident) {
      return { modelOutput: draftContent };
    }

    console.log(
      "Low confidence in generated content, suspending for human guidance",
      { guidance },
    );

    if (guidance) {
      // Guidance is available: use it to improve the draft
      console.log("Resumed with human guidance");
      const modelOutput = enhanceWithGuidance(draftContent, guidance);
      return { modelOutput };
    }

    // No guidance was provided: suspend and produce no output
    await suspend();
    return undefined;
  },
  outputSchema: z.object({ modelOutput: z.string() }).optional(),
});
// Step 3: Score the content produced by promptAgent for tone and completeness
const evaluateTone = new LegacyStep({
  id: "evaluateToneConsistency",
  execute: async ({ context }) => {
    const generated = context.getStepResult(promptAgent)?.modelOutput;

    // Simulated evaluation; each result is wrapped in an object with a `score` field
    const toneScore = { score: calculateToneScore(generated) };
    const completenessScore = { score: calculateCompletenessScore(generated) };

    return { toneScore, completenessScore };
  },
  outputSchema: z.object({
    toneScore: z.any(),
    completenessScore: z.any(),
  }),
});
// Step 4: Improve response if needed (may suspend)
//
// - Both scores strictly above the threshold: apply minor improvements and finish.
// - Otherwise, without human-supplied `improvedContent`: suspend with a payload
//   describing the content, its scores, which score needs work, and the next
//   resume-attempt count.
// - Otherwise: return the human-supplied content.
const improveResponse = new LegacyStep({
  id: "improveResponse",
  inputSchema: z.object({
    improvedContent: z.string(),
    resumeAttempts: z.number(),
  }),
  execute: async ({ context, suspend }) => {
    // A score passes only when it is strictly greater than this value
    const qualityThreshold = 0.8;

    const content = context.getStepResult(promptAgent)?.modelOutput;
    const evaluation = context.getStepResult(evaluateTone);
    const toneScore = evaluation?.toneScore.score ?? 0;
    const completenessScore = evaluation?.completenessScore.score ?? 0;

    // Read inputData defensively, as the other steps do, in case no resume
    // context has been supplied yet (assumption — confirm against the runtime).
    const improvedContent = context.inputData?.improvedContent;
    const resumeAttempts = context.inputData?.resumeAttempts ?? 0;

    const toneOk = toneScore > qualityThreshold;
    const completenessOk = completenessScore > qualityThreshold;

    // If scores are above threshold, make minor improvements
    if (toneOk && completenessOk) {
      return { improvedOutput: makeMinorImprovements(content) };
    }

    console.log(
      "Content quality below threshold, suspending for human intervention",
      { improvedContent, resumeAttempts },
    );

    if (!improvedContent) {
      // Suspend with payload containing content and resume attempts.
      // `needsImprovement` reuses the pass check above, so a tone score sitting
      // exactly on the threshold is reported as a tone problem.
      await suspend({
        content,
        scores: { tone: toneScore, completeness: completenessScore },
        needsImprovement: toneOk ? "completeness" : "tone",
        resumeAttempts: resumeAttempts + 1,
      });
      return { improvedOutput: content ?? "" };
    }

    // improvedContent is non-empty here, so it is returned as-is
    console.log("Resumed with human improvements", improvedContent);
    return { improvedOutput: improvedContent };
  },
  outputSchema: z.object({ improvedOutput: z.string() }).optional(),
});
// Step 5: Re-score the output of improveResponse for tone and completeness
const evaluateImproved = new LegacyStep({
  id: "evaluateImprovedResponse",
  execute: async ({ context }) => {
    const revised = context.getStepResult(improveResponse)?.improvedOutput;

    // Simulated final evaluation; each result is wrapped in an object with a `score` field
    const toneScore = { score: calculateToneScore(revised) };
    const completenessScore = { score: calculateCompletenessScore(revised) };

    return { toneScore, completenessScore };
  },
  outputSchema: z.object({
    toneScore: z.any(),
    completenessScore: z.any(),
  }),
});
// Build the workflow: it is triggered with an object holding a string `input`
const contentWorkflow = new LegacyWorkflow({
name: "content-generation-workflow",
triggerSchema: z.object({ input: z.string() }),
});
// Chain the five steps with step()/then() in the order listed, then commit the definition
contentWorkflow
.step(getUserInput)
.then(promptAgent)
.then(evaluateTone)
.then(improveResponse)
.then(evaluateImproved)
.commit();
// Register the workflow with a Mastra instance; the shorthand property means it is
// registered under the key "contentWorkflow", which runWorkflow uses to look it up.
const mastra = new Mastra({
legacy_workflows: { contentWorkflow },
});
// Helper functions (simulated)
/**
 * Simulates an AI model drafting content for the given input.
 *
 * @param input - Text the draft is based on; defaults to an empty string.
 * @returns The draft text and a fixed confidence score of 0.6, which is low
 *   enough to trigger the suspension path in the example.
 */
function generateInitialDraft(input: string = "") {
  const content = `Generated content based on: ${input}`;
  const confidenceScore = 0.6; // Simulate low confidence to trigger suspension
  return { content, confidenceScore };
}
/**
 * Appends a note describing the human guidance to the given content.
 *
 * @param content - Draft text to enhance; defaults to an empty string.
 * @param guidance - Guidance text to mention; defaults to an empty string.
 * @returns The content followed by " (Enhanced with guidance: <guidance>)".
 */
function enhanceWithGuidance(content: string = "", guidance: string = "") {
  const guidanceNote = ` (Enhanced with guidance: ${guidance})`;
  return content + guidanceNote;
}
/**
 * Marks the given content as having received minor improvements.
 *
 * @param content - Text to mark; defaults to an empty string.
 * @returns The content followed by " (with minor improvements)".
 */
function makeMinorImprovements(content: string = "") {
  return [content, "(with minor improvements)"].join(" ");
}
/**
 * Simulated tone evaluation; the argument is ignored.
 *
 * @returns Always 0.7, a score that will trigger suspension in the example.
 */
function calculateToneScore(_: string = ""): number {
  const simulatedToneScore = 0.7;
  return simulatedToneScore;
}
/**
 * Simulated completeness evaluation; the argument is ignored.
 *
 * @returns Always 0.9.
 */
function calculateCompletenessScore(_: string = ""): number {
  const simulatedCompletenessScore = 0.9;
  return simulatedCompletenessScore;
}
// Usage example
/**
 * Runs the content workflow and resumes it at each suspension point.
 *
 * Flow:
 * 1. Start the run with a fixed input.
 * 2. If `promptAgent` is suspended, resume it with human guidance.
 * 3. While `improveResponse` is suspended, resume it again. Revised content is
 *    only supplied once the attempt count reported in the suspend payload
 *    reaches 3; before that the step is resumed without content.
 *
 * @returns The most recent run result that was obtained: the result of the last
 *   resume that returned a value, or the initial result when nothing was resumed.
 */
async function runWorkflow() {
  const workflow = mastra.legacy_getWorkflow("contentWorkflow");
  const { runId, start } = workflow.createRun();

  // Start the workflow
  const initialResult = await start({
    triggerData: { input: "Create content about sustainable energy" },
  });
  console.log("Initial workflow state:", initialResult.results);

  // Always holds the latest known run result, so the function never returns
  // undefined just because a suspension branch was not taken.
  let finalResult: any = initialResult;

  // Check if promptAgent step is suspended
  const promptAgentStepResult = initialResult.activePaths.get("promptAgent");
  if (promptAgentStepResult?.status !== "suspended") {
    return finalResult;
  }

  console.log("Workflow suspended at promptAgent step");
  console.log("Suspension payload:", promptAgentStepResult?.suspendPayload);

  // Resume with human guidance
  const guidanceResult = await workflow.resume({
    runId,
    stepId: "promptAgent",
    context: {
      guidance: "Focus more on solar and wind energy technologies",
    },
  });
  finalResult = guidanceResult ?? finalResult;
  console.log("Workflow resumed and continued to next steps");

  let improveResponseResumeAttempts = 0;
  // State of the improveResponse step as reported by the latest resume call
  let improveResponseStep = guidanceResult?.activePaths.get("improveResponse");

  // Check if improveResponse step is suspended
  while (improveResponseStep?.status === "suspended") {
    console.log("Workflow suspended at improveResponse step");
    // Log the payload of the current suspension, not the one from the first resume
    console.log("Suspension payload:", improveResponseStep?.suspendPayload);

    const improvedContent =
      improveResponseResumeAttempts < 3
        ? undefined
        : "Completely revised content about sustainable energy focusing on solar and wind technologies";

    // Resume with human improvements
    const resumed = await workflow.resume({
      runId,
      stepId: "improveResponse",
      context: {
        improvedContent,
        resumeAttempts: improveResponseResumeAttempts,
      },
    });

    // Read the step state from this resume call only, so the loop ends if it
    // returned nothing, while the last good result is still kept for the caller.
    improveResponseStep = resumed?.activePaths.get("improveResponse");
    improveResponseResumeAttempts =
      improveResponseStep?.suspendPayload?.resumeAttempts ?? 0;
    finalResult = resumed ?? finalResult;
    console.log("Improved response result:", resumed?.results);
  }

  return finalResult;
}
// Run the workflow (top-level await) and print whatever runWorkflow returned
const result = await runWorkflow();
console.log("Workflow completed");
console.log("Final workflow result:", result);