Human in the Loop Workflow
Human-in-the-loop workflows allow you to pause execution at specific points to collect user input, make decisions, or perform actions that require human judgment. This example demonstrates how to create a workflow with human intervention points.
Define Agents
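Define the two travel agents: summaryAgent proposes three short holiday options, and travelAgent writes a detailed plan for the chosen one.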
import { Agent } from '@mastra/core/agent'
import { openai } from '@ai-sdk/openai'

const llm = openai('gpt-4o')

export const summaryAgent = new Agent({
  name: 'summaryTravelAgent',
  model: llm,
  instructions: `
    You are a travel agent who is given a user prompt about what kind of holiday they want to go on.
    You then generate 3 different options for the holiday. Return the suggestions as a JSON array {"location": "string", "description": "string"}[]. Don't format as markdown.
    Make the options as different as possible from each other.
    Also make the plan very short and summarized.
  `,
})

export const travelAgent = new Agent({
  name: 'travelAgent',
  model: llm,
  instructions: `
    You are a travel agent who is given a user prompt about what kind of holiday they want to go on. A summary of the plan is provided as well as the location.
    You then generate a detailed travel plan for the holiday.
  `,
})
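The summary agent is asked to reply with a bare JSON array, but model output is not guaranteed to follow that instruction. As a minimal sketch, a helper along these lines could validate the reply before the workflow relies on it. The name parseSuggestions and its error messages are hypothetical; nothing here is part of Mastra or of the original example.

```typescript
type Suggestion = { location: string; description: string }

// Hypothetical helper (not a Mastra API): turns the agent's text reply into
// validated suggestions, tolerating extra text or a markdown fence around the JSON.
function parseSuggestions(text: string): Suggestion[] {
  // Keep only the outermost [...] span so surrounding text is ignored.
  const start = text.indexOf('[')
  const end = text.lastIndexOf(']')
  if (start === -1 || end <= start) {
    throw new Error('No JSON array found in agent reply')
  }

  const parsed: unknown = JSON.parse(text.slice(start, end + 1))
  if (!Array.isArray(parsed)) {
    throw new Error('Expected a JSON array of suggestions')
  }

  // Check each entry has the two string fields the prompt asks for.
  return parsed.map((item, index) => {
    const { location, description } = (item ?? {}) as Record<string, unknown>
    if (typeof location !== 'string' || typeof description !== 'string') {
      throw new Error(`Suggestion ${index} is missing location or description`)
    }
    return { location, description }
  })
}

const sampleReply = '[{"location":"Bali","description":"Beaches and temples"}]'
console.log(parseSuggestions(sampleReply))
```

In the workflow defined in the next section, a helper like this could stand in for the bare JSON.parse call, so a malformed reply fails with a readable error rather than flowing into later steps.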
Define Suspendable workflow
Define a workflow that includes a suspending step, humanInputStep.
import { createWorkflow, createStep } from '@mastra/core/workflows/vNext'
import { z } from 'zod'
// Shape of one option returned by the summary agent (see its instructions)
const suggestionSchema = z.object({
  location: z.string(),
  description: z.string(),
})

const generateSuggestionsStep = createStep({
  id: 'generate-suggestions',
  inputSchema: z.object({
    vacationDescription: z.string().describe('The description of the vacation'),
  }),
  outputSchema: z.object({
    suggestions: z.array(suggestionSchema),
    vacationDescription: z.string(),
  }),
  execute: async ({ inputData, mastra }) => {
    if (!mastra) {
      throw new Error('Mastra is not initialized')
    }
    const { vacationDescription } = inputData
    const result = await mastra.getAgent('summaryTravelAgent').generate([
      {
        role: 'user',
        content: vacationDescription,
      },
    ])
    console.log(result.text)
    // The agent is instructed to reply with a bare JSON array of suggestions
    return { suggestions: JSON.parse(result.text), vacationDescription }
  },
})

const humanInputStep = createStep({
  id: 'human-input',
  inputSchema: z.object({
    suggestions: z.array(suggestionSchema),
    vacationDescription: z.string(),
  }),
  outputSchema: z.object({
    selection: z.string().describe('The selection of the user'),
    vacationDescription: z.string(),
  }),
  resumeSchema: z.object({
    selection: z.string().describe('The selection of the user'),
  }),
  suspendSchema: z.object({
    suggestions: z.array(suggestionSchema),
  }),
  execute: async ({ inputData, resumeData, suspend }) => {
    // First pass: no selection yet, so pause the run and expose the suggestions
    if (!resumeData?.selection) {
      await suspend({ suggestions: inputData?.suggestions })
      return {
        selection: '',
        vacationDescription: inputData?.vacationDescription,
      }
    }
    // Second pass: the run was resumed with the user's selection
    return {
      selection: resumeData?.selection,
      vacationDescription: inputData?.vacationDescription,
    }
  },
})

const travelPlannerStep = createStep({
  id: 'travel-planner',
  inputSchema: z.object({
    selection: z.string().describe('The selection of the user'),
    vacationDescription: z.string(),
  }),
  outputSchema: z.object({
    travelPlan: z.string(),
  }),
  execute: async ({ inputData, mastra }) => {
    const travelAgent = mastra?.getAgent('travelAgent')
    if (!travelAgent) {
      throw new Error('Travel agent is not initialized')
    }
    const { selection, vacationDescription } = inputData
    const result = await travelAgent.generate([
      { role: 'assistant', content: vacationDescription },
      { role: 'user', content: selection || '' },
    ])
    console.log(result.text)
    return { travelPlan: result.text }
  },
})

const weatherWorkflow = createWorkflow({
  id: 'weather-workflow',
  inputSchema: z.object({
    vacationDescription: z.string().describe('The description of the vacation'),
  }),
  outputSchema: z.object({
    travelPlan: z.string(),
  }),
})
  .then(generateSuggestionsStep)
  .then(humanInputStep)
  .then(travelPlannerStep)

weatherWorkflow.commit()

export { weatherWorkflow, humanInputStep }
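The suspending step effectively runs twice: once before the user has answered and once after the run is resumed. The following framework-free sketch mirrors the branching in humanInputStep to make the two passes explicit. The names runHumanInput and StepResult are illustrative assumptions, not the Mastra API, and the engine's suspend call is modeled here as a returned status rather than an awaited function.

```typescript
type Suggestion = { location: string; description: string }
type StepInput = { suggestions: Suggestion[]; vacationDescription: string }
type ResumeData = { selection: string }

// Hypothetical result type: either the step paused with a payload for the
// human, or it completed with its output.
type StepResult =
  | { status: 'suspended'; suspendPayload: { suggestions: Suggestion[] } }
  | { status: 'success'; output: { selection: string; vacationDescription: string } }

function runHumanInput(inputData: StepInput, resumeData?: ResumeData): StepResult {
  if (!resumeData?.selection) {
    // First pass: nothing from the human yet, so pause and surface the options.
    return { status: 'suspended', suspendPayload: { suggestions: inputData.suggestions } }
  }
  // Second pass: the human has chosen, so the step can complete.
  return {
    status: 'success',
    output: {
      selection: resumeData.selection,
      vacationDescription: inputData.vacationDescription,
    },
  }
}

const stepInput: StepInput = {
  suggestions: [{ location: 'Lisbon', description: 'City break by the sea' }],
  vacationDescription: 'I want to go to the beach',
}

const firstPass = runHumanInput(stepInput)
const secondPass = runHumanInput(stepInput, { selection: 'Lisbon' })
console.log(firstPass.status) // suspended
console.log(secondPass.status) // success
```

The point of the sketch is that the same function handles both passes, and the presence of resume data is the only thing that distinguishes them.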
Register Agent and Workflow instances with Mastra class
Register the agents and the weather workflow with the mastra instance. This is critical for enabling access to the agents within the workflow.
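import { Mastra } from '@mastra/core/mastra'
import { createLogger } from '@mastra/core/logger'
import { weatherWorkflow } from './workflows'
import { travelAgent, summaryAgent } from './agents'

const mastra = new Mastra({
  // Registration keys match the strings used for lookup elsewhere in this example,
  // vnext_getWorkflow('weather-workflow') and getAgent('summaryTravelAgent'),
  // on the assumption that lookups resolve by registration key.
  vnext_workflows: {
    'weather-workflow': weatherWorkflow,
  },
  agents: {
    travelAgent,
    summaryTravelAgent: summaryAgent,
  },
  logger: createLogger({
    name: 'Mastra',
    level: 'info',
  }),
})

export { mastra }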
Execute the suspendable weather workflow
Here, we’ll get the weather workflow from the mastra instance, create a run, and start it with the required inputData. Once the run suspends, we’ll collect the user’s choice with the readline package and resume humanInputStep with it.
import { createInterface } from 'readline'
import { mastra } from './'
import { humanInputStep } from './workflows'

// Prompt on stdin and resolve with the line the user types
const readInput = (): Promise<string> => {
  const rl = createInterface({ input: process.stdin, output: process.stdout })
  return new Promise(resolve => {
    rl.question('Select a location: ', answer => {
      rl.close()
      resolve(answer.trim())
    })
  })
}

const workflow = mastra.vnext_getWorkflow('weather-workflow')
const run = workflow.createRun({})

// The run executes until humanInputStep suspends
const result = await run.start({
  inputData: { vacationDescription: 'I want to go to the beach' },
})
console.log('result', result)

const suggStep = result?.steps?.['generate-suggestions']
if (suggStep?.status === 'success') {
  const suggestions = suggStep.output?.suggestions ?? []
  console.log(
    suggestions
      .map(({ location, description }) => `- ${location}: ${description}`)
      .join('\n')
  )

  const userInput = await readInput()
  console.log('Selected:', userInput)

  // resumeData only needs the fields declared in humanInputStep's resumeSchema
  const resumeData = { selection: userInput }
  console.log('Resuming human-input with', resumeData)
  const result2 = await run.resume({
    resumeData,
    step: humanInputStep,
  })
  console.dir(result2, { depth: null })
}
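The script above passes whatever the user types straight through as the selection. As an optional refinement, the input could first be matched against the suggestions that were shown. This is a minimal sketch under stated assumptions: resolveSelection is a hypothetical helper, and accepting either a list number or a location name is a design choice of this sketch, not something the example prescribes.

```typescript
type Suggestion = { location: string; description: string }

// Hypothetical helper: maps raw terminal input to one of the suggested locations.
// Accepts a 1-based list number ("2") or a case-insensitive location name ("bali").
// Returns undefined when the input matches nothing, so the caller can ask again.
function resolveSelection(input: string, suggestions: Suggestion[]): string | undefined {
  const trimmed = input.trim()
  if (/^\d+$/.test(trimmed)) {
    // Convert the 1-based number to an array index; out-of-range yields undefined.
    return suggestions[Number(trimmed) - 1]?.location
  }
  const match = suggestions.find(
    s => s.location.toLowerCase() === trimmed.toLowerCase()
  )
  return match?.location
}

const shown: Suggestion[] = [
  { location: 'Bali', description: 'Beaches and temples' },
  { location: 'Lisbon', description: 'City break by the sea' },
]

console.log(resolveSelection('2', shown)) // Lisbon
console.log(resolveSelection(' bali ', shown)) // Bali
console.log(resolveSelection('Mars', shown)) // undefined
```

With a helper like this, the script could keep prompting until the result is defined and only then resume the run.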
Human-in-the-loop workflows are powerful for building systems that blend automation with human judgment, such as:
- Content moderation systems
- Approval workflows
- Supervised AI systems
- Customer service automation with escalation