
Human in the Loop Workflow

Human-in-the-loop workflows allow you to pause execution at specific points to collect user input, make decisions, or perform actions that require human judgment. This example demonstrates how to create a workflow with human intervention points.

How It Works

  1. A workflow step can suspend execution using the suspend() function, optionally passing a payload with context for the human decision maker.
  2. When the workflow is resumed, the human input is passed in the context parameter of the resume() call.
  3. This input becomes available in the step’s execution context as context.inputData, which is typed according to the step’s inputSchema.
  4. The step can then continue execution based on the human input.

This pattern allows for safe, type-checked human intervention in automated workflows.
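
The cycle described above can be modeled without any framework. The following is a toy sketch under stated assumptions, not Mastra's implementation: createToyRun, StepArgs, and RunState are hypothetical names introduced for illustration, and suspend() is synchronous here for brevity, whereas the real one is awaited.

```typescript
// Toy model of the suspend/resume cycle. All names here are hypothetical;
// this is not how Mastra implements workflows internally.
type StepArgs<I, P> = {
  context: { inputData?: I };
  suspend: (payload: P) => void;
};

type RunState<P, O> =
  | { status: 'suspended'; suspendPayload: P }
  | { status: 'success'; output: O };

function createToyRun<I, P, O>(execute: (args: StepArgs<I, P>) => O) {
  // Each invocation re-runs the step from the top, so the step itself
  // must check context.inputData to tell a first pass from a resume.
  const invoke = (inputData?: I): RunState<P, O> => {
    const payloads: P[] = [];
    const output = execute({
      context: { inputData },
      suspend: (payload) => {
        payloads.push(payload);
      },
    });
    return payloads.length > 0
      ? { status: 'suspended', suspendPayload: payloads[0] }
      : { status: 'success', output };
  };

  return {
    start: () => invoke(),
    resume: (input: I) => invoke(input),
  };
}

type Review = { approved: boolean };

const toyRun = createToyRun<Review, { message: string }, string>(({ context, suspend }) => {
  if (!context.inputData) {
    suspend({ message: 'Please review this item' });
    return ''; // placeholder; ignored because the run is reported as suspended
  }
  return context.inputData.approved ? 'published' : 'rejected';
});

const firstPass = toyRun.start();
const secondPass = toyRun.resume({ approved: true });
console.log(firstPass.status, secondPass.status); // prints "suspended success"
```

The first pass has no input, so the step suspends with a payload. The second pass supplies the human's answer as inputData, and the step runs to completion.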

Interactive Terminal Example Using Inquirer

This example demonstrates how to use the Inquirer library to collect user input directly from the terminal when a workflow is suspended, creating a truly interactive human-in-the-loop experience.

import { Mastra } from '@mastra/core';
import { Step, Workflow } from '@mastra/core/workflows';
import { z } from 'zod';
import { checkbox, confirm, input } from '@inquirer/prompts';

// Step 1: Generate product recommendations
const generateRecommendations = new Step({
  id: 'generateRecommendations',
  outputSchema: z.object({
    customerName: z.string(),
    recommendations: z.array(
      z.object({
        productId: z.string(),
        productName: z.string(),
        price: z.number(),
        description: z.string(),
      }),
    ),
  }),
  execute: async ({ context }) => {
    const customerName = context.triggerData.customerName;

    // In a real application, you might call an API or ML model here
    // For this example, we'll return mock data
    return {
      customerName,
      recommendations: [
        {
          productId: 'prod-001',
          productName: 'Premium Widget',
          price: 99.99,
          description: 'Our best-selling premium widget with advanced features',
        },
        {
          productId: 'prod-002',
          productName: 'Basic Widget',
          price: 49.99,
          description: 'Affordable entry-level widget for beginners',
        },
        {
          productId: 'prod-003',
          productName: 'Widget Pro Plus',
          price: 149.99,
          description: 'Professional-grade widget with extended warranty',
        },
      ],
    };
  },
});

// Step 2: Get human approval and customization for the recommendations
const reviewRecommendations = new Step({
  id: 'reviewRecommendations',
  inputSchema: z.object({
    approvedProducts: z.array(z.string()),
    customerNote: z.string().optional(),
    offerDiscount: z.boolean().optional(),
  }),
  outputSchema: z.object({
    finalRecommendations: z.array(
      z.object({
        productId: z.string(),
        productName: z.string(),
        price: z.number(),
      }),
    ),
    customerNote: z.string().optional(),
    offerDiscount: z.boolean(),
  }),
  execute: async ({ context, suspend }) => {
    const { customerName, recommendations } = context.getStepResult(generateRecommendations) || {
      customerName: '',
      recommendations: [],
    };

    // Check if we have input from a resumed workflow
    const reviewInput = {
      approvedProducts: context.inputData?.approvedProducts || [],
      customerNote: context.inputData?.customerNote,
      offerDiscount: context.inputData?.offerDiscount,
    };

    // If we don't have agent input yet, suspend for human review
    if (!reviewInput.approvedProducts.length) {
      console.log(`Generating recommendations for customer: ${customerName}`);
      await suspend({
        customerName,
        recommendations,
        message: 'Please review these product recommendations before sending to the customer',
      });

      // Placeholder return (won't be reached due to suspend)
      return {
        finalRecommendations: [],
        customerNote: '',
        offerDiscount: false,
      };
    }

    // Process the agent's product selections
    const finalRecommendations = recommendations
      .filter(product => reviewInput.approvedProducts.includes(product.productId))
      .map(product => ({
        productId: product.productId,
        productName: product.productName,
        price: product.price,
      }));

    return {
      finalRecommendations,
      customerNote: reviewInput.customerNote || '',
      offerDiscount: reviewInput.offerDiscount || false,
    };
  },
});

// Step 3: Send the recommendations to the customer
const sendRecommendations = new Step({
  id: 'sendRecommendations',
  outputSchema: z.object({
    emailSent: z.boolean(),
    emailContent: z.string(),
  }),
  execute: async ({ context }) => {
    const { customerName } = context.getStepResult(generateRecommendations) || { customerName: '' };
    const { finalRecommendations, customerNote, offerDiscount } = context.getStepResult(
      reviewRecommendations,
    ) || {
      finalRecommendations: [],
      customerNote: '',
      offerDiscount: false,
    };

    // Generate email content based on the recommendations
    let emailContent = `Dear ${customerName},\n\nBased on your preferences, we recommend:\n\n`;

    finalRecommendations.forEach(product => {
      emailContent += `- ${product.productName}: $${product.price.toFixed(2)}\n`;
    });

    if (offerDiscount) {
      emailContent += '\nAs a valued customer, use code SAVE10 for 10% off your next purchase!\n';
    }

    if (customerNote) {
      emailContent += `\nPersonal note: ${customerNote}\n`;
    }

    emailContent += '\nThank you for your business,\nThe Sales Team';

    // In a real application, you would send this email
    console.log('Email content generated:', emailContent);

    return {
      emailSent: true,
      emailContent,
    };
  },
});

// Build the workflow
const recommendationWorkflow = new Workflow({
  name: 'product-recommendation-workflow',
  triggerSchema: z.object({
    customerName: z.string(),
  }),
});

recommendationWorkflow
  .step(generateRecommendations)
  .then(reviewRecommendations)
  .then(sendRecommendations)
  .commit();

// Register the workflow
const mastra = new Mastra({
  workflows: { recommendationWorkflow },
});

// Example of using the workflow with Inquirer prompts
async function runRecommendationWorkflow() {
  const registeredWorkflow = mastra.getWorkflow('recommendationWorkflow');
  const run = registeredWorkflow.createRun();

  console.log('Starting product recommendation workflow...');
  const result = await run.start({
    triggerData: {
      customerName: 'Jane Smith',
    },
  });

  const isReviewStepSuspended = result.activePaths.get('reviewRecommendations')?.status === 'suspended';

  // Check if workflow is suspended for human review
  if (isReviewStepSuspended) {
    const { customerName, recommendations, message } =
      result.activePaths.get('reviewRecommendations')?.suspendPayload;

    console.log('\n===================================');
    console.log(message);
    console.log(`Customer: ${customerName}`);
    console.log('===================================\n');

    // Use Inquirer to collect input from the sales agent in the terminal
    console.log('Available product recommendations:');
    recommendations.forEach((product, index) => {
      console.log(`${index + 1}. ${product.productName} - $${product.price.toFixed(2)}`);
      console.log(`   ${product.description}\n`);
    });

    // Let the agent select which products to recommend
    const approvedProducts = await checkbox({
      message: 'Select products to recommend to the customer:',
      choices: recommendations.map(product => ({
        name: `${product.productName} ($${product.price.toFixed(2)})`,
        value: product.productId,
      })),
    });

    // Let the agent add a personal note
    const includeNote = await confirm({
      message: 'Would you like to add a personal note?',
      default: false,
    });

    let customerNote = '';
    if (includeNote) {
      customerNote = await input({
        message: 'Enter your personalized note for the customer:',
      });
    }

    // Ask if a discount should be offered
    const offerDiscount = await confirm({
      message: 'Offer a 10% discount to this customer?',
      default: false,
    });

    console.log('\nSubmitting your review...');

    // Resume the workflow with the agent's input
    const resumeResult = await run.resume({
      stepId: 'reviewRecommendations',
      context: {
        approvedProducts,
        customerNote,
        offerDiscount,
      },
    });

    console.log('\n===================================');
    console.log('Workflow completed!');
    console.log('Email content:');
    console.log('===================================\n');
    console.log(resumeResult?.results?.sendRecommendations || 'No email content generated');

    return resumeResult;
  }

  return result;
}

// Invoke the workflow with interactive terminal input
runRecommendationWorkflow().catch(console.error);

Advanced Example with Multiple User Inputs

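This example demonstrates a more complex workflow, a content moderation system, in which a single review step collects several inputs from a human moderator (a decision, notes, and optionally modified content). The step skips human review entirely when the AI analysis score is above 0.9 and no categories are flagged.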

import { Mastra } from '@mastra/core';
import { Step, Workflow } from '@mastra/core/workflows';
import { z } from 'zod';
import { select, input } from '@inquirer/prompts';

// Step 1: Receive and analyze content
const analyzeContent = new Step({
  id: 'analyzeContent',
  outputSchema: z.object({
    content: z.string(),
    aiAnalysisScore: z.number(),
    flaggedCategories: z.array(z.string()).optional(),
  }),
  execute: async ({ context }) => {
    const content = context.triggerData.content;

    // Simulate AI analysis
    const aiAnalysisScore = simulateContentAnalysis(content);
    const flaggedCategories = aiAnalysisScore < 0.7 ? ['potentially inappropriate', 'needs review'] : [];

    return {
      content,
      aiAnalysisScore,
      flaggedCategories,
    };
  },
});

// Step 2: Moderate content that needs review
const moderateContent = new Step({
  id: 'moderateContent',
  // Define the schema for human input that will be provided when resuming
  inputSchema: z.object({
    moderatorDecision: z.enum(['approve', 'reject', 'modify']).optional(),
    moderatorNotes: z.string().optional(),
    modifiedContent: z.string().optional(),
  }),
  outputSchema: z.object({
    moderationResult: z.enum(['approved', 'rejected', 'modified']),
    moderatedContent: z.string(),
    notes: z.string().optional(),
  }),
  // @ts-ignore
  execute: async ({ context, suspend }) => {
    const analysisResult = context.getStepResult(analyzeContent);

    // Access the input provided when resuming the workflow
    const moderatorInput = {
      decision: context.inputData?.moderatorDecision,
      notes: context.inputData?.moderatorNotes,
      modifiedContent: context.inputData?.modifiedContent,
    };

    // If the AI analysis score is high enough, auto-approve
    if (analysisResult?.aiAnalysisScore > 0.9 && !analysisResult?.flaggedCategories?.length) {
      return {
        moderationResult: 'approved',
        moderatedContent: analysisResult.content,
        notes: 'Auto-approved by system',
      };
    }

    // If we don't have moderator input yet, suspend for human review
    if (!moderatorInput.decision) {
      await suspend({
        content: analysisResult?.content,
        aiScore: analysisResult?.aiAnalysisScore,
        flaggedCategories: analysisResult?.flaggedCategories,
        message: 'Please review this content and make a moderation decision',
      });

      // Placeholder return
      return {
        moderationResult: 'approved',
        moderatedContent: '',
      };
    }

    // Process the moderator's decision
    switch (moderatorInput.decision) {
      case 'approve':
        return {
          moderationResult: 'approved',
          moderatedContent: analysisResult?.content || '',
          notes: moderatorInput.notes || 'Approved by moderator',
        };

      case 'reject':
        return {
          moderationResult: 'rejected',
          moderatedContent: '',
          notes: moderatorInput.notes || 'Rejected by moderator',
        };

      case 'modify':
        return {
          moderationResult: 'modified',
          moderatedContent: moderatorInput.modifiedContent || analysisResult?.content || '',
          notes: moderatorInput.notes || 'Modified by moderator',
        };

      default:
        return {
          moderationResult: 'rejected',
          moderatedContent: '',
          notes: 'Invalid moderator decision',
        };
    }
  },
});

// Step 3: Apply moderation actions
const applyModeration = new Step({
  id: 'applyModeration',
  outputSchema: z.object({
    finalStatus: z.string(),
    content: z.string().optional(),
    auditLog: z.object({
      originalContent: z.string(),
      moderationResult: z.string(),
      aiScore: z.number(),
      timestamp: z.string(),
    }),
  }),
  execute: async ({ context }) => {
    const analysisResult = context.getStepResult(analyzeContent);
    const moderationResult = context.getStepResult(moderateContent);

    // Create audit log
    const auditLog = {
      originalContent: analysisResult?.content || '',
      moderationResult: moderationResult?.moderationResult || 'unknown',
      aiScore: analysisResult?.aiAnalysisScore || 0,
      timestamp: new Date().toISOString(),
    };

    // Apply moderation action
    switch (moderationResult?.moderationResult) {
      case 'approved':
        return {
          finalStatus: 'Content published',
          content: moderationResult.moderatedContent,
          auditLog,
        };

      case 'modified':
        return {
          finalStatus: 'Content modified and published',
          content: moderationResult.moderatedContent,
          auditLog,
        };

      case 'rejected':
        return {
          finalStatus: 'Content rejected',
          auditLog,
        };

      default:
        return {
          finalStatus: 'Error in moderation process',
          auditLog,
        };
    }
  },
});

// Build the workflow
const contentModerationWorkflow = new Workflow({
  name: 'content-moderation-workflow',
  triggerSchema: z.object({
    content: z.string(),
  }),
});

contentModerationWorkflow
  .step(analyzeContent)
  .then(moderateContent)
  .then(applyModeration)
  .commit();

// Register the workflow
const mastra = new Mastra({
  workflows: { contentModerationWorkflow },
});

// Example of using the workflow with Inquirer prompts
async function runModerationDemo() {
  const registeredWorkflow = mastra.getWorkflow('contentModerationWorkflow');
  const run = registeredWorkflow.createRun();

  // Start the workflow with content that needs review
  console.log('Starting content moderation workflow...');
  const result = await run.start({
    triggerData: {
      content: 'This is some user-generated content that requires moderation.',
    },
  });

  const isReviewStepSuspended = result.activePaths.get('moderateContent')?.status === 'suspended';

  // Check if workflow is suspended
  if (isReviewStepSuspended) {
    const { content, aiScore, flaggedCategories, message } =
      result.activePaths.get('moderateContent')?.suspendPayload;

    console.log('\n===================================');
    console.log(message);
    console.log('===================================\n');

    console.log('Content to review:');
    console.log(content);
    console.log(`\nAI Analysis Score: ${aiScore}`);
    console.log(`Flagged Categories: ${flaggedCategories?.join(', ') || 'None'}\n`);

    // Collect moderator decision using Inquirer
    const moderatorDecision = await select({
      message: 'Select your moderation decision:',
      choices: [
        { name: 'Approve content as is', value: 'approve' },
        { name: 'Reject content completely', value: 'reject' },
        { name: 'Modify content before publishing', value: 'modify' },
      ],
    });

    // Collect additional information based on decision
    let moderatorNotes = '';
    let modifiedContent = '';

    moderatorNotes = await input({
      message: 'Enter any notes about your decision:',
    });

    if (moderatorDecision === 'modify') {
      modifiedContent = await input({
        message: 'Enter the modified content:',
        default: content,
      });
    }

    console.log('\nSubmitting your moderation decision...');

    // Resume the workflow with the moderator's input
    const resumeResult = await run.resume({
      stepId: 'moderateContent',
      context: {
        moderatorDecision,
        moderatorNotes,
        modifiedContent,
      },
    });

    if (resumeResult?.results?.applyModeration?.status === 'success') {
      console.log('\n===================================');
      console.log(`Moderation complete: ${resumeResult?.results?.applyModeration?.output.finalStatus}`);
      console.log('===================================\n');

      if (resumeResult?.results?.applyModeration?.output.content) {
        console.log('Published content:');
        console.log(resumeResult.results.applyModeration.output.content);
      }
    }

    return resumeResult;
  }

  console.log('Workflow completed without requiring human intervention:', result.results);
  return result;
}

// Helper function for AI content analysis simulation
function simulateContentAnalysis(content: string): number {
  // In a real application, this would call an AI service
  // For the example, we're returning a random score
  return Math.random();
}

// Invoke the demo function
runModerationDemo().catch(console.error);
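
Both examples repeat the same lookup on result.activePaths before prompting the user. One way to centralize it is a small helper, sketched below. RunResultLike, ActivePath, and getSuspendPayload are hypothetical names, and the types cover only the fields the examples above read (status and suspendPayload); the real result object may carry more.

```typescript
// Structural types covering only what the examples above access.
// These are assumptions for illustration, not Mastra's exported types.
type ActivePath = { status: string; suspendPayload?: unknown };
type RunResultLike = { activePaths: Map<string, ActivePath> };

// Returns the suspend payload for a step, or undefined when the step
// is missing from activePaths or is not currently suspended.
function getSuspendPayload<T>(result: RunResultLike, stepId: string): T | undefined {
  const path = result.activePaths.get(stepId);
  if (path?.status !== 'suspended') {
    return undefined;
  }
  // The caller chooses T; nothing is validated at runtime here.
  return path.suspendPayload as T;
}

// Usage with a hand-built result object standing in for a real run
const fakeResult: RunResultLike = {
  activePaths: new Map([
    ['moderateContent', { status: 'suspended', suspendPayload: { message: 'Please review' } }],
  ]),
};

const moderationPayload = getSuspendPayload<{ message: string }>(fakeResult, 'moderateContent');
const missingPayload = getSuspendPayload(fakeResult, 'applyModeration');
console.log(moderationPayload?.message, missingPayload); // prints "Please review undefined"
```

Returning undefined instead of destructuring directly avoids a runtime error if the step turns out not to be suspended.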

Key Concepts

  1. Suspension Points - Use the suspend() function within a step’s execute to pause workflow execution.

  2. Suspension Payload - Pass relevant data when suspending to provide context for human decision-making:

    await suspend({ messageForHuman: 'Please review this data', data: someImportantData });
  3. Checking Workflow Status - After starting a run, look up the step in the result's activePaths map to see whether it is suspended:

    const result = await run.start({ triggerData });
    const pendingStep = result.activePaths.get('stepId');

    if (pendingStep?.status === 'suspended') {
      // Process suspension
      console.log('Workflow is waiting for input:', pendingStep.suspendPayload);
    }
  4. Interactive Terminal Input - Use libraries like Inquirer to create interactive prompts:

    import { select, input, confirm } from '@inquirer/prompts';

    // When the workflow is suspended
    const suspendedStep = result.activePaths.get('stepId');
    if (suspendedStep?.status === 'suspended') {
      // Display information from the suspend payload
      console.log(suspendedStep.suspendPayload.message);

      // Collect user input interactively
      const decision = await select({
        message: 'What would you like to do?',
        choices: [
          { name: 'Approve', value: 'approve' },
          { name: 'Reject', value: 'reject' },
        ],
      });

      // Resume the workflow with the collected input
      await run.resume({
        stepId: 'stepId',
        context: { decision },
      });
    }
  5. Resuming Workflow - Use the resume() method to continue workflow execution with human input:

    const resumeResult = await run.resume({
      stepId: 'suspendedStepId',
      context: {
        // This data is passed to the suspended step as context.inputData
        // and must conform to the step's inputSchema
        userDecision: 'approve',
      },
    });
  6. Input Schema for Human Data - Define an input schema on steps that might be resumed with human input to ensure type safety:

    const myStep = new Step({
      id: 'myStep',
      inputSchema: z.object({
        // This schema validates the data passed in resume's context
        // and makes it available as context.inputData
        userDecision: z.enum(['approve', 'reject']),
        userComments: z.string().optional(),
      }),
      execute: async ({ context, suspend }) => {
        // Check if we have user input from a previous suspension
        if (context.inputData?.userDecision) {
          // Process the user's decision
          return { result: `User decided: ${context.inputData.userDecision}` };
        }

        // If no input, suspend for human decision
        await suspend();
      },
    });
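
It can also help to check the collected input on the caller's side before passing it to resume(), so that a malformed answer is caught next to the prompt that produced it. With zod, the schema's safeParse method would be the natural tool. The sketch below instead uses a hand-written type guard so that it runs without dependencies. UserDecisionInput and isUserDecisionInput are hypothetical names that mirror the inputSchema shown in item 6.

```typescript
// Mirrors the shape of the inputSchema in item 6; a hypothetical stand-in
// for schema validation, written by hand to stay dependency-free.
type UserDecisionInput = {
  userDecision: 'approve' | 'reject';
  userComments?: string;
};

function isUserDecisionInput(value: unknown): value is UserDecisionInput {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  const decisionOk = candidate.userDecision === 'approve' || candidate.userDecision === 'reject';
  const commentsOk =
    candidate.userComments === undefined || typeof candidate.userComments === 'string';
  return decisionOk && commentsOk;
}

// Usage: only well-formed input should be forwarded to run.resume()
const goodInput: unknown = { userDecision: 'approve', userComments: 'Looks fine' };
const badInput: unknown = { userDecision: 'maybe' };

console.log(isUserDecisionInput(goodInput), isUserDecisionInput(badInput)); // prints "true false"
```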

Human-in-the-loop workflows are powerful for building systems that blend automation with human judgment, such as:

  • Content moderation systems
  • Approval workflows
  • Supervised AI systems
  • Customer service automation with escalation




