ExamplesWorkflowsHuman in the Loop

Human in the Loop Workflow

Human-in-the-loop workflows allow you to pause execution at specific points to collect user input, make decisions, or perform actions that require human judgment. This example demonstrates how to create a workflow with human intervention points.

How It Works

  1. A workflow step can suspend execution using the suspend() function, optionally passing a payload with context for the human decision maker.
  2. When the workflow is resumed, the human input is passed in the context parameter of the resume() call.
  3. This input becomes available in the step’s execution context as context.inputData, which is typed according to the step’s inputSchema.
  4. The step can then continue execution based on the human input.

This pattern allows for safe, type-checked human intervention in automated workflows.

Interactive Terminal Example Using Inquirer

This example demonstrates how to use the Inquirer library to collect user input directly from the terminal when a workflow is suspended, creating a truly interactive human-in-the-loop experience.

import { Mastra } from '@mastra/core';
import { Step, Workflow } from '@mastra/core/workflows';
import { z } from 'zod';
import { confirm, input, select } from '@inquirer/prompts';
 
// Step 1: Generate product recommendations
const generateRecommendations = new Step({
  id: 'generateRecommendations',
  outputSchema: z.object({
    customerName: z.string(),
    recommendations: z.array(
      z.object({
        productId: z.string(),
        productName: z.string(),
        price: z.number(),
        description: z.string(),
      }),
    ),
  }),
  execute: async ({ context }) => {
    const customerName = context.triggerData.customerName;
 
    // In a real application, you might call an API or ML model here
    // For this example, we'll return mock data
    return {
      customerName,
      recommendations: [
        {
          productId: 'prod-001',
          productName: 'Premium Widget',
          price: 99.99,
          description: 'Our best-selling premium widget with advanced features',
        },
        {
          productId: 'prod-002',
          productName: 'Basic Widget',
          price: 49.99,
          description: 'Affordable entry-level widget for beginners',
        },
        {
          productId: 'prod-003',
          productName: 'Widget Pro Plus',
          price: 149.99,
          description: 'Professional-grade widget with extended warranty',
        },
      ],
    };
  },
});
 
// Step 2: Get human approval and customization for the recommendations
const reviewRecommendations = new Step({
  id: 'reviewRecommendations',
  inputSchema: z.object({
    approvedProducts: z.array(z.string()),
    customerNote: z.string().optional(),
    offerDiscount: z.boolean().optional(),
  }),
  outputSchema: z.object({
    finalRecommendations: z.array(
      z.object({
        productId: z.string(),
        productName: z.string(),
        price: z.number(),
      }),
    ),
    customerNote: z.string().optional(),
    offerDiscount: z.boolean(),
  }),
  execute: async ({ context, suspend }) => {
    const { customerName, recommendations } = context.getStepResult(generateRecommendations) || {
      customerName: '',
      recommendations: [],
    };
 
    // Check if we have input from a resumed workflow
    const reviewInput = {
      approvedProducts: context.inputData?.approvedProducts || [],
      customerNote: context.inputData?.customerNote,
      offerDiscount: context.inputData?.offerDiscount,
    };
 
    // If we don't have agent input yet, suspend for human review
    if (!reviewInput.approvedProducts.length) {
      console.log(`Generating recommendations for customer: ${customerName}`);
      await suspend({
        customerName,
        recommendations,
        message: 'Please review these product recommendations before sending to the customer',
      });
 
      // Placeholder return (won't be reached due to suspend)
      return {
        finalRecommendations: [],
        customerNote: '',
        offerDiscount: false,
      };
    }
 
    // Process the agent's product selections
    const finalRecommendations = recommendations
      .filter(product => reviewInput.approvedProducts.includes(product.productId))
      .map(product => ({
        productId: product.productId,
        productName: product.productName,
        price: product.price,
      }));
 
    return {
      finalRecommendations,
      customerNote: reviewInput.customerNote || '',
      offerDiscount: reviewInput.offerDiscount || false,
    };
  },
});
 
// Step 3: Send the recommendations to the customer
const sendRecommendations = new Step({
  id: 'sendRecommendations',
  outputSchema: z.object({
    emailSent: z.boolean(),
    emailContent: z.string(),
  }),
  execute: async ({ context }) => {
    const { customerName } = context.getStepResult(generateRecommendations) || { customerName: '' };
    const { finalRecommendations, customerNote, offerDiscount } = context.getStepResult(reviewRecommendations) || {
      finalRecommendations: [],
      customerNote: '',
      offerDiscount: false,
    };
 
    // Generate email content based on the recommendations
    let emailContent = `Dear ${customerName},\n\nBased on your preferences, we recommend:\n\n`;
 
    finalRecommendations.forEach(product => {
      emailContent += `- ${product.productName}: $${product.price.toFixed(2)}\n`;
    });
 
    if (offerDiscount) {
      emailContent += '\nAs a valued customer, use code SAVE10 for 10% off your next purchase!\n';
    }
 
    if (customerNote) {
      emailContent += `\nPersonal note: ${customerNote}\n`;
    }
 
    emailContent += '\nThank you for your business,\nThe Sales Team';
 
    // In a real application, you would send this email
    console.log('Email content generated:', emailContent);
 
    return {
      emailSent: true,
      emailContent,
    };
  },
});
 
