
Parallel Execution with Steps

When building AI applications, you often need to run multiple independent tasks at the same time to improve efficiency. Mastra workflows support this directly through the .parallel method.
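As a rough illustration of the idea, the sketch below uses plain `Promise.all` rather than Mastra's API. The `fakeTask` helper is a hypothetical stand-in for an independent LLM call, and the shared log only exists to make the ordering visible: run in sequence, the second task does not start until the first has ended; run in parallel, both start before either ends.

```typescript
// Hypothetical stand-in for an independent async task such as an LLM call.
async function fakeTask(label: string, log: string[]): Promise<string> {
  log.push(`${label}:start`)
  await Promise.resolve() // yield control, as a real network call would
  log.push(`${label}:end`)
  return `${label} plan`
}

// Sequential: the indoor task only starts once the outdoor task has finished.
async function runSequentially(log: string[]): Promise<string[]> {
  const outdoor = await fakeTask('outdoor', log)
  const indoor = await fakeTask('indoor', log)
  return [outdoor, indoor]
}

// Parallel: both tasks start immediately; Promise.all keeps results in input order.
async function runInParallel(log: string[]): Promise<string[]> {
  return Promise.all([fakeTask('outdoor', log), fakeTask('indoor', log)])
}
```

Both functions return the same results in the same order; only the interleaving of the work differs.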

Define Planning Agent

Define a planning agent that uses an LLM to plan activities for a given location and its weather conditions.

agents/planning-agent.ts
import { Agent } from '@mastra/core/agent'
import { openai } from '@ai-sdk/openai'

const llm = openai('gpt-4o')

const planningAgent = new Agent({
  name: 'planningAgent',
  model: llm,
  instructions: `
    You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations.

    📅 [Day, Month Date, Year]
    ═══════════════════════════

    🌡️ WEATHER SUMMARY
    • Conditions: [brief description]
    • Temperature: [X°C/Y°F to A°C/B°F]
    • Precipitation: [X% chance]

    🌅 MORNING ACTIVITIES
    Outdoor:
    • [Activity Name] - [Brief description including specific location/route]
      Best timing: [specific time range]
      Note: [relevant weather consideration]

    🌞 AFTERNOON ACTIVITIES
    Outdoor:
    • [Activity Name] - [Brief description including specific location/route]
      Best timing: [specific time range]
      Note: [relevant weather consideration]

    🏠 INDOOR ALTERNATIVES
    • [Activity Name] - [Brief description including specific venue]
      Ideal for: [weather condition that would trigger this alternative]

    ⚠️ SPECIAL CONSIDERATIONS
    • [Any relevant weather warnings, UV index, wind conditions, etc.]

    Guidelines:
    - Suggest 2-3 time-specific outdoor activities per day
    - Include 1-2 indoor backup options
    - For precipitation >50%, lead with indoor activities
    - All activities must be specific to the location
    - Include specific venues, trails, or locations
    - Consider activity intensity based on temperature
    - Keep descriptions concise but informative

    Maintain this exact formatting for consistency, using the emoji and section headers as shown.
  `,
})

export { planningAgent }

Define Synthesize Agent

Define a synthesize agent which takes planned indoor and outdoor activities and provides a full report on the day.

agents/synthesize-agent.ts
import { Agent } from '@mastra/core/agent'
import { openai } from '@ai-sdk/openai'

const llm = openai('gpt-4o')

const synthesizeAgent = new Agent({
  name: 'synthesizeAgent',
  model: llm,
  instructions: `
    You are given two different blocks of text, one about indoor activities and one about outdoor activities.
    Make this into a full report about the day and the possibilities depending on whether it rains or not.
  `,
})

export { synthesizeAgent }

Define Parallel Workflow

Here, we’ll define a workflow that runs the two planning steps in parallel and then passes both results to the synthesize step.

workflows/parallel-workflow.ts
import { z } from 'zod'
import { createStep, createWorkflow } from '@mastra/core/workflows/vNext'

const forecastSchema = z.object({
  date: z.string(),
  maxTemp: z.number(),
  minTemp: z.number(),
  precipitationChance: z.number(),
  condition: z.string(),
  location: z.string(),
})

// Resolves the city to coordinates, then fetches and summarizes its forecast
const fetchWeather = createStep({
  id: 'fetch-weather',
  description: 'Fetches weather forecast for a given city',
  inputSchema: z.object({
    city: z.string(),
  }),
  outputSchema: forecastSchema,
  execute: async ({ inputData }) => {
    if (!inputData) {
      throw new Error('Trigger data not found')
    }

    const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1`
    const geocodingResponse = await fetch(geocodingUrl)
    const geocodingData = (await geocodingResponse.json()) as {
      results: { latitude: number; longitude: number; name: string }[]
    }

    if (!geocodingData.results?.[0]) {
      throw new Error(`Location '${inputData.city}' not found`)
    }

    const { latitude, longitude, name } = geocodingData.results[0]

    const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}&current=precipitation,weathercode&timezone=auto&hourly=precipitation_probability,temperature_2m`
    const response = await fetch(weatherUrl)
    const data = (await response.json()) as {
      current: {
        time: string
        precipitation: number
        weathercode: number
      }
      hourly: {
        precipitation_probability: number[]
        temperature_2m: number[]
      }
    }

    // Summarize the hourly series: temperature range and peak chance of precipitation
    const forecast = {
      date: new Date().toISOString(),
      maxTemp: Math.max(...data.hourly.temperature_2m),
      minTemp: Math.min(...data.hourly.temperature_2m),
      condition: getWeatherCondition(data.current.weathercode),
      location: name,
      precipitationChance: data.hourly.precipitation_probability.reduce(
        (acc, curr) => Math.max(acc, curr),
        0
      ),
    }

    return forecast
  },
})

const planActivities = createStep({
  id: 'plan-activities',
  description: 'Suggests activities based on weather conditions',
  inputSchema: forecastSchema,
  outputSchema: z.object({
    activities: z.string(),
  }),
  execute: async ({ inputData, mastra }) => {
    console.log('planActivities', inputData)
    const forecast = inputData

    if (!forecast) {
      throw new Error('Forecast data not found')
    }

    const prompt = `Based on the following weather forecast for ${forecast.location}, suggest appropriate activities:
      ${JSON.stringify(forecast, null, 2)}
      `

    const agent = mastra?.getAgent('planningAgent')
    if (!agent) {
      throw new Error('Planning agent not found')
    }

    const response = await agent.stream([
      {
        role: 'user',
        content: prompt,
      },
    ])

    // Echo the streamed text as it arrives while collecting the full response
    let activitiesText = ''
    for await (const chunk of response.textStream) {
      process.stdout.write(chunk)
      activitiesText += chunk
    }

    console.log('planActivities', activitiesText)

    return {
      activities: activitiesText,
    }
  },
})

// Maps Open-Meteo weather codes to human-readable conditions
function getWeatherCondition(code: number): string {
  const conditions: Record<number, string> = {
    0: 'Clear sky',
    1: 'Mainly clear',
    2: 'Partly cloudy',
    3: 'Overcast',
    45: 'Foggy',
    48: 'Depositing rime fog',
    51: 'Light drizzle',
    53: 'Moderate drizzle',
    55: 'Dense drizzle',
    61: 'Slight rain',
    63: 'Moderate rain',
    65: 'Heavy rain',
    71: 'Slight snow fall',
    73: 'Moderate snow fall',
    75: 'Heavy snow fall',
    95: 'Thunderstorm',
  }
  return conditions[code] || 'Unknown'
}

const planIndoorActivities = createStep({
  id: 'plan-indoor-activities',
  description: 'Suggests indoor activities based on weather conditions',
  inputSchema: forecastSchema,
  outputSchema: z.object({
    activities: z.string(),
  }),
  execute: async ({ inputData, mastra }) => {
    console.log('planIndoorActivities', inputData)
    const forecast = inputData

    if (!forecast) {
      throw new Error('Forecast data not found')
    }

    const prompt = `In case it rains, plan indoor activities for ${forecast.location} on ${forecast.date}`

    const agent = mastra?.getAgent('planningAgent')
    if (!agent) {
      throw new Error('Planning agent not found')
    }

    const response = await agent.stream([
      {
        role: 'user',
        content: prompt,
      },
    ])

    let activitiesText = ''
    for await (const chunk of response.textStream) {
      activitiesText += chunk
    }

    console.log('planIndoorActivities', activitiesText)

    return {
      activities: activitiesText,
    }
  },
})

const synthesizeStep = createStep({
  id: 'synthesize-step',
  description: 'Synthesizes the results of the indoor and outdoor activities',
  // The outputs of the parallel steps arrive as one object keyed by step id
  inputSchema: z.object({
    'plan-activities': z.object({
      activities: z.string(),
    }),
    'plan-indoor-activities': z.object({
      activities: z.string(),
    }),
  }),
  outputSchema: z.object({
    activities: z.string(),
  }),
  execute: async ({ inputData, mastra }) => {
    console.log('synthesizeStep', inputData)
    const indoorActivities = inputData?.['plan-indoor-activities']
    const outdoorActivities = inputData?.['plan-activities']

    const prompt = `Indoor activities:
      ${indoorActivities?.activities}

      Outdoor activities:
      ${outdoorActivities?.activities}

      There is a chance of rain so be prepared to do indoor activities if needed.`

    const agent = mastra?.getAgent('synthesizeAgent')
    if (!agent) {
      throw new Error('Synthesize agent not found')
    }

    const response = await agent.stream([
      {
        role: 'user',
        content: prompt,
      },
    ])

    let activitiesText = ''
    for await (const chunk of response.textStream) {
      process.stdout.write(chunk)
      activitiesText += chunk
    }

    return {
      activities: activitiesText,
    }
  },
})

const weatherWorkflow = createWorkflow({
  id: 'plan-both-workflow',
  // The workflow is started with a city name; `fetchWeather` turns it into a forecast
  inputSchema: z.object({
    city: z.string(),
  }),
  outputSchema: z.object({
    activities: z.string(),
  }),
  steps: [fetchWeather, planActivities, planIndoorActivities, synthesizeStep],
})
  // fetch the forecast first so that both planning steps receive it as input
  .then(fetchWeather)
  // run `planActivities` and `planIndoorActivities` in parallel
  // `synthesizeStep` waits for both steps to be completed before executing.
  .parallel([planActivities, planIndoorActivities])
  .then(synthesizeStep)
  .commit()

export { weatherWorkflow }
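The input schema of the synthesize step shows how results arrive after a parallel block: a single object keyed by the id of each parallel step. The sketch below mimics that shape in plain TypeScript. `SimpleStep` and `runParallel` are hypothetical names used only for illustration and are not Mastra's implementation; the two stub steps return canned text instead of calling an agent.

```typescript
// Hypothetical minimal step shape, for illustration only.
interface SimpleStep<I, O> {
  id: string
  execute: (input: I) => Promise<O>
}

// Run every step concurrently on the same input and collect the outputs by step id.
async function runParallel<I, O>(
  steps: SimpleStep<I, O>[],
  input: I
): Promise<Record<string, O>> {
  const entries = await Promise.all(
    steps.map(async (step) => ({ id: step.id, output: await step.execute(input) }))
  )
  const byId: Record<string, O> = {}
  for (const entry of entries) {
    byId[entry.id] = entry.output
  }
  return byId
}

type Forecast = { location: string }
type Plan = { activities: string }

// Stub steps that reuse the step ids from the workflow above.
const outdoorStub: SimpleStep<Forecast, Plan> = {
  id: 'plan-activities',
  execute: async (forecast) => ({ activities: `Hiking near ${forecast.location}` }),
}
const indoorStub: SimpleStep<Forecast, Plan> = {
  id: 'plan-indoor-activities',
  execute: async (forecast) => ({ activities: `Museums in ${forecast.location}` }),
}

// Resolves to an object with the same two keys the synthesize step reads.
const combined = runParallel([outdoorStub, indoorStub], { location: 'Ibiza' })
```

Keying by step id rather than by position means the downstream step does not depend on the order in which the parallel steps were listed.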

Register Agent and Workflow instances with Mastra class

Register the agents and the workflow with the Mastra instance. The workflow steps look up agents by name through mastra.getAgent(), so an agent that is not registered here cannot be found at run time.

index.ts
import { Mastra } from '@mastra/core/mastra'
import { createLogger } from '@mastra/core/logger'
import { weatherWorkflow } from './workflows'
import { planningAgent, synthesizeAgent } from './agents'

const mastra = new Mastra({
  vnext_workflows: {
    weatherWorkflow,
  },
  agents: {
    planningAgent,
    synthesizeAgent,
  },
  logger: createLogger({
    name: 'Mastra',
    level: 'info',
  }),
})

export { mastra }
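Because the steps fetch agents by name, the keys used at registration have to match the names used in the lookup. A minimal sketch of that lookup-and-guard pattern follows. The `Registry` class and `requireAgent` helper are hypothetical and stand in for the role the Mastra instance plays here; they say nothing about how Mastra behaves internally.

```typescript
// Hypothetical registry, for illustration only.
class Registry<T> {
  private readonly items: Map<string, T>

  constructor(entries: Record<string, T>) {
    this.items = new Map(Object.entries(entries))
  }

  // Returns undefined for unknown names so the caller can fail with a clear message.
  get(name: string): T | undefined {
    return this.items.get(name)
  }
}

const agentRegistry = new Registry({
  planningAgent: { name: 'planningAgent' },
  synthesizeAgent: { name: 'synthesizeAgent' },
})

// Mirrors the guard used in the workflow steps: look up by name, throw if missing.
function requireAgent(name: string): { name: string } {
  const agent = agentRegistry.get(name)
  if (!agent) {
    throw new Error(`Agent '${name}' not found; was it registered?`)
  }
  return agent
}
```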

Execute the weather workflow

Here, we’ll get the weather workflow from the mastra instance, then create a run and execute the created run with the required inputData.

exec.ts
import { mastra } from "./"

const workflow = mastra.vnext_getWorkflow('weatherWorkflow')
const run = workflow.createRun()

// Start the run with the city to plan for
const result = await run.start({ inputData: { city: 'Ibiza' } })
console.dir(result, { depth: null })




