Human in the Loop Workflow
Human-in-the-loop workflows allow you to pause execution at specific points to collect user input, make decisions, or perform actions that require human judgment. This example demonstrates how to create a workflow with human intervention points.
How It Works
- A workflow step can suspend execution using the suspend() function, optionally passing a payload with context for the human decision maker.
- When the workflow is resumed, the human input is passed in the context parameter of the resume() call.
- This input becomes available in the step's execution context as context.inputData, which is typed according to the step's inputSchema.
- The step can then continue execution based on the human input.
This pattern allows for safe, type-checked human intervention in automated workflows.
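The lifecycle described above can be modeled without any framework. The sketch below is a dependency-free illustration, not Mastra's implementation: `StepOutcome`, `parseReviewInput`, and `reviewStep` are hypothetical names, and the hand-written validator only stands in for what an inputSchema would do.

```typescript
// Hypothetical, dependency-free model of suspend/resume.
// None of these names come from Mastra; they exist only for illustration.

type ReviewInput = { approved: boolean; note?: string };

type StepOutcome<TPayload, TOutput> =
  | { status: 'suspended'; suspendPayload: TPayload }
  | { status: 'success'; output: TOutput };

// Stand-in for an inputSchema: accept resume data only if it has the expected shape.
function parseReviewInput(data: unknown): ReviewInput | undefined {
  if (typeof data !== 'object' || data === null) return undefined;
  const candidate = data as Record<string, unknown>;
  if (typeof candidate.approved !== 'boolean') return undefined;
  if (candidate.note !== undefined && typeof candidate.note !== 'string') return undefined;
  return { approved: candidate.approved, note: candidate.note as string | undefined };
}

// A step that pauses until it receives valid human input.
function reviewStep(
  draft: string,
  inputData?: unknown,
): StepOutcome<{ draft: string; message: string }, string> {
  const input = parseReviewInput(inputData);
  if (!input) {
    // First pass (or invalid input): hand context to the human and stop.
    return { status: 'suspended', suspendPayload: { draft, message: 'Please review this draft' } };
  }
  // Second pass: continue based on the human decision.
  return { status: 'success', output: input.approved ? `published: ${draft}` : 'discarded' };
}

const first = reviewStep('Hello world');
const second = reviewStep('Hello world', { approved: true });
console.log(first.status); // suspended
console.log(second.status); // success
```

The same step function runs twice: once without input, where it returns a payload for the reviewer, and once with validated input, where it completes. The full examples below follow this shape, with the framework handling persistence between the two passes.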
Interactive Terminal Example Using Inquirer
This example uses the Inquirer library to collect user input directly from the terminal while the workflow is suspended, then resumes the workflow with that input.
import { Mastra } from '@mastra/core';
import { Step, Workflow } from '@mastra/core/workflows';
import { z } from 'zod';
import { checkbox, confirm, input } from '@inquirer/prompts';
// Step 1: Generate product recommendations
const generateRecommendations = new Step({
id: 'generateRecommendations',
outputSchema: z.object({
customerName: z.string(),
recommendations: z.array(
z.object({
productId: z.string(),
productName: z.string(),
price: z.number(),
description: z.string(),
}),
),
}),
execute: async ({ context }) => {
const customerName = context.triggerData.customerName;
// In a real application, you might call an API or ML model here
// For this example, we'll return mock data
return {
customerName,
recommendations: [
{
productId: 'prod-001',
productName: 'Premium Widget',
price: 99.99,
description: 'Our best-selling premium widget with advanced features',
},
{
productId: 'prod-002',
productName: 'Basic Widget',
price: 49.99,
description: 'Affordable entry-level widget for beginners',
},
{
productId: 'prod-003',
productName: 'Widget Pro Plus',
price: 149.99,
description: 'Professional-grade widget with extended warranty',
},
],
};
},
});
// Step 2: Get human approval and customization for the recommendations
const reviewRecommendations = new Step({
id: 'reviewRecommendations',
inputSchema: z.object({
approvedProducts: z.array(z.string()),
customerNote: z.string().optional(),
offerDiscount: z.boolean().optional(),
}),
outputSchema: z.object({
finalRecommendations: z.array(
z.object({
productId: z.string(),
productName: z.string(),
price: z.number(),
}),
),
customerNote: z.string().optional(),
offerDiscount: z.boolean(),
}),
execute: async ({ context, suspend }) => {
const { customerName, recommendations } = context.getStepResult(generateRecommendations) || {
customerName: '',
recommendations: [],
};
// Check if we have input from a resumed workflow
const reviewInput = {
approvedProducts: context.inputData?.approvedProducts || [],
customerNote: context.inputData?.customerNote,
offerDiscount: context.inputData?.offerDiscount,
};
// If we don't have agent input yet, suspend for human review
if (!reviewInput.approvedProducts.length) {
console.log(`Generating recommendations for customer: ${customerName}`);
await suspend({
customerName,
recommendations,
message: 'Please review these product recommendations before sending to the customer',
});
// Placeholder return (won't be reached due to suspend)
return {
finalRecommendations: [],
customerNote: '',
offerDiscount: false,
};
}
// Process the agent's product selections
const finalRecommendations = recommendations
.filter(product => reviewInput.approvedProducts.includes(product.productId))
.map(product => ({
productId: product.productId,
productName: product.productName,
price: product.price,
}));
return {
finalRecommendations,
customerNote: reviewInput.customerNote || '',
offerDiscount: reviewInput.offerDiscount || false,
};
},
});
// Step 3: Send the recommendations to the customer
const sendRecommendations = new Step({
id: 'sendRecommendations',
outputSchema: z.object({
emailSent: z.boolean(),
emailContent: z.string(),
}),
execute: async ({ context }) => {
const { customerName } = context.getStepResult(generateRecommendations) || { customerName: '' };
const { finalRecommendations, customerNote, offerDiscount } = context.getStepResult(reviewRecommendations) || {
finalRecommendations: [],
customerNote: '',
offerDiscount: false,
};
// Generate email content based on the recommendations
let emailContent = `Dear ${customerName},\n\nBased on your preferences, we recommend:\n\n`;
finalRecommendations.forEach(product => {
emailContent += `- ${product.productName}: $${product.price.toFixed(2)}\n`;
});
if (offerDiscount) {
emailContent += '\nAs a valued customer, use code SAVE10 for 10% off your next purchase!\n';
}
if (customerNote) {
emailContent += `\nPersonal note: ${customerNote}\n`;
}
emailContent += '\nThank you for your business,\nThe Sales Team';
// In a real application, you would send this email
console.log('Email content generated:', emailContent);
return {
emailSent: true,
emailContent,
};
},
});
// Build the workflow
const recommendationWorkflow = new Workflow({
name: 'product-recommendation-workflow',
triggerSchema: z.object({
customerName: z.string(),
}),
});
recommendationWorkflow
.step(generateRecommendations)
.then(reviewRecommendations)
.then(sendRecommendations)
.commit();
// Register the workflow
const mastra = new Mastra({
workflows: { recommendationWorkflow },
});
// Example of using the workflow with Inquirer prompts
async function runRecommendationWorkflow() {
const registeredWorkflow = mastra.getWorkflow('recommendationWorkflow');
const run = registeredWorkflow.createRun();
console.log('Starting product recommendation workflow...');
const result = await run.start({
triggerData: {
customerName: 'Jane Smith',
},
});
const isReviewStepSuspended = result.activePaths.get('reviewRecommendations')?.status === 'suspended';
// Check if workflow is suspended for human review
if (isReviewStepSuspended) {
const { customerName, recommendations, message } = result.activePaths.get('reviewRecommendations')?.suspendPayload;
console.log('\n===================================');
console.log(message);
console.log(`Customer: ${customerName}`);
console.log('===================================\n');
// Use Inquirer to collect input from the sales agent in the terminal
console.log('Available product recommendations:');
recommendations.forEach((product, index) => {
console.log(`${index + 1}. ${product.productName} - $${product.price.toFixed(2)}`);
console.log(` ${product.description}\n`);
});
// Let the agent select which products to recommend
const approvedProducts = await checkbox({
message: 'Select products to recommend to the customer:',
choices: recommendations.map(product => ({
name: `${product.productName} ($${product.price.toFixed(2)})`,
value: product.productId,
})),
});
// Let the agent add a personal note
const includeNote = await confirm({
message: 'Would you like to add a personal note?',
default: false,
});
let customerNote = '';
if (includeNote) {
customerNote = await input({
message: 'Enter your personalized note for the customer:',
});
}
// Ask if a discount should be offered
const offerDiscount = await confirm({
message: 'Offer a 10% discount to this customer?',
default: false,
});
console.log('\nSubmitting your review...');
// Resume the workflow with the agent's input
const resumeResult = await run.resume({
stepId: 'reviewRecommendations',
context: {
approvedProducts,
customerNote,
offerDiscount,
},
});
console.log('\n===================================');
console.log('Workflow completed!');
console.log('Email content:');
console.log('===================================\n');
console.log(resumeResult?.results?.sendRecommendations || 'No email content generated');
return resumeResult;
}
return result;
}
// Invoke the workflow with interactive terminal input
runRecommendationWorkflow().catch(console.error);
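The selection logic inside reviewRecommendations is a plain filter and map, so it can be pulled into a pure function and checked without a terminal session. This is a minimal sketch: `Product`, `FinalRecommendation`, and `selectApproved` are hypothetical names that are not part of the example above or of Mastra.

```typescript
// Hypothetical helper that mirrors the filter/map inside reviewRecommendations.
interface Product {
  productId: string;
  productName: string;
  price: number;
  description: string;
}

type FinalRecommendation = Pick<Product, 'productId' | 'productName' | 'price'>;

function selectApproved(recommendations: Product[], approvedIds: string[]): FinalRecommendation[] {
  const approved = new Set(approvedIds);
  return recommendations
    .filter(product => approved.has(product.productId))
    // Drop the description, as the step's outputSchema does.
    .map(({ productId, productName, price }) => ({ productId, productName, price }));
}

const sample: Product[] = [
  { productId: 'prod-001', productName: 'Premium Widget', price: 99.99, description: 'Best seller' },
  { productId: 'prod-002', productName: 'Basic Widget', price: 49.99, description: 'Entry level' },
];

console.log(selectApproved(sample, ['prod-002']).map(p => p.productName)); // [ 'Basic Widget' ]
console.log(selectApproved(sample, []).length); // 0
```

One consequence of the step as written: it treats an empty approvedProducts array as "no input yet", so resuming with no products selected makes the step suspend again rather than send an empty email. If an empty selection should be a valid answer, the step would need a different signal for "input received", for example checking whether context.inputData is present at all.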
Advanced Example with Multiple User Inputs
This example demonstrates a more complex workflow that requires multiple human intervention points, such as in a content moderation system.
import { Mastra } from '@mastra/core';
import { Step, Workflow } from '@mastra/core/workflows';
import { z } from 'zod';
import { select, input } from '@inquirer/prompts';
// Step 1: Receive and analyze content
const analyzeContent = new Step({
id: 'analyzeContent',
outputSchema: z.object({
content: z.string(),
aiAnalysisScore: z.number(),
flaggedCategories: z.array(z.string()).optional(),
}),
execute: async ({ context }) => {
const content = context.triggerData.content;
// Simulate AI analysis
const aiAnalysisScore = simulateContentAnalysis(content);
const flaggedCategories = aiAnalysisScore < 0.7
? ['potentially inappropriate', 'needs review']
: [];
return {
content,
aiAnalysisScore,
flaggedCategories,
};
},
});
// Step 2: Moderate content that needs review
const moderateContent = new Step({
id: 'moderateContent',
// Define the schema for human input that will be provided when resuming
inputSchema: z.object({
moderatorDecision: z.enum(['approve', 'reject', 'modify']).optional(),
moderatorNotes: z.string().optional(),
modifiedContent: z.string().optional(),
}),
outputSchema: z.object({
moderationResult: z.enum(['approved', 'rejected', 'modified']),
moderatedContent: z.string(),
notes: z.string().optional(),
}),
// @ts-ignore
execute: async ({ context, suspend }) => {
const analysisResult = context.getStepResult(analyzeContent);
// Access the input provided when resuming the workflow
const moderatorInput = {
decision: context.inputData?.moderatorDecision,
notes: context.inputData?.moderatorNotes,
modifiedContent: context.inputData?.modifiedContent,
};
// If the AI analysis score is high enough, auto-approve
if (analysisResult?.aiAnalysisScore > 0.9 && !analysisResult?.flaggedCategories?.length) {
return {
moderationResult: 'approved',
moderatedContent: analysisResult.content,
notes: 'Auto-approved by system',
};
}
// If we don't have moderator input yet, suspend for human review
if (!moderatorInput.decision) {
await suspend({
content: analysisResult?.content,
aiScore: analysisResult?.aiAnalysisScore,
flaggedCategories: analysisResult?.flaggedCategories,
message: 'Please review this content and make a moderation decision',
});
// Placeholder return
return {
moderationResult: 'approved',
moderatedContent: '',
};
}
// Process the moderator's decision
switch (moderatorInput.decision) {
case 'approve':
return {
moderationResult: 'approved',
moderatedContent: analysisResult?.content || '',
notes: moderatorInput.notes || 'Approved by moderator',
};
case 'reject':
return {
moderationResult: 'rejected',
moderatedContent: '',
notes: moderatorInput.notes || 'Rejected by moderator',
};
case 'modify':
return {
moderationResult: 'modified',
moderatedContent: moderatorInput.modifiedContent || analysisResult?.content || '',
notes: moderatorInput.notes || 'Modified by moderator',
};
default:
return {
moderationResult: 'rejected',
moderatedContent: '',
notes: 'Invalid moderator decision',
};
}
},
});
// Step 3: Apply moderation actions
const applyModeration = new Step({
id: 'applyModeration',
outputSchema: z.object({
finalStatus: z.string(),
content: z.string().optional(),
auditLog: z.object({
originalContent: z.string(),
moderationResult: z.string(),
aiScore: z.number(),
timestamp: z.string(),
}),
}),
execute: async ({ context }) => {
const analysisResult = context.getStepResult(analyzeContent);
const moderationResult = context.getStepResult(moderateContent);
// Create audit log
const auditLog = {
originalContent: analysisResult?.content || '',
moderationResult: moderationResult?.moderationResult || 'unknown',
aiScore: analysisResult?.aiAnalysisScore || 0,
timestamp: new Date().toISOString(),
};
// Apply moderation action
switch (moderationResult?.moderationResult) {
case 'approved':
return {
finalStatus: 'Content published',
content: moderationResult.moderatedContent,
auditLog,
};
case 'modified':
return {
finalStatus: 'Content modified and published',
content: moderationResult.moderatedContent,
auditLog,
};
case 'rejected':
return {
finalStatus: 'Content rejected',
auditLog,
};
default:
return {
finalStatus: 'Error in moderation process',
auditLog,
};
}
},
});
// Build the workflow
const contentModerationWorkflow = new Workflow({
name: 'content-moderation-workflow',
triggerSchema: z.object({
content: z.string(),
}),
});
contentModerationWorkflow
.step(analyzeContent)
.then(moderateContent)
.then(applyModeration)
.commit();
// Register the workflow
const mastra = new Mastra({
workflows: { contentModerationWorkflow },
});
// Example of using the workflow with Inquirer prompts
async function runModerationDemo() {
const registeredWorkflow = mastra.getWorkflow('contentModerationWorkflow');
const run = registeredWorkflow.createRun();
// Start the workflow with content that needs review
console.log('Starting content moderation workflow...');
const result = await run.start({
triggerData: {
content: 'This is some user-generated content that requires moderation.'
}
});
const isReviewStepSuspended = result.activePaths.get('moderateContent')?.status === 'suspended';
// Check if workflow is suspended
if (isReviewStepSuspended) {
const { content, aiScore, flaggedCategories, message } = result.activePaths.get('moderateContent')?.suspendPayload;
console.log('\n===================================');
console.log(message);
console.log('===================================\n');
console.log('Content to review:');
console.log(content);
console.log(`\nAI Analysis Score: ${aiScore}`);
console.log(`Flagged Categories: ${flaggedCategories?.join(', ') || 'None'}\n`);
// Collect moderator decision using Inquirer
const moderatorDecision = await select({
message: 'Select your moderation decision:',
choices: [
{ name: 'Approve content as is', value: 'approve' },
{ name: 'Reject content completely', value: 'reject' },
{ name: 'Modify content before publishing', value: 'modify' }
],
});
// Collect additional information based on decision
let moderatorNotes = '';
let modifiedContent = '';
moderatorNotes = await input({
message: 'Enter any notes about your decision:',
});
if (moderatorDecision === 'modify') {
modifiedContent = await input({
message: 'Enter the modified content:',
default: content,
});
}
console.log('\nSubmitting your moderation decision...');
// Resume the workflow with the moderator's input
const resumeResult = await run.resume({
stepId: 'moderateContent',
context: {
moderatorDecision,
moderatorNotes,
modifiedContent,
},
});
if (resumeResult?.results?.applyModeration?.status === 'success') {
console.log('\n===================================');
console.log(`Moderation complete: ${resumeResult?.results?.applyModeration?.output.finalStatus}`);
console.log('===================================\n');
if (resumeResult?.results?.applyModeration?.output.content) {
console.log('Published content:');
console.log(resumeResult.results.applyModeration.output.content);
}
}
return resumeResult;
}
console.log('Workflow completed without requiring human intervention:', result.results);
return result;
}
// Helper function for AI content analysis simulation
function simulateContentAnalysis(content: string): number {
// In a real application, this would call an AI service
// For the example, we're returning a random score
return Math.random();
}
// Invoke the demo function
runModerationDemo().catch(console.error);
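Because simulateContentAnalysis returns a random score, each run of the demo can take a different branch. The sketch below isolates the branch order of moderateContent in a pure function so it can be checked with fixed scores. `routeModeration`, `flagsFor`, and the `Route` type are hypothetical names introduced for illustration only.

```typescript
// Hypothetical helpers mirroring the decision order in moderateContent.
type ModeratorDecision = 'approve' | 'reject' | 'modify';
type Route = 'auto-approve' | 'suspend-for-review' | 'apply-decision';

function routeModeration(
  aiAnalysisScore: number,
  flaggedCategories: string[],
  decision?: ModeratorDecision,
): Route {
  // High score and nothing flagged: no human needed.
  if (aiAnalysisScore > 0.9 && flaggedCategories.length === 0) return 'auto-approve';
  // No moderator input yet: pause for review.
  if (!decision) return 'suspend-for-review';
  // Moderator input is present: act on it.
  return 'apply-decision';
}

// Same flagging rule as analyzeContent: scores below 0.7 are flagged.
function flagsFor(score: number): string[] {
  return score < 0.7 ? ['potentially inappropriate', 'needs review'] : [];
}

console.log(routeModeration(0.95, flagsFor(0.95))); // auto-approve
console.log(routeModeration(0.8, flagsFor(0.8))); // suspend-for-review
console.log(routeModeration(0.5, flagsFor(0.5), 'reject')); // apply-decision
```

With a uniformly random score, roughly one run in ten exceeds 0.9 and is auto-approved, which is when the demo prints the "completed without requiring human intervention" message instead of prompting.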
Key Concepts
- Suspension Points - Use the suspend() function within a step's execute to pause workflow execution.
- Suspension Payload - Pass relevant data when suspending to provide context for human decision-making:
  await suspend({
    messageForHuman: 'Please review this data',
    data: someImportantData,
  });
- Checking Workflow Status - After starting a run, check the returned active paths to see whether a step is suspended, as the examples above do:
  const result = await run.start({ triggerData });
  const activeStep = result.activePaths.get('stepId');
  if (activeStep?.status === 'suspended') {
    // Process suspension
    console.log('Workflow is waiting for input:', activeStep.suspendPayload);
  }
- Interactive Terminal Input - Use libraries like Inquirer to create interactive prompts:
  import { select, input, confirm } from '@inquirer/prompts';
  // When the workflow is suspended
  const activeStep = result.activePaths.get('stepId');
  if (activeStep?.status === 'suspended') {
    // Display information from the suspend payload
    console.log(activeStep.suspendPayload.message);
    // Collect user input interactively
    const decision = await select({
      message: 'What would you like to do?',
      choices: [
        { name: 'Approve', value: 'approve' },
        { name: 'Reject', value: 'reject' },
      ],
    });
    // Resume the workflow with the collected input
    await run.resume({
      stepId: 'stepId',
      context: { decision },
    });
  }
- Resuming Workflow - Use the resume() method to continue workflow execution with human input:
  const resumeResult = await run.resume({
    stepId: 'suspendedStepId',
    context: {
      // This data is passed to the suspended step as context.inputData
      // and must conform to the step's inputSchema
      userDecision: 'approve',
    },
  });
- Input Schema for Human Data - Define an input schema on steps that might be resumed with human input to ensure type safety:
  const myStep = new Step({
    id: 'myStep',
    inputSchema: z.object({
      // This schema validates the data passed in resume's context
      // and makes it available as context.inputData
      userDecision: z.enum(['approve', 'reject']),
      userComments: z.string().optional(),
    }),
    execute: async ({ context, suspend }) => {
      // Check if we have user input from a previous suspension
      if (context.inputData?.userDecision) {
        // Process the user's decision
        return { result: `User decided: ${context.inputData.userDecision}` };
      }
      // If no input, suspend for human decision
      await suspend();
    },
  });
Human-in-the-loop workflows are powerful for building systems that blend automation with human judgment, such as:
- Content moderation systems
- Approval workflows
- Supervised AI systems
- Customer service automation with escalation