
Human in the Loop Workflow (Legacy)

Human-in-the-loop workflows allow you to pause execution at specific points to collect user input, make decisions, or perform actions that require human judgment. This example demonstrates how to create a legacy workflow with human intervention points.

How It Works

  1. A workflow step can suspend execution using the suspend() function, optionally passing a payload with context for the human decision maker.
  2. When the workflow is resumed, the human input is passed in the context parameter of the resume() call.
  3. This input becomes available in the step's execution context as context.inputData, which is typed according to the step's inputSchema.
  4. The step can then continue execution based on the human input.

This pattern allows for safe, type-checked human intervention in automated workflows.
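The handshake described above can be modeled in a few lines of plain TypeScript. This is only a sketch of the control flow, not Mastra's implementation: `ReviewInput`, `StepOutcome`, and `reviewStep` are hypothetical names invented for illustration, and the sketch assumes the step body runs again on resume with the human input filled in (which is how the examples on this page are written).

```typescript
// A self-contained model of the suspend/resume handshake.
// None of these names come from Mastra; they are hypothetical stand-ins.

type ReviewInput = { approved: boolean };

type StepOutcome<TPayload, TOutput> =
  | { status: "suspended"; suspendPayload: TPayload }
  | { status: "success"; output: TOutput };

// The step body: with no input it asks to be suspended and hands back a
// payload for the human; with input it finishes and returns its output.
function reviewStep(
  inputData: ReviewInput | undefined,
): StepOutcome<{ message: string }, { decision: string }> {
  if (inputData === undefined) {
    return {
      status: "suspended",
      suspendPayload: { message: "Please approve or reject" },
    };
  }
  return {
    status: "success",
    output: { decision: inputData.approved ? "approved" : "rejected" },
  };
}

// First pass: no human input yet, so the step suspends.
const firstPass = reviewStep(undefined);

// Second pass: the same step runs again, this time with the human's answer.
const secondPass = reviewStep({ approved: true });
```

The point of the model is that a suspending step has two code paths through the same function, which is why the steps in the examples below begin by checking whether `context.inputData` is populated.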

Interactive Terminal Example Using Inquirer

This example uses the Inquirer library to collect input directly from the terminal while a workflow is suspended, so a person can review the suspended step's payload and answer prompts before the workflow resumes.

import { Mastra } from "@mastra/core";
import { LegacyStep, LegacyWorkflow } from "@mastra/core/workflows/legacy";
import { z } from "zod";
import { checkbox, confirm, input } from "@inquirer/prompts";

// Step 1: Generate product recommendations
const generateRecommendations = new LegacyStep({
id: "generateRecommendations",
outputSchema: z.object({
customerName: z.string(),
recommendations: z.array(
z.object({
productId: z.string(),
productName: z.string(),
price: z.number(),
description: z.string(),
}),
),
}),
execute: async ({ context }) => {
const customerName = context.triggerData.customerName;

// In a real application, you might call an API or ML model here
// For this example, we'll return mock data
return {
customerName,
recommendations: [
{
productId: "prod-001",
productName: "Premium Widget",
price: 99.99,
description: "Our best-selling premium widget with advanced features",
},
{
productId: "prod-002",
productName: "Basic Widget",
price: 49.99,
description: "Affordable entry-level widget for beginners",
},
{
productId: "prod-003",
productName: "Widget Pro Plus",
price: 149.99,
description: "Professional-grade widget with extended warranty",
},
],
};
},
});
// Step 2: Get human approval and customization for the recommendations
const reviewRecommendations = new LegacyStep({
id: "reviewRecommendations",
inputSchema: z.object({
approvedProducts: z.array(z.string()),
customerNote: z.string().optional(),
offerDiscount: z.boolean().optional(),
}),
outputSchema: z.object({
finalRecommendations: z.array(
z.object({
productId: z.string(),
productName: z.string(),
price: z.number(),
}),
),
customerNote: z.string().optional(),
offerDiscount: z.boolean(),
}),
execute: async ({ context, suspend }) => {
const { customerName, recommendations } = context.getStepResult(
generateRecommendations,
) || {
customerName: "",
recommendations: [],
};

// Check if we have input from a resumed workflow
const reviewInput = {
approvedProducts: context.inputData?.approvedProducts || [],
customerNote: context.inputData?.customerNote,
offerDiscount: context.inputData?.offerDiscount,
};

// If the sales agent has not approved any products yet, suspend for human review
if (!reviewInput.approvedProducts.length) {
console.log(`Generating recommendations for customer: ${customerName}`);
await suspend({
customerName,
recommendations,
message:
"Please review these product recommendations before sending to the customer",
});

// Placeholder return (won't be reached due to suspend)
return {
finalRecommendations: [],
customerNote: "",
offerDiscount: false,
};
}

// Process the sales agent's product selections
const finalRecommendations = recommendations
.filter((product) =>
reviewInput.approvedProducts.includes(product.productId),
)
.map((product) => ({
productId: product.productId,
productName: product.productName,
price: product.price,
}));

return {
finalRecommendations,
customerNote: reviewInput.customerNote || "",
offerDiscount: reviewInput.offerDiscount || false,
};
},
});
// Step 3: Send the recommendations to the customer
const sendRecommendations = new LegacyStep({
id: "sendRecommendations",
outputSchema: z.object({
emailSent: z.boolean(),
emailContent: z.string(),
}),
execute: async ({ context }) => {
const { customerName } = context.getStepResult(generateRecommendations) || {
customerName: "",
};
const { finalRecommendations, customerNote, offerDiscount } =
context.getStepResult(reviewRecommendations) || {
finalRecommendations: [],
customerNote: "",
offerDiscount: false,
};

// Generate email content based on the recommendations
let emailContent = `Dear ${customerName},\n\nBased on your preferences, we recommend:\n\n`;

finalRecommendations.forEach((product) => {
emailContent += `- ${product.productName}: $${product.price.toFixed(2)}\n`;
});

if (offerDiscount) {
emailContent +=
"\nAs a valued customer, use code SAVE10 for 10% off your next purchase!\n";
}

if (customerNote) {
emailContent += `\nPersonal note: ${customerNote}\n`;
}

emailContent += "\nThank you for your business,\nThe Sales Team";

// In a real application, you would send this email
console.log("Email content generated:", emailContent);

return {
emailSent: true,
emailContent,
};
},
});

// Build the workflow
const recommendationWorkflow = new LegacyWorkflow({
name: "product-recommendation-workflow",
triggerSchema: z.object({
customerName: z.string(),
}),
});

recommendationWorkflow
.step(generateRecommendations)
.then(reviewRecommendations)
.then(sendRecommendations)
.commit();

// Register the workflow
const mastra = new Mastra({
legacy_workflows: { recommendationWorkflow },
});
// Example of using the workflow with Inquirer prompts
async function runRecommendationWorkflow() {
const registeredWorkflow = mastra.legacy_getWorkflow(
"recommendationWorkflow",
);
const run = registeredWorkflow.createRun();

console.log("Starting product recommendation workflow...");
const result = await run.start({
triggerData: {
customerName: "Jane Smith",
},
});

const isReviewStepSuspended =
result.activePaths.get("reviewRecommendations")?.status === "suspended";

// Check if workflow is suspended for human review
if (isReviewStepSuspended) {
const { customerName, recommendations, message } = result.activePaths.get(
"reviewRecommendations",
)?.suspendPayload;

console.log("\n===================================");
console.log(message);
console.log(`Customer: ${customerName}`);
console.log("===================================\n");

// Use Inquirer to collect input from the sales agent in the terminal
console.log("Available product recommendations:");
recommendations.forEach((product, index) => {
console.log(
`${index + 1}. ${product.productName} - $${product.price.toFixed(2)}`,
);
console.log(` ${product.description}\n`);
});

// Let the agent select which products to recommend
const approvedProducts = await checkbox({
message: "Select products to recommend to the customer:",
choices: recommendations.map((product) => ({
name: `${product.productName} ($${product.price.toFixed(2)})`,
value: product.productId,
})),
});

// Let the agent add a personal note
const includeNote = await confirm({
message: "Would you like to add a personal note?",
default: false,
});

let customerNote = "";
if (includeNote) {
customerNote = await input({
message: "Enter your personalized note for the customer:",
});
}

// Ask if a discount should be offered
const offerDiscount = await confirm({
message: "Offer a 10% discount to this customer?",
default: false,
});

console.log("\nSubmitting your review...");

// Resume the workflow with the agent's input
const resumeResult = await run.resume({
stepId: "reviewRecommendations",
context: {
approvedProducts,
customerNote,
offerDiscount,
},
});

console.log("\n===================================");
console.log("Workflow completed!");
console.log("Email content:");
console.log("===================================\n");
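// Print the email text only if the final step succeeded
const sendResult = resumeResult?.results?.sendRecommendations;
console.log(
sendResult?.status === "success"
? sendResult.output.emailContent
: "No email content generated",
);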

return resumeResult;
}

return result;
}

// Invoke the workflow with interactive terminal input
runRecommendationWorkflow().catch(console.error);
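The runner above destructures the suspend payload directly, which throws if the payload is missing or has an unexpected shape. One way to guard against that is a small type guard run before destructuring. The sketch below is an assumption-laden illustration: `ReviewPayload` and `isReviewPayload` are hypothetical names, and the shape simply mirrors the object this example passes to `suspend()`.

```typescript
// Hypothetical shape of the payload passed to suspend() in reviewRecommendations.
interface ReviewPayload {
  customerName: string;
  message: string;
  recommendations: {
    productId: string;
    productName: string;
    price: number;
    description: string;
  }[];
}

// Returns true only when the value has every field the terminal UI reads.
function isReviewPayload(value: unknown): value is ReviewPayload {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  if (typeof v.customerName !== "string") return false;
  if (typeof v.message !== "string") return false;
  if (!Array.isArray(v.recommendations)) return false;
  return v.recommendations.every((item: unknown) => {
    if (typeof item !== "object" || item === null) return false;
    const p = item as Record<string, unknown>;
    return (
      typeof p.productId === "string" &&
      typeof p.productName === "string" &&
      typeof p.price === "number" &&
      typeof p.description === "string"
    );
  });
}
```

In the runner, the check could sit between reading `suspendPayload` and destructuring it, with an early return or a thrown error when the guard fails.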

Advanced Example with Multiple User Inputs

This example demonstrates a more complex workflow that collects several inputs from a human reviewer (a decision, notes, and optionally edited content) when the workflow suspends, as in a content moderation system.

import { Mastra } from "@mastra/core";
import { LegacyStep, LegacyWorkflow } from "@mastra/core/workflows/legacy";
import { z } from "zod";
import { select, input } from "@inquirer/prompts";

// Step 1: Receive and analyze content
const analyzeContent = new LegacyStep({
id: "analyzeContent",
outputSchema: z.object({
content: z.string(),
aiAnalysisScore: z.number(),
flaggedCategories: z.array(z.string()).optional(),
}),
execute: async ({ context }) => {
const content = context.triggerData.content;

// Simulate AI analysis
const aiAnalysisScore = simulateContentAnalysis(content);
const flaggedCategories =
aiAnalysisScore < 0.7
? ["potentially inappropriate", "needs review"]
: [];

return {
content,
aiAnalysisScore,
flaggedCategories,
};
},
});
// Step 2: Moderate content that needs review
const moderateContent = new LegacyStep({
id: "moderateContent",
// Define the schema for human input that will be provided when resuming
inputSchema: z.object({
moderatorDecision: z.enum(["approve", "reject", "modify"]).optional(),
moderatorNotes: z.string().optional(),
modifiedContent: z.string().optional(),
}),
outputSchema: z.object({
moderationResult: z.enum(["approved", "rejected", "modified"]),
moderatedContent: z.string(),
notes: z.string().optional(),
}),
// @ts-ignore
execute: async ({ context, suspend }) => {
const analysisResult = context.getStepResult(analyzeContent);
// Access the input provided when resuming the workflow
const moderatorInput = {
decision: context.inputData?.moderatorDecision,
notes: context.inputData?.moderatorNotes,
modifiedContent: context.inputData?.modifiedContent,
};

// If the AI analysis score is high enough, auto-approve
if (
analysisResult?.aiAnalysisScore > 0.9 &&
!analysisResult?.flaggedCategories?.length
) {
return {
moderationResult: "approved",
moderatedContent: analysisResult.content,
notes: "Auto-approved by system",
};
}

// If we don't have moderator input yet, suspend for human review
if (!moderatorInput.decision) {
await suspend({
content: analysisResult?.content,
aiScore: analysisResult?.aiAnalysisScore,
flaggedCategories: analysisResult?.flaggedCategories,
message: "Please review this content and make a moderation decision",
});

// Placeholder return
return {
moderationResult: "approved",
moderatedContent: "",
};
}

// Process the moderator's decision
switch (moderatorInput.decision) {
case "approve":
return {
moderationResult: "approved",
moderatedContent: analysisResult?.content || "",
notes: moderatorInput.notes || "Approved by moderator",
};

case "reject":
return {
moderationResult: "rejected",
moderatedContent: "",
notes: moderatorInput.notes || "Rejected by moderator",
};

case "modify":
return {
moderationResult: "modified",
moderatedContent:
moderatorInput.modifiedContent || analysisResult?.content || "",
notes: moderatorInput.notes || "Modified by moderator",
};

default:
return {
moderationResult: "rejected",
moderatedContent: "",
notes: "Invalid moderator decision",
};
}
},
});
// Step 3: Apply moderation actions
const applyModeration = new LegacyStep({
id: "applyModeration",
outputSchema: z.object({
finalStatus: z.string(),
content: z.string().optional(),
auditLog: z.object({
originalContent: z.string(),
moderationResult: z.string(),
aiScore: z.number(),
timestamp: z.string(),
}),
}),
execute: async ({ context }) => {
const analysisResult = context.getStepResult(analyzeContent);
const moderationResult = context.getStepResult(moderateContent);

// Create audit log
const auditLog = {
originalContent: analysisResult?.content || "",
moderationResult: moderationResult?.moderationResult || "unknown",
aiScore: analysisResult?.aiAnalysisScore || 0,
timestamp: new Date().toISOString(),
};

// Apply moderation action
switch (moderationResult?.moderationResult) {
case "approved":
return {
finalStatus: "Content published",
content: moderationResult.moderatedContent,
auditLog,
};

case "modified":
return {
finalStatus: "Content modified and published",
content: moderationResult.moderatedContent,
auditLog,
};

case "rejected":
return {
finalStatus: "Content rejected",
auditLog,
};

default:
return {
finalStatus: "Error in moderation process",
auditLog,
};
}
},
});
// Build the workflow
const contentModerationWorkflow = new LegacyWorkflow({
name: "content-moderation-workflow",
triggerSchema: z.object({
content: z.string(),
}),
});

contentModerationWorkflow
.step(analyzeContent)
.then(moderateContent)
.then(applyModeration)
.commit();

// Register the workflow
const mastra = new Mastra({
legacy_workflows: { contentModerationWorkflow },
});

// Example of using the workflow with Inquirer prompts
async function runModerationDemo() {
const registeredWorkflow = mastra.legacy_getWorkflow(
"contentModerationWorkflow",
);
const run = registeredWorkflow.createRun();

// Start the workflow with content that needs review
console.log("Starting content moderation workflow...");
const result = await run.start({
triggerData: {
content: "This is some user-generated content that requires moderation.",
},
});

const isReviewStepSuspended =
result.activePaths.get("moderateContent")?.status === "suspended";

// Check if workflow is suspended
if (isReviewStepSuspended) {
const { content, aiScore, flaggedCategories, message } =
result.activePaths.get("moderateContent")?.suspendPayload;

console.log("\n===================================");
console.log(message);
console.log("===================================\n");

console.log("Content to review:");
console.log(content);
console.log(`\nAI Analysis Score: ${aiScore}`);
console.log(
`Flagged Categories: ${flaggedCategories?.join(", ") || "None"}\n`,
);

// Collect moderator decision using Inquirer
const moderatorDecision = await select({
message: "Select your moderation decision:",
choices: [
{ name: "Approve content as is", value: "approve" },
{ name: "Reject content completely", value: "reject" },
{ name: "Modify content before publishing", value: "modify" },
],
});

// Collect additional information based on decision
let moderatorNotes = "";
let modifiedContent = "";

moderatorNotes = await input({
message: "Enter any notes about your decision:",
});

if (moderatorDecision === "modify") {
modifiedContent = await input({
message: "Enter the modified content:",
default: content,
});
}

console.log("\nSubmitting your moderation decision...");

// Resume the workflow with the moderator's input
const resumeResult = await run.resume({
stepId: "moderateContent",
context: {
moderatorDecision,
moderatorNotes,
modifiedContent,
},
});

if (resumeResult?.results?.applyModeration?.status === "success") {
console.log("\n===================================");
console.log(
`Moderation complete: ${resumeResult?.results?.applyModeration?.output.finalStatus}`,
);
console.log("===================================\n");

if (resumeResult?.results?.applyModeration?.output.content) {
console.log("Published content:");
console.log(resumeResult.results.applyModeration.output.content);
}
}

return resumeResult;
}

console.log(
"Workflow completed without requiring human intervention:",
result.results,
);
return result;
}

// Helper function for AI content analysis simulation
function simulateContentAnalysis(content: string): number {
// In a real application, this would call an AI service
// For the example, we're returning a random score
return Math.random();
}

// Invoke the demo function
runModerationDemo().catch(console.error);
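The decision handling inside `moderateContent` is pure logic, so it can be pulled out into a function and tested without starting a workflow. The following is a minimal sketch under that assumption. `ModeratorInput`, `ModerationOutcome`, and `resolveModeration` are hypothetical names, and the fallback rules are copied from the `switch` in the example above.

```typescript
type ModeratorDecision = "approve" | "reject" | "modify";
type ModerationResult = "approved" | "rejected" | "modified";

// Hypothetical input and output shapes mirroring the step's schemas.
interface ModeratorInput {
  decision: ModeratorDecision;
  notes?: string;
  modifiedContent?: string;
}

interface ModerationOutcome {
  moderationResult: ModerationResult;
  moderatedContent: string;
  notes: string;
}

// Maps a moderator's decision to the step output, with the same defaults
// as the example: empty notes fall back to a standard message, and an empty
// modification falls back to the original content.
function resolveModeration(
  originalContent: string,
  input: ModeratorInput,
): ModerationOutcome {
  switch (input.decision) {
    case "approve":
      return {
        moderationResult: "approved",
        moderatedContent: originalContent,
        notes: input.notes || "Approved by moderator",
      };
    case "reject":
      return {
        moderationResult: "rejected",
        moderatedContent: "",
        notes: input.notes || "Rejected by moderator",
      };
    case "modify":
      return {
        moderationResult: "modified",
        moderatedContent: input.modifiedContent || originalContent,
        notes: input.notes || "Modified by moderator",
      };
  }
}
```

Keeping this mapping separate from the suspend logic means the step's `execute` only has to decide whether to suspend, and the branch behavior can be checked with ordinary unit tests.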

Key Concepts

  1. Suspension Points - Use the suspend() function within a step's execute to pause workflow execution.

  2. Suspension Payload - Pass relevant data when suspending to provide context for human decision-making:

await suspend({
  messageForHuman: "Please review this data",
  data: someImportantData,
});
  3. Checking Workflow Status - After starting a run, look up the step in the returned activePaths map to see whether it is suspended:
const result = await run.start({ triggerData });
const suspendedStep = result.activePaths.get("stepId");
if (suspendedStep?.status === "suspended") {
  // Process suspension
  console.log("Workflow is waiting for input:", suspendedStep.suspendPayload);
}
  4. Interactive Terminal Input - Use libraries like Inquirer to create interactive prompts:
import { select } from "@inquirer/prompts";

// When the step is suspended
const suspendedStep = result.activePaths.get("stepId");
if (suspendedStep?.status === "suspended") {
  // Display information from the suspend payload
  console.log(suspendedStep.suspendPayload.message);

  // Collect user input interactively
  const decision = await select({
    message: "What would you like to do?",
    choices: [
      { name: "Approve", value: "approve" },
      { name: "Reject", value: "reject" },
    ],
  });

  // Resume the workflow with the collected input
  await run.resume({
    stepId: "stepId",
    context: { decision },
  });
}
  5. Resuming Workflow - Use the resume() method to continue workflow execution with human input:
const resumeResult = await run.resume({
  stepId: "suspendedStepId",
  context: {
    // This data is passed to the suspended step as context.inputData
    // and must conform to the step's inputSchema
    userDecision: "approve",
  },
});
  6. Input Schema for Human Data - Define an input schema on steps that might be resumed with human input to ensure type safety:
const myStep = new LegacyStep({
  id: "myStep",
  inputSchema: z.object({
    // This schema validates the data passed in resume's context
    // and makes it available as context.inputData
    userDecision: z.enum(["approve", "reject"]),
    userComments: z.string().optional(),
  }),
  execute: async ({ context, suspend }) => {
    // Check if we have user input from a previous suspension
    if (context.inputData?.userDecision) {
      // Process the user's decision
      return { result: `User decided: ${context.inputData.userDecision}` };
    }

    // If no input, suspend for human decision
    await suspend();
  },
});
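When a workflow has more than one step that can suspend, the status check can be generalized into a helper that scans every entry instead of looking up a single step ID. The sketch below assumes only what the full examples on this page rely on: that `activePaths` behaves like a `Map` from step ID to an object with a `status` field and an optional `suspendPayload`. `PathState` and `findSuspendedSteps` are hypothetical names, and the map in the usage lines is hand-built test data, not real workflow output.

```typescript
// Hypothetical minimal shape of one activePaths entry, based on the two
// fields the examples read: status and suspendPayload.
interface PathState {
  status: string;
  suspendPayload?: unknown;
}

// Collect the IDs of every step that is currently suspended.
function findSuspendedSteps(activePaths: Map<string, PathState>): string[] {
  const suspended: string[] = [];
  activePaths.forEach((state, stepId) => {
    if (state.status === "suspended") {
      suspended.push(stepId);
    }
  });
  return suspended;
}

// Usage with a hand-built map standing in for result.activePaths.
const examplePaths = new Map<string, PathState>([
  ["analyzeContent", { status: "success" }],
  [
    "moderateContent",
    { status: "suspended", suspendPayload: { message: "Please review" } },
  ],
]);
const waiting = findSuspendedSteps(examplePaths);
```

A runner could loop over the returned IDs, prompt for each one, and call `run.resume()` with the matching `stepId`.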

Human-in-the-loop workflows are powerful for building systems that blend automation with human judgment, such as:

  • Content moderation systems
  • Approval workflows
  • Supervised AI systems
  • Customer service automation with escalation






Workflows (Legacy)

The following links provide example documentation for legacy workflows: