Human in the Loop Workflow
Human-in-the-loop workflows allow you to pause execution at specific points to collect user input, make decisions, or perform actions that require human judgment. This example demonstrates how to create a workflow with human intervention points.
How It Works
- A workflow step can suspend execution using the suspend() function, optionally passing a payload with context for the human decision maker.
- When the workflow is resumed, the human input is passed in the context parameter of the resume() call.
- This input becomes available in the step’s execution context as context.inputData, which is typed according to the step’s inputSchema.
- The step can then continue execution based on the human input.
This pattern allows for safe, type-checked human intervention in automated workflows.
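The suspend-and-resume cycle described above can be modeled without any framework. The sketch below is only an illustration of the control flow: StepOutcome, ReviewInput, ReviewPayload, and reviewStep are hypothetical names, not part of Mastra's API, and the real suspend() call is asynchronous.

```typescript
// Framework-free model of suspend/resume. All names are hypothetical, not Mastra's API.
type StepOutcome<P, O> =
  | { status: 'suspended'; payload: P }
  | { status: 'success'; output: O };

interface ReviewInput {
  approved: boolean;
  note?: string;
}

interface ReviewPayload {
  message: string;
  draft: string;
}

// The step receives optional human input. Without it, the step asks to be suspended
// and hands back a payload that gives the reviewer context.
function reviewStep(draft: string, inputData?: ReviewInput): StepOutcome<ReviewPayload, string> {
  if (inputData === undefined) {
    return { status: 'suspended', payload: { message: 'Please review this draft', draft } };
  }
  return {
    status: 'success',
    output: inputData.approved ? `published: ${draft}` : `rejected: ${inputData.note ?? 'no note'}`,
  };
}

// First pass: no human input yet, so the step suspends.
const first = reviewStep('Hello world');
// Second pass: the same step is re-entered with the reviewer's answer.
const second = reviewStep('Hello world', { approved: true });
console.log(first.status, second.status);
```

The point of the model is that the step body runs twice: once to discover that input is missing, and once more with the input supplied.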
Interactive Terminal Example Using Inquirer
This example demonstrates how to use the Inquirer library to collect user input directly from the terminal when a workflow is suspended, creating a truly interactive human-in-the-loop experience.
import { Mastra } from '@mastra/core';
import { Step, Workflow } from '@mastra/core/workflows';
import { z } from 'zod';
import { checkbox, confirm, input } from '@inquirer/prompts';
// Step 1: Generate product recommendations
const generateRecommendations = new Step({
  id: 'generateRecommendations',
  outputSchema: z.object({
    customerName: z.string(),
    recommendations: z.array(
      z.object({
        productId: z.string(),
        productName: z.string(),
        price: z.number(),
        description: z.string(),
      }),
    ),
  }),
  execute: async ({ context }) => {
    const customerName = context.triggerData.customerName;

    // In a real application, you might call an API or ML model here
    // For this example, we'll return mock data
    return {
      customerName,
      recommendations: [
        {
          productId: 'prod-001',
          productName: 'Premium Widget',
          price: 99.99,
          description: 'Our best-selling premium widget with advanced features',
        },
        {
          productId: 'prod-002',
          productName: 'Basic Widget',
          price: 49.99,
          description: 'Affordable entry-level widget for beginners',
        },
        {
          productId: 'prod-003',
          productName: 'Widget Pro Plus',
          price: 149.99,
          description: 'Professional-grade widget with extended warranty',
        },
      ],
    };
  },
});
// Step 2: Get human approval and customization for the recommendations
const reviewRecommendations = new Step({
  id: 'reviewRecommendations',
  // Schema for the human input supplied when the workflow is resumed
  inputSchema: z.object({
    approvedProducts: z.array(z.string()),
    customerNote: z.string().optional(),
    offerDiscount: z.boolean().optional(),
  }),
  outputSchema: z.object({
    finalRecommendations: z.array(
      z.object({
        productId: z.string(),
        productName: z.string(),
        price: z.number(),
      }),
    ),
    customerNote: z.string().optional(),
    offerDiscount: z.boolean(),
  }),
  execute: async ({ context, suspend }) => {
    const { customerName, recommendations } = context.getStepResult(generateRecommendations) || {
      customerName: '',
      recommendations: [],
    };

    // Check if we have input from a resumed workflow
    const reviewInput = {
      approvedProducts: context.inputData?.approvedProducts || [],
      customerNote: context.inputData?.customerNote,
      offerDiscount: context.inputData?.offerDiscount,
    };

    // If we don't have agent input yet, suspend for human review
    if (!reviewInput.approvedProducts.length) {
      console.log(`Generating recommendations for customer: ${customerName}`);

      await suspend({
        customerName,
        recommendations,
        message: 'Please review these product recommendations before sending to the customer',
      });

      // Placeholder return (won't be reached due to suspend)
      return {
        finalRecommendations: [],
        customerNote: '',
        offerDiscount: false,
      };
    }

    // Process the agent's product selections
    const finalRecommendations = recommendations
      .filter(product => reviewInput.approvedProducts.includes(product.productId))
      .map(product => ({
        productId: product.productId,
        productName: product.productName,
        price: product.price,
      }));

    return {
      finalRecommendations,
      customerNote: reviewInput.customerNote || '',
      offerDiscount: reviewInput.offerDiscount || false,
    };
  },
});
// Step 3: Send the recommendations to the customer
const sendRecommendations = new Step({
  id: 'sendRecommendations',
  outputSchema: z.object({
    emailSent: z.boolean(),
    emailContent: z.string(),
  }),
  execute: async ({ context }) => {
    const { customerName } = context.getStepResult(generateRecommendations) || { customerName: '' };
    const { finalRecommendations, customerNote, offerDiscount } = context.getStepResult(reviewRecommendations) || {
      finalRecommendations: [],
      customerNote: '',
      offerDiscount: false,
    };

    // Generate email content based on the recommendations
    let emailContent = `Dear ${customerName},\n\nBased on your preferences, we recommend:\n\n`;

    finalRecommendations.forEach(product => {
      emailContent += `- ${product.productName}: $${product.price.toFixed(2)}\n`;
    });

    if (offerDiscount) {
      emailContent += '\nAs a valued customer, use code SAVE10 for 10% off your next purchase!\n';
    }

    if (customerNote) {
      emailContent += `\nPersonal note: ${customerNote}\n`;
    }

    emailContent += '\nThank you for your business,\nThe Sales Team';

    // In a real application, you would send this email
    console.log('Email content generated:', emailContent);

    return {
      emailSent: true,
      emailContent,
    };
  },
});
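// Build the workflow
const recommendationWorkflow = new Workflow({
  name: 'product-recommendation-workflow',
  triggerSchema: z.object({
    customerName: z.string(),
  }),
});

// Run the three steps in sequence
recommendationWorkflow.step(generateRecommendations).then(reviewRecommendations).then(sendRecommendations).commit();

// Register the workflow
const mastra = new Mastra({
  workflows: { recommendationWorkflow },
});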
// Example of using the workflow with Inquirer prompts
async function runRecommendationWorkflow() {
  const registeredWorkflow = mastra.getWorkflow('recommendationWorkflow');
  const run = registeredWorkflow.createRun();

  console.log('Starting product recommendation workflow...');
  const result = await run.start({
    triggerData: {
      customerName: 'Jane Smith',
    },
  });

  const isReviewStepSuspended = result.activePaths.get('reviewRecommendations')?.status === 'suspended';

  // Check if workflow is suspended for human review
  if (isReviewStepSuspended) {
    const { customerName, recommendations, message } = result.activePaths.get('reviewRecommendations')?.suspendPayload;

    console.log(message);
    console.log(`Customer: ${customerName}`);

    // Use Inquirer to collect input from the sales agent in the terminal
    console.log('Available product recommendations:');
    recommendations.forEach((product, index) => {
      console.log(`${index + 1}. ${product.productName} - $${product.price.toFixed(2)}`);
      console.log(`   ${product.description}\n`);
    });

    // Let the agent select which products to recommend
    const approvedProducts = await checkbox({
      message: 'Select products to recommend to the customer:',
      choices: recommendations.map(product => ({
        name: `${product.productName} ($${product.price.toFixed(2)})`,
        value: product.productId,
      })),
    });

    // Let the agent add a personal note
    const includeNote = await confirm({
      message: 'Would you like to add a personal note?',
      default: false,
    });

    let customerNote = '';
    if (includeNote) {
      customerNote = await input({
        message: 'Enter your personalized note for the customer:',
      });
    }

    // Ask if a discount should be offered
    const offerDiscount = await confirm({
      message: 'Offer a 10% discount to this customer?',
      default: false,
    });

    console.log('\nSubmitting your review...');

    // Resume the workflow with the agent's input
    const resumeResult = await run.resume({
      stepId: 'reviewRecommendations',
      context: {
        approvedProducts,
        customerNote,
        offerDiscount,
      },
    });

    console.log('Workflow completed!');
    console.log('Email content:');
    console.log(resumeResult?.results?.sendRecommendations || 'No email content generated');

    return resumeResult;
  }

  return result;
}

// Invoke the workflow with interactive terminal input
runRecommendationWorkflow().catch(console.error);
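One detail of the review step is worth noting: it suspends whenever approvedProducts is empty, so a reviewer who deliberately approves nothing looks the same as a step that has not been resumed yet. A possible refinement, sketched here under the assumption that the resume input has the same shape as the step's inputSchema, is to treat a missing array and an empty array as different states. The names ReviewInputData, ReviewState, and classifyReview are hypothetical.

```typescript
// Hypothetical helper. The interface mirrors the reviewRecommendations inputSchema.
interface ReviewInputData {
  approvedProducts?: string[];
  customerNote?: string;
  offerDiscount?: boolean;
}

type ReviewState = 'awaiting-review' | 'nothing-approved' | 'approved';

function classifyReview(inputData?: ReviewInputData): ReviewState {
  // No array at all: the step has not been resumed with human input yet.
  if (inputData?.approvedProducts === undefined) return 'awaiting-review';
  // An empty array is a real decision: the reviewer rejected every product.
  return inputData.approvedProducts.length === 0 ? 'nothing-approved' : 'approved';
}

console.log(classifyReview()); // awaiting-review
console.log(classifyReview({ approvedProducts: [] })); // nothing-approved
console.log(classifyReview({ approvedProducts: ['prod-001'] })); // approved
```

With a check like this, the step would suspend only in the 'awaiting-review' state and could return an empty recommendation list when nothing was approved.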
Advanced Example with Multiple User Inputs
This example demonstrates a more complex workflow that requires multiple human intervention points, such as in a content moderation system.
import { Mastra } from '@mastra/core';
import { Step, Workflow } from '@mastra/core/workflows';
import { z } from 'zod';
import { select, input } from '@inquirer/prompts';
// Step 1: Receive and analyze content
const analyzeContent = new Step({
  id: 'analyzeContent',
  outputSchema: z.object({
    content: z.string(),
    aiAnalysisScore: z.number(),
    flaggedCategories: z.array(z.string()).optional(),
  }),
  execute: async ({ context }) => {
    const content = context.triggerData.content;

    // Simulate AI analysis
    const aiAnalysisScore = simulateContentAnalysis(content);
    const flaggedCategories = aiAnalysisScore < 0.7
      ? ['potentially inappropriate', 'needs review']
      : [];

    return {
      content,
      aiAnalysisScore,
      flaggedCategories,
    };
  },
});
// Step 2: Moderate content that needs review
const moderateContent = new Step({
  id: 'moderateContent',
  // Define the schema for human input that will be provided when resuming
  inputSchema: z.object({
    moderatorDecision: z.enum(['approve', 'reject', 'modify']).optional(),
    moderatorNotes: z.string().optional(),
    modifiedContent: z.string().optional(),
  }),
  outputSchema: z.object({
    moderationResult: z.enum(['approved', 'rejected', 'modified']),
    moderatedContent: z.string(),
    notes: z.string().optional(),
  }),
  // @ts-ignore
  execute: async ({ context, suspend }) => {
    const analysisResult = context.getStepResult(analyzeContent);

    // Access the input provided when resuming the workflow
    const moderatorInput = {
      decision: context.inputData?.moderatorDecision,
      notes: context.inputData?.moderatorNotes,
      modifiedContent: context.inputData?.modifiedContent,
    };

    // If the AI analysis score is high enough, auto-approve
    if (analysisResult?.aiAnalysisScore > 0.9 && !analysisResult?.flaggedCategories?.length) {
      return {
        moderationResult: 'approved',
        moderatedContent: analysisResult.content,
        notes: 'Auto-approved by system',
      };
    }

    // If we don't have moderator input yet, suspend for human review
    if (!moderatorInput.decision) {
      await suspend({
        content: analysisResult?.content,
        aiScore: analysisResult?.aiAnalysisScore,
        flaggedCategories: analysisResult?.flaggedCategories,
        message: 'Please review this content and make a moderation decision',
      });

      // Placeholder return
      return {
        moderationResult: 'approved',
        moderatedContent: '',
      };
    }

    // Process the moderator's decision
    switch (moderatorInput.decision) {
      case 'approve':
        return {
          moderationResult: 'approved',
          moderatedContent: analysisResult?.content || '',
          notes: moderatorInput.notes || 'Approved by moderator',
        };
      case 'reject':
        return {
          moderationResult: 'rejected',
          moderatedContent: '',
          notes: moderatorInput.notes || 'Rejected by moderator',
        };
      case 'modify':
        return {
          moderationResult: 'modified',
          moderatedContent: moderatorInput.modifiedContent || analysisResult?.content || '',
          notes: moderatorInput.notes || 'Modified by moderator',
        };
      default:
        return {
          moderationResult: 'rejected',
          moderatedContent: '',
          notes: 'Invalid moderator decision',
        };
    }
  },
});
// Step 3: Apply moderation actions
const applyModeration = new Step({
  id: 'applyModeration',
  outputSchema: z.object({
    finalStatus: z.string(),
    content: z.string().optional(),
    auditLog: z.object({
      originalContent: z.string(),
      moderationResult: z.string(),
      aiScore: z.number(),
      timestamp: z.string(),
    }),
  }),
  execute: async ({ context }) => {
    const analysisResult = context.getStepResult(analyzeContent);
    const moderationResult = context.getStepResult(moderateContent);

    // Create audit log
    const auditLog = {
      originalContent: analysisResult?.content || '',
      moderationResult: moderationResult?.moderationResult || 'unknown',
      aiScore: analysisResult?.aiAnalysisScore || 0,
      timestamp: new Date().toISOString(),
    };

    // Apply moderation action
    switch (moderationResult?.moderationResult) {
      case 'approved':
        return {
          finalStatus: 'Content published',
          content: moderationResult.moderatedContent,
          auditLog,
        };
      case 'modified':
        return {
          finalStatus: 'Content modified and published',
          content: moderationResult.moderatedContent,
          auditLog,
        };
      case 'rejected':
        return {
          finalStatus: 'Content rejected',
          auditLog,
        };
      default:
        return {
          finalStatus: 'Error in moderation process',
          auditLog,
        };
    }
  },
});
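// Build the workflow
const contentModerationWorkflow = new Workflow({
  name: 'content-moderation-workflow',
  triggerSchema: z.object({
    content: z.string(),
  }),
});

// Run the three steps in sequence
contentModerationWorkflow.step(analyzeContent).then(moderateContent).then(applyModeration).commit();

// Register the workflow
const mastra = new Mastra({
  workflows: { contentModerationWorkflow },
});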
// Example of using the workflow with Inquirer prompts
async function runModerationDemo() {
  const registeredWorkflow = mastra.getWorkflow('contentModerationWorkflow');
  const run = registeredWorkflow.createRun();

  // Start the workflow with content that needs review
  console.log('Starting content moderation workflow...');
  const result = await run.start({
    triggerData: {
      content: 'This is some user-generated content that requires moderation.',
    },
  });

  const isReviewStepSuspended = result.activePaths.get('moderateContent')?.status === 'suspended';

  // Check if workflow is suspended
  if (isReviewStepSuspended) {
    const { content, aiScore, flaggedCategories, message } = result.activePaths.get('moderateContent')?.suspendPayload;

    console.log(message);
    console.log('Content to review:');
    console.log(content);
    console.log(`\nAI Analysis Score: ${aiScore}`);
    console.log(`Flagged Categories: ${flaggedCategories?.join(', ') || 'None'}\n`);

    // Collect moderator decision using Inquirer
    const moderatorDecision = await select({
      message: 'Select your moderation decision:',
      choices: [
        { name: 'Approve content as is', value: 'approve' },
        { name: 'Reject content completely', value: 'reject' },
        { name: 'Modify content before publishing', value: 'modify' },
      ],
    });

    // Collect additional information based on decision
    let moderatorNotes = '';
    let modifiedContent = '';

    moderatorNotes = await input({
      message: 'Enter any notes about your decision:',
    });

    if (moderatorDecision === 'modify') {
      modifiedContent = await input({
        message: 'Enter the modified content:',
        default: content,
      });
    }

    console.log('\nSubmitting your moderation decision...');

    // Resume the workflow with the moderator's input
    const resumeResult = await run.resume({
      stepId: 'moderateContent',
      context: {
        moderatorDecision,
        moderatorNotes,
        modifiedContent,
      },
    });

    if (resumeResult?.results?.applyModeration?.status === 'success') {
      console.log(`Moderation complete: ${resumeResult?.results?.applyModeration?.output.finalStatus}`);
      if (resumeResult?.results?.applyModeration?.output.content) {
        console.log('Published content:');
        console.log(resumeResult.results.applyModeration.output.content);
      }
    }

    return resumeResult;
  }

  console.log('Workflow completed without requiring human intervention:', result.results);
  return result;
}
// Helper function for AI content analysis simulation
function simulateContentAnalysis(content: string): number {
  // In a real application, this would call an AI service
  // For the example, we're returning a random score
  return Math.random();
}

// Invoke the demo function
runModerationDemo().catch(console.error);
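Because the score in this example is random, it can be hard to predict when the workflow will suspend. The routing rule itself is deterministic and can be sketched as a pair of pure functions. The thresholds below are copied from the example (flag below 0.7, auto-approve above 0.9 with no flags); the function names flagCategories and needsHumanReview are hypothetical.

```typescript
// Hypothetical helpers mirroring the thresholds used in analyzeContent and moderateContent.
function flagCategories(score: number): string[] {
  return score < 0.7 ? ['potentially inappropriate', 'needs review'] : [];
}

function needsHumanReview(score: number): boolean {
  // Auto-approval requires both a high score and no flagged categories.
  const autoApprove = score > 0.9 && flagCategories(score).length === 0;
  return !autoApprove;
}

for (const score of [0.95, 0.8, 0.5]) {
  console.log(score, needsHumanReview(score) ? 'suspend for review' : 'auto-approve');
}
```

Note that a score between 0.7 and 0.9 is not flagged but still goes to a moderator, since only scores above 0.9 are auto-approved.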
Key Concepts
- Suspension Points - Use the suspend() function within a step’s execute to pause workflow execution.
- Suspension Payload - Pass relevant data when suspending to provide context for human decision-making:

  await suspend({
    messageForHuman: 'Please review this data',
    data: someImportantData,
  });

- Checking Workflow Status - After starting a workflow, check the returned status to see if it’s suspended:

  const result = await workflow.start({ triggerData });
  if (result.status === 'suspended' && result.suspendedStepId === 'stepId') {
    // Process suspension
    console.log('Workflow is waiting for input:', result.suspendPayload);
  }

- Interactive Terminal Input - Use libraries like Inquirer to create interactive prompts:

  import { select, input, confirm } from '@inquirer/prompts';

  // When the workflow is suspended
  if (result.status === 'suspended') {
    // Display information from the suspend payload
    console.log(result.suspendPayload.message);

    // Collect user input interactively
    const decision = await select({
      message: 'What would you like to do?',
      choices: [
        { name: 'Approve', value: 'approve' },
        { name: 'Reject', value: 'reject' },
      ],
    });

    // Resume the workflow with the collected input
    await run.resume({
      stepId: result.suspendedStepId,
      context: { decision },
    });
  }

- Resuming Workflow - Use the resume() method to continue workflow execution with human input:

  const resumeResult = await run.resume({
    stepId: 'suspendedStepId',
    context: {
      // This data is passed to the suspended step as context.inputData
      // and must conform to the step's inputSchema
      userDecision: 'approve',
    },
  });

- Input Schema for Human Data - Define an input schema on steps that might be resumed with human input to ensure type safety:

  const myStep = new Step({
    id: 'myStep',
    inputSchema: z.object({
      // This schema validates the data passed in resume's context
      // and makes it available as context.inputData
      userDecision: z.enum(['approve', 'reject']),
      userComments: z.string().optional(),
    }),
    execute: async ({ context, suspend }) => {
      // Check if we have user input from a previous suspension
      if (context.inputData?.userDecision) {
        // Process the user's decision
        return { result: `User decided: ${context.inputData.userDecision}` };
      }

      // If no input, suspend for human decision
      await suspend();
    },
  });
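To make the role of the input schema concrete, the following sketch shows roughly what validating resume data involves. It is a hand-written stand-in, not how zod or Mastra implement validation. UserDecision, isDecision, and parseUserDecision are hypothetical names, and the field names are taken from the myStep example.

```typescript
// Hypothetical stand-in for schema validation. In the examples, zod's inputSchema plays this role.
type Decision = 'approve' | 'reject';

interface UserDecision {
  userDecision: Decision;
  userComments?: string;
}

function isDecision(value: unknown): value is Decision {
  return value === 'approve' || value === 'reject';
}

// Returns the typed input when the raw resume data conforms, otherwise null.
function parseUserDecision(raw: unknown): UserDecision | null {
  if (typeof raw !== 'object' || raw === null) return null;
  const record = raw as Record<string, unknown>;
  const decision = record.userDecision;
  if (!isDecision(decision)) return null;
  const comments = record.userComments;
  if (typeof comments === 'string') return { userDecision: decision, userComments: comments };
  // The comment is optional, but if present it must be a string.
  return comments === undefined ? { userDecision: decision } : null;
}

console.log(parseUserDecision({ userDecision: 'approve' }));
console.log(parseUserDecision({ userDecision: 'maybe' }));
```

Rejecting malformed input before the step acts on it is what lets the step body treat context.inputData as typed data.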
Human-in-the-loop workflows are powerful for building systems that blend automation with human judgment, such as:
- Content moderation systems
- Approval workflows
- Supervised AI systems
- Customer service automation with escalation