// Build the workflow
const recommendationWorkflow = new Workflow({
  name: 'product-recommendation-workflow',
  triggerSchema: z.object({
    customerName: z.string(),
  }),
});
 
recommendationWorkflow
.step(generateRecommendations)
.then(reviewRecommendations)
.then(sendRecommendations)
.commit();
 
// Register the workflow
const mastra = new Mastra({
  workflows: { recommendationWorkflow },
});
 
// Example of using the workflow with Inquirer prompts
async function runRecommendationWorkflow() {
  const registeredWorkflow = mastra.getWorkflow('recommendationWorkflow');
  const run = registeredWorkflow.createRun();
 
  console.log('Starting product recommendation workflow...');
  const result = await run.start({
    triggerData: {
      customerName: 'Jane Smith',
    },
  });
 
  const isReviewStepSuspended = result.activePaths.get('reviewRecommendations')?.status === 'suspended';
 
  // Check if workflow is suspended for human review
  if (isReviewStepSuspended) {
    const { customerName, recommendations, message } = result.activePaths.get('reviewRecommendations')?.suspendPayload;
 
    console.log('\n===================================');
    console.log(message);
    console.log(`Customer: ${customerName}`);
    console.log('===================================\n');
 
    // Use Inquirer to collect input from the sales agent in the terminal
    console.log('Available product recommendations:');
    recommendations.forEach((product, index) => {
      console.log(`${index + 1}. ${product.productName} - $${product.price.toFixed(2)}`);
      console.log(`   ${product.description}\n`);
    });
 
    // Let the agent select which products to recommend
    const approvedProducts = await checkbox({
      message: 'Select products to recommend to the customer:',
      choices: recommendations.map(product => ({
        name: `${product.productName} ($${product.price.toFixed(2)})`,
        value: product.productId,
      })),
    });
 
    // Let the agent add a personal note
    const includeNote = await confirm({
      message: 'Would you like to add a personal note?',
      default: false,
    });
 
    let customerNote = '';
    if (includeNote) {
      customerNote = await input({
        message: 'Enter your personalized note for the customer:',
      });
    }
 
    // Ask if a discount should be offered
    const offerDiscount = await confirm({
      message: 'Offer a 10% discount to this customer?',
      default: false,
    });
 
    console.log('\nSubmitting your review...');
 
    // Resume the workflow with the agent's input
    const resumeResult = await run.resume({
      stepId: 'reviewRecommendations',
      context: {
        approvedProducts,
        customerNote,
        offerDiscount,
      },
    });
 
    console.log('\n===================================');
    console.log('Workflow completed!');
    console.log('Email content:');
    console.log('===================================\n');
    console.log(resumeResult?.results?.sendRecommendations || 'No email content generated');
 
    return resumeResult;
  }
 
  return result;
}
 
// Invoke the workflow with interactive terminal input
runRecommendationWorkflow().catch(console.error);
 

Advanced Example with Multiple User Inputs

This example demonstrates a more complex workflow that requires multiple human intervention points, such as in a content moderation system.

import { Mastra } from '@mastra/core';
import { Step, Workflow } from '@mastra/core/workflows';
import { z } from 'zod';
import { select, input } from '@inquirer/prompts';
 
// Step 1: Receive and analyze content
const analyzeContent = new Step({
  id: 'analyzeContent',
  outputSchema: z.object({
    content: z.string(),
    aiAnalysisScore: z.number(),
    flaggedCategories: z.array(z.string()).optional(),
  }),
  execute: async ({ context }) => {
    const content = context.triggerData.content;
 
    // Simulate AI analysis
    const aiAnalysisScore = simulateContentAnalysis(content);
    const flaggedCategories = aiAnalysisScore < 0.7
      ? ['potentially inappropriate', 'needs review']
      : [];
 
    return {
      content,
      aiAnalysisScore,
      flaggedCategories,
    };
  },
});
 
