Human in the Loop Workflow (Legacy)
Human-in-the-loop workflows allow you to pause execution at specific points to collect user input, make decisions, or perform actions that require human judgment. This example demonstrates how to create a legacy workflow with human intervention points.
How It Works
- A workflow step can suspend execution using the
suspend()
function, optionally passing a payload with context for the human decision maker. - When the workflow is resumed, the human input is passed in the
context
parameter of theresume()
call. - This input becomes available in the step’s execution context as
context.inputData
, which is typed according to the step’sinputSchema
. - The step can then continue execution based on the human input.
This pattern allows for safe, type-checked human intervention in automated workflows.
Interactive Terminal Example Using Inquirer
This example demonstrates how to use the Inquirer  library to collect user input directly from the terminal when a workflow is suspended, creating a truly interactive human-in-the-loop experience.
import { Mastra } from "@mastra/core";
import { LegacyStep, LegacyWorkflow } from "@mastra/core/workflows/legacy";
import { z } from "zod";
import { confirm, input, select } from "@inquirer/prompts";
// Step 1: Generate product recommendations
const generateRecommendations = new LegacyStep({
id: "generateRecommendations",
outputSchema: z.object({
customerName: z.string(),
recommendations: z.array(
z.object({
productId: z.string(),
productName: z.string(),
price: z.number(),
description: z.string(),
}),
),
}),
execute: async ({ context }) => {
const customerName = context.triggerData.customerName;
// In a real application, you might call an API or ML model here
// For this example, we'll return mock data
return {
customerName,
recommendations: [
{
productId: "prod-001",
productName: "Premium Widget",
price: 99.99,
description: "Our best-selling premium widget with advanced features",
},
{
productId: "prod-002",
productName: "Basic Widget",
price: 49.99,
description: "Affordable entry-level widget for beginners",
},
{
productId: "prod-003",
productName: "Widget Pro Plus",
price: 149.99,
description: "Professional-grade widget with extended warranty",
},
],
};
},
});
// Step 2: Get human approval and customization for the recommendations
const reviewRecommendations = new LegacyStep({
id: "reviewRecommendations",
inputSchema: z.object({
approvedProducts: z.array(z.string()),
customerNote: z.string().optional(),
offerDiscount: z.boolean().optional(),
}),
outputSchema: z.object({
finalRecommendations: z.array(
z.object({
productId: z.string(),
productName: z.string(),
price: z.number(),
}),
),
customerNote: z.string().optional(),
offerDiscount: z.boolean(),
}),
execute: async ({ context, suspend }) => {
const { customerName, recommendations } = context.getStepResult(
generateRecommendations,
) || {
customerName: "",
recommendations: [],
};
// Check if we have input from a resumed workflow
const reviewInput = {
approvedProducts: context.inputData?.approvedProducts || [],
customerNote: context.inputData?.customerNote,
offerDiscount: context.inputData?.offerDiscount,
};
// If we don't have agent input yet, suspend for human review
if (!reviewInput.approvedProducts.length) {
console.log(`Generating recommendations for customer: ${customerName}`);
await suspend({
customerName,
recommendations,
message:
"Please review these product recommendations before sending to the customer",
});
// Placeholder return (won't be reached due to suspend)
return {
finalRecommendations: [],
customerNote: "",
offerDiscount: false,
};
}
// Process the agent's product selections
const finalRecommendations = recommendations
.filter((product) =>
reviewInput.approvedProducts.includes(product.productId),
)
.map((product) => ({
productId: product.productId,
productName: product.productName,
price: product.price,
}));
return {
finalRecommendations,
customerNote: reviewInput.customerNote || "",
offerDiscount: reviewInput.offerDiscount || false,
};
},
});
// Step 3: Send the recommendations to the customer
const sendRecommendations = new LegacyStep({
id: "sendRecommendations",
outputSchema: z.object({
emailSent: z.boolean(),
emailContent: z.string(),
}),
execute: async ({ context }) => {
const { customerName } = context.getStepResult(generateRecommendations) || {
customerName: "",
};
const { finalRecommendations, customerNote, offerDiscount } =
context.getStepResult(reviewRecommendations) || {
finalRecommendations: [],
customerNote: "",
offerDiscount: false,
};
// Generate email content based on the recommendations
let emailContent = `Dear ${customerName},\n\nBased on your preferences, we recommend:\n\n`;
finalRecommendations.forEach((product) => {
emailContent += `- ${product.productName}: $${product.price.toFixed(2)}\n`;
});
if (offerDiscount) {
emailContent +=
"\nAs a valued customer, use code SAVE10 for 10% off your next purchase!\n";
}
if (customerNote) {
emailContent += `\nPersonal note: ${customerNote}\n`;
}
emailContent += "\nThank you for your business,\nThe Sales Team";
// In a real application, you would send this email
console.log("Email content generated:", emailContent);
return {
emailSent: true,
emailContent,
};
},
});
// Build the workflow
const recommendationWorkflow = new LegacyWorkflow({
name: "product-recommendation-workflow",
triggerSchema: z.object({
customerName: z.string(),
}),
});
recommendationWorkflow
.step(generateRecommendations)
.then(reviewRecommendations)
.then(sendRecommendations)
.commit();
// Register the workflow
const mastra = new Mastra({
legacy_workflows: { recommendationWorkflow },
});
// Example of using the workflow with Inquirer prompts
async function runRecommendationWorkflow() {
const registeredWorkflow = mastra.legacy_getWorkflow("recommendationWorkflow");
const run = registeredWorkflow.createRun();
console.log("Starting product recommendation workflow...");
const result = await run.start({
triggerData: {
customerName: "Jane Smith",
},
});
const isReviewStepSuspended =
result.activePaths.get("reviewRecommendations")?.status === "suspended";
// Check if workflow is suspended for human review
if (isReviewStepSuspended) {
const { customerName, recommendations, message } = result.activePaths.get(
"reviewRecommendations",
)?.suspendPayload;
console.log("\n===================================");
console.log(message);
console.log(`Customer: ${customerName}`);
console.log("===================================\n");
// Use Inquirer to collect input from the sales agent in the terminal
console.log("Available product recommendations:");
recommendations.forEach((product, index) => {
console.log(
`${index + 1}. ${product.productName} - $${product.price.toFixed(2)}`,
);
console.log(` ${product.description}\n`);
});
// Let the agent select which products to recommend
const approvedProducts = await checkbox({
message: "Select products to recommend to the customer:",
choices: recommendations.map((product) => ({
name: `${product.productName} ($${product.price.toFixed(2)})`,
value: product.productId,
})),
});
// Let the agent add a personal note
const includeNote = await confirm({
message: "Would you like to add a personal note?",
default: false,
});
let customerNote = "";
if (includeNote) {
customerNote = await input({
message: "Enter your personalized note for the customer:",
});
}
// Ask if a discount should be offered
const offerDiscount = await confirm({
message: "Offer a 10% discount to this customer?",
default: false,
});
console.log("\nSubmitting your review...");
// Resume the workflow with the agent's input
const resumeResult = await run.resume({
stepId: "reviewRecommendations",
context: {
approvedProducts,
customerNote,
offerDiscount,
},
});
console.log("\n===================================");
console.log("Workflow completed!");
console.log("Email content:");
console.log("===================================\n");
console.log(
resumeResult?.results?.sendRecommendations ||
"No email content generated",
);
return resumeResult;
}
return result;
}
// Invoke the workflow with interactive terminal input
runRecommendationWorkflow().catch(console.error);
Advanced Example with Multiple User Inputs
This example demonstrates a more complex workflow that requires multiple human intervention points, such as in a content moderation system.
import { Mastra } from "@mastra/core";
import { LegacyStep, LegacyWorkflow } from "@mastra/core/workflows/legacy";
import { z } from "zod";
import { select, input } from "@inquirer/prompts";
// Step 1: Receive and analyze content
const analyzeContent = new LegacyStep({
id: "analyzeContent",
outputSchema: z.object({
content: z.string(),
aiAnalysisScore: z.number(),
flaggedCategories: z.array(z.string()).optional(),
}),
execute: async ({ context }) => {
const content = context.triggerData.content;
// Simulate AI analysis
const aiAnalysisScore = simulateContentAnalysis(content);
const flaggedCategories =
aiAnalysisScore < 0.7
? ["potentially inappropriate", "needs review"]
: [];
return {
content,
aiAnalysisScore,
flaggedCategories,
};
},
});
// Step 2: Moderate content that needs review
const moderateContent = new LegacyStep({
id: "moderateContent",
// Define the schema for human input that will be provided when resuming
inputSchema: z.object({
moderatorDecision: z.enum(["approve", "reject", "modify"]).optional(),
moderatorNotes: z.string().optional(),
modifiedContent: z.string().optional(),
}),
outputSchema: z.object({
moderationResult: z.enum(["approved", "rejected", "modified"]),
moderatedContent: z.string(),
notes: z.string().optional(),
}),
// @ts-ignore
execute: async ({ context, suspend }) => {
const analysisResult = context.getStepResult(analyzeContent);
// Access the input provided when resuming the workflow
const moderatorInput = {
decision: context.inputData?.moderatorDecision,
notes: context.inputData?.moderatorNotes,
modifiedContent: context.inputData?.modifiedContent,
};
// If the AI analysis score is high enough, auto-approve
if (
analysisResult?.aiAnalysisScore > 0.9 &&
!analysisResult?.flaggedCategories?.length
) {
return {
moderationResult: "approved",
moderatedContent: analysisResult.content,
notes: "Auto-approved by system",
};
}
// If we don't have moderator input yet, suspend for human review
if (!moderatorInput.decision) {
await suspend({
content: analysisResult?.content,
aiScore: analysisResult?.aiAnalysisScore,
flaggedCategories: analysisResult?.flaggedCategories,
message: "Please review this content and make a moderation decision",
});
// Placeholder return
return {
moderationResult: "approved",
moderatedContent: "",
};
}
// Process the moderator's decision
switch (moderatorInput.decision) {
case "approve":
return {
moderationResult: "approved",
moderatedContent: analysisResult?.content || "",
notes: moderatorInput.notes || "Approved by moderator",
};
case "reject":
return {
moderationResult: "rejected",
moderatedContent: "",
notes: moderatorInput.notes || "Rejected by moderator",
};
case "modify":
return {
moderationResult: "modified",
moderatedContent:
moderatorInput.modifiedContent || analysisResult?.content || "",
notes: moderatorInput.notes || "Modified by moderator",
};
default:
return {
moderationResult: "rejected",
moderatedContent: "",
notes: "Invalid moderator decision",
};
}
},
});
// Step 3: Apply moderation actions
const applyModeration = new LegacyStep({
id: "applyModeration",
outputSchema: z.object({
finalStatus: z.string(),
content: z.string().optional(),
auditLog: z.object({
originalContent: z.string(),
moderationResult: z.string(),
aiScore: z.number(),
timestamp: z.string(),
}),
}),
execute: async ({ context }) => {
const analysisResult = context.getStepResult(analyzeContent);
const moderationResult = context.getStepResult(moderateContent);
// Create audit log
const auditLog = {
originalContent: analysisResult?.content || "",
moderationResult: moderationResult?.moderationResult || "unknown",
aiScore: analysisResult?.aiAnalysisScore || 0,
timestamp: new Date().toISOString(),
};
// Apply moderation action
switch (moderationResult?.moderationResult) {
case "approved":
return {
finalStatus: "Content published",
content: moderationResult.moderatedContent,
auditLog,
};
case "modified":
return {
finalStatus: "Content modified and published",
content: moderationResult.moderatedContent,
auditLog,
};
case "rejected":
return {
finalStatus: "Content rejected",
auditLog,
};
default:
return {
finalStatus: "Error in moderation process",
auditLog,
};
}
},
});
// Build the workflow
const contentModerationWorkflow = new LegacyWorkflow({
name: "content-moderation-workflow",
triggerSchema: z.object({
content: z.string(),
}),
});
contentModerationWorkflow
.step(analyzeContent)
.then(moderateContent)
.then(applyModeration)
.commit();
// Register the workflow
const mastra = new Mastra({
legacy_workflows: { contentModerationWorkflow },
});
// Example of using the workflow with Inquirer prompts
async function runModerationDemo() {
const registeredWorkflow = mastra.legacy_getWorkflow("contentModerationWorkflow");
const run = registeredWorkflow.createRun();
// Start the workflow with content that needs review
console.log("Starting content moderation workflow...");
const result = await run.start({
triggerData: {
content: "This is some user-generated content that requires moderation.",
},
});
const isReviewStepSuspended =
result.activePaths.get("moderateContent")?.status === "suspended";
// Check if workflow is suspended
if (isReviewStepSuspended) {
const { content, aiScore, flaggedCategories, message } =
result.activePaths.get("moderateContent")?.suspendPayload;
console.log("\n===================================");
console.log(message);
console.log("===================================\n");
console.log("Content to review:");
console.log(content);
console.log(`\nAI Analysis Score: ${aiScore}`);
console.log(
`Flagged Categories: ${flaggedCategories?.join(", ") || "None"}\n`,
);
// Collect moderator decision using Inquirer
const moderatorDecision = await select({
message: "Select your moderation decision:",
choices: [
{ name: "Approve content as is", value: "approve" },
{ name: "Reject content completely", value: "reject" },
{ name: "Modify content before publishing", value: "modify" },
],
});
// Collect additional information based on decision
let moderatorNotes = "";
let modifiedContent = "";
moderatorNotes = await input({
message: "Enter any notes about your decision:",
});
if (moderatorDecision === "modify") {
modifiedContent = await input({
message: "Enter the modified content:",
default: content,
});
}
console.log("\nSubmitting your moderation decision...");
// Resume the workflow with the moderator's input
const resumeResult = await run.resume({
stepId: "moderateContent",
context: {
moderatorDecision,
moderatorNotes,
modifiedContent,
},
});
if (resumeResult?.results?.applyModeration?.status === "success") {
console.log("\n===================================");
console.log(
`Moderation complete: ${resumeResult?.results?.applyModeration?.output.finalStatus}`,
);
console.log("===================================\n");
if (resumeResult?.results?.applyModeration?.output.content) {
console.log("Published content:");
console.log(resumeResult.results.applyModeration.output.content);
}
}
return resumeResult;
}
console.log(
"Workflow completed without requiring human intervention:",
result.results,
);
return result;
}
// Helper function for AI content analysis simulation
function simulateContentAnalysis(content: string): number {
// In a real application, this would call an AI service
// For the example, we're returning a random score
return Math.random();
}
// Invoke the demo function
runModerationDemo().catch(console.error);
Key Concepts
-
Suspension Points - Use the
suspend()
function within a step’s execute to pause workflow execution. -
Suspension Payload - Pass relevant data when suspending to provide context for human decision-making:
await suspend({
messageForHuman: "Please review this data",
data: someImportantData,
});
- Checking Workflow Status - After starting a workflow, check the returned status to see if it’s suspended:
const result = await workflow.start({ triggerData });
if (result.status === "suspended" && result.suspendedStepId === "stepId") {
// Process suspension
console.log("Workflow is waiting for input:", result.suspendPayload);
}
- Interactive Terminal Input - Use libraries like Inquirer to create interactive prompts:
import { select, input, confirm } from "@inquirer/prompts";
// When the workflow is suspended
if (result.status === "suspended") {
// Display information from the suspend payload
console.log(result.suspendPayload.message);
// Collect user input interactively
const decision = await select({
message: "What would you like to do?",
choices: [
{ name: "Approve", value: "approve" },
{ name: "Reject", value: "reject" },
],
});
// Resume the workflow with the collected input
await run.resume({
stepId: result.suspendedStepId,
context: { decision },
});
}
- Resuming Workflow - Use the
resume()
method to continue workflow execution with human input:
const resumeResult = await run.resume({
stepId: "suspendedStepId",
context: {
// This data is passed to the suspended step as context.inputData
// and must conform to the step's inputSchema
userDecision: "approve",
},
});
- Input Schema for Human Data - Define an input schema on steps that might be resumed with human input to ensure type safety:
const myStep = new LegacyStep({
id: "myStep",
inputSchema: z.object({
// This schema validates the data passed in resume's context
// and makes it available as context.inputData
userDecision: z.enum(["approve", "reject"]),
userComments: z.string().optional(),
}),
execute: async ({ context, suspend }) => {
// Check if we have user input from a previous suspension
if (context.inputData?.userDecision) {
// Process the user's decision
return { result: `User decided: ${context.inputData.userDecision}` };
}
// If no input, suspend for human decision
await suspend();
},
});
Human-in-the-loop workflows are powerful for building systems that blend automation with human judgment, such as:
- Content moderation systems
- Approval workflows
- Supervised AI systems
- Customer service automation with escalation