// Step 2: Moderate content that needs review
const moderateContent = new Step({
  id: 'moderateContent',
  // Define the schema for human input that will be provided when resuming
  inputSchema: z.object({
    moderatorDecision: z.enum(['approve', 'reject', 'modify']).optional(),
    moderatorNotes: z.string().optional(),
    modifiedContent: z.string().optional(),
  }),
  outputSchema: z.object({
    moderationResult: z.enum(['approved', 'rejected', 'modified']),
    moderatedContent: z.string(),
    notes: z.string().optional(),
  }),
  // @ts-ignore
  execute: async ({ context, suspend }) => {
    const analysisResult = context.getStepResult(analyzeContent);
    // Access the input provided when resuming the workflow
    const moderatorInput = {
      decision: context.inputData?.moderatorDecision,
      notes: context.inputData?.moderatorNotes,
      modifiedContent: context.inputData?.modifiedContent,
    };
 
    // If the AI analysis score is high enough, auto-approve
    if (analysisResult?.aiAnalysisScore > 0.9 && !analysisResult?.flaggedCategories?.length) {
      return {
        moderationResult: 'approved',
        moderatedContent: analysisResult.content,
        notes: 'Auto-approved by system',
      };
    }
 
    // If we don't have moderator input yet, suspend for human review
    if (!moderatorInput.decision) {
      await suspend({
        content: analysisResult?.content,
        aiScore: analysisResult?.aiAnalysisScore,
        flaggedCategories: analysisResult?.flaggedCategories,
        message: 'Please review this content and make a moderation decision',
      });
 
      // Placeholder return
      return {
        moderationResult: 'approved',
        moderatedContent: '',
      };
    }
 
    // Process the moderator's decision
    switch (moderatorInput.decision) {
      case 'approve':
        return {
          moderationResult: 'approved',
          moderatedContent: analysisResult?.content || '',
          notes: moderatorInput.notes || 'Approved by moderator',
        };
 
      case 'reject':
        return {
          moderationResult: 'rejected',
          moderatedContent: '',
          notes: moderatorInput.notes || 'Rejected by moderator',
        };
 
      case 'modify':
        return {
          moderationResult: 'modified',
          moderatedContent: moderatorInput.modifiedContent || analysisResult?.content || '',
          notes: moderatorInput.notes || 'Modified by moderator',
        };
 
      default:
        return {
          moderationResult: 'rejected',
          moderatedContent: '',
          notes: 'Invalid moderator decision',
        };
    }
  },
});
 
// Step 3: Apply moderation actions
const applyModeration = new Step({
  id: 'applyModeration',
  outputSchema: z.object({
    finalStatus: z.string(),
    content: z.string().optional(),
    auditLog: z.object({
      originalContent: z.string(),
      moderationResult: z.string(),
      aiScore: z.number(),
      timestamp: z.string(),
    }),
  }),
  execute: async ({ context }) => {
    const analysisResult = context.getStepResult(analyzeContent);
    const moderationResult = context.getStepResult(moderateContent);
 
    // Create audit log
    const auditLog = {
      originalContent: analysisResult?.content || '',
      moderationResult: moderationResult?.moderationResult || 'unknown',
      aiScore: analysisResult?.aiAnalysisScore || 0,
      timestamp: new Date().toISOString(),
    };
 
    // Apply moderation action
    switch (moderationResult?.moderationResult) {
      case 'approved':
        return {
          finalStatus: 'Content published',
          content: moderationResult.moderatedContent,
          auditLog,
        };
 
      case 'modified':
        return {
          finalStatus: 'Content modified and published',
          content: moderationResult.moderatedContent,
          auditLog,
        };
 
      case 'rejected':
        return {
          finalStatus: 'Content rejected',
          auditLog,
        };
 
      default:
        return {
          finalStatus: 'Error in moderation process',
          auditLog,
        };
    }
  },
});
 
// Build the workflow
const contentModerationWorkflow = new Workflow({
  name: 'content-moderation-workflow',
  triggerSchema: z.object({
    content: z.string(),
  }),
});
 
contentModerationWorkflow
  .step(analyzeContent)
  .then(moderateContent)
  .then(applyModeration)
  .commit();
 
// Register the workflow
const mastra = new Mastra({
  workflows: { contentModerationWorkflow },
});
 
// Example of using the workflow with Inquirer prompts
async function runModerationDemo() {
  const registeredWorkflow = mastra.getWorkflow('contentModerationWorkflow');
  const run = registeredWorkflow.createRun();
 
  // Start the workflow with content that needs review
  console.log('Starting content moderation workflow...');
  const result = await run.start({
    triggerData: {
      content: 'This is some user-generated content that requires moderation.'
    }
  });
 
  const isReviewStepSuspended = result.activePaths.get('moderateContent')?.status === 'suspended';
 
  // Check if workflow is suspended
  if (isReviewStepSuspended) {
    const { content, aiScore, flaggedCategories, message } = result.activePaths.get('moderateContent')?.suspendPayload;
 
    console.log('\n===================================');
    console.log(message);
    console.log('===================================\n');
 
    console.log('Content to review:');
    console.log(content);
    console.log(`\nAI Analysis Score: ${aiScore}`);
    console.log(`Flagged Categories: ${flaggedCategories?.join(', ') || 'None'}\n`);
 
    // Collect moderator decision using Inquirer
    const moderatorDecision = await select({
      message: 'Select your moderation decision:',
      choices: [
        { name: 'Approve content as is', value: 'approve' },
        { name: 'Reject content completely', value: 'reject' },
        { name: 'Modify content before publishing', value: 'modify' }
      ],
    });
 
    // Collect additional information based on decision
    let moderatorNotes = '';
    let modifiedContent = '';
 
    moderatorNotes = await input({
      message: 'Enter any notes about your decision:',
    });
 
    if (moderatorDecision === 'modify') {
      modifiedContent = await input({
        message: 'Enter the modified content:',
        default: content,
      });
    }
 
    console.log('\nSubmitting your moderation decision...');
 
    // Resume the workflow with the moderator's input
    const resumeResult = await run.resume({
      stepId: 'moderateContent',
      context: {
        moderatorDecision,
        moderatorNotes,
        modifiedContent,
      },
    });
 
    if (resumeResult?.results?.applyModeration?.status === 'success') {
      console.log('\n===================================');
      console.log(`Moderation complete: ${resumeResult?.results?.applyModeration?.output.finalStatus}`);
      console.log('===================================\n');
 
      if (resumeResult?.results?.applyModeration?.output.content) {
        console.log('Published content:');
        console.log(resumeResult.results.applyModeration.output.content);
      }
    }
 
    return resumeResult;
  }
 
  console.log('Workflow completed without requiring human intervention:', result.results);
  return result;
}
 
// Helper function for AI content analysis simulation
function simulateContentAnalysis(content: string): number {
  // In a real application, this would call an AI service
  // For the example, we're returning a random score
  return Math.random();
}
 
// Invoke the demo function
runModerationDemo().catch(console.error);

Key Concepts

  1. Suspension Points - Use the suspend() function within a step’s execute to pause workflow execution.

  2. Suspension Payload - Pass relevant data when suspending to provide context for human decision-making:

    await suspend({
      messageForHuman: 'Please review this data',
      data: someImportantData
    });
  3. Checking Workflow Status - After starting a workflow, check the returned status to see if it’s suspended:

    const result = await workflow.start({ triggerData });
    if (result.status === 'suspended' && result.suspendedStepId === 'stepId') {
      // Process suspension
      console.log('Workflow is waiting for input:', result.suspendPayload);
    }
  4. Interactive Terminal Input - Use libraries like Inquirer to create interactive prompts:

    import { select, input, confirm } from '@inquirer/prompts';
     
    // When the workflow is suspended
    if (result.status === 'suspended') {
      // Display information from the suspend payload
      console.log(result.suspendPayload.message);
     
      // Collect user input interactively
      const decision = await select({
        message: 'What would you like to do?',
        choices: [
          { name: 'Approve', value: 'approve' },
          { name: 'Reject', value: 'reject' }
        ]
      });
     
      // Resume the workflow with the collected input
      await run.resume({
        stepId: result.suspendedStepId,
        context: { decision }
      });
    }
  5. Resuming Workflow - Use the resume() method to continue workflow execution with human input:

    const resumeResult = await run.resume({
      stepId: 'suspendedStepId',
      context: {
        // This data is passed to the suspended step as context.inputData
        // and must conform to the step's inputSchema
        userDecision: 'approve'
      },
    });
  6. Input Schema for Human Data - Define an input schema on steps that might be resumed with human input to ensure type safety:

    const myStep = new Step({
      id: 'myStep',
      inputSchema: z.object({
        // This schema validates the data passed in resume's context
        // and makes it available as context.inputData
        userDecision: z.enum(['approve', 'reject']),
        userComments: z.string().optional(),
      }),
      execute: async ({ context, suspend }) => {
        // Check if we have user input from a previous suspension
        if (context.inputData?.userDecision) {
          // Process the user's decision
          return { result: `User decided: ${context.inputData.userDecision}` };
        }
     
        // If no input, suspend for human decision
        await suspend();
      }
    });

Human-in-the-loop workflows are powerful for building systems that blend automation with human judgment, such as:

  • Content moderation systems
  • Approval workflows
  • Supervised AI systems
  • Customer service automation with escalation





View Example on GitHub