Parallel Execution with Steps
When building AI applications, you often need to process multiple independent tasks simultaneously to improve efficiency.
We make this functionality a core part of workflows through the .parallel
method.
Define Planning Agent
Define a planning agent which leverages an LLM call to plan activities given a location and corresponding weather conditions.
import { Agent } from '@mastra/core/agent'
import { openai } from '@ai-sdk/openai'
const llm = openai('gpt-4o')
const planningAgent = new Agent({
name: 'planningAgent',
model: llm,
instructions: `
You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations.
📅 [Day, Month Date, Year]
═══════════════════════════
🌡️ WEATHER SUMMARY
• Conditions: [brief description]
• Temperature: [X°C/Y°F to A°C/B°F]
• Precipitation: [X% chance]
🌅 MORNING ACTIVITIES
Outdoor:
• [Activity Name] - [Brief description including specific location/route]
Best timing: [specific time range]
Note: [relevant weather consideration]
🌞 AFTERNOON ACTIVITIES
Outdoor:
• [Activity Name] - [Brief description including specific location/route]
Best timing: [specific time range]
Note: [relevant weather consideration]
🏠 INDOOR ALTERNATIVES
• [Activity Name] - [Brief description including specific venue]
Ideal for: [weather condition that would trigger this alternative]
⚠️ SPECIAL CONSIDERATIONS
• [Any relevant weather warnings, UV index, wind conditions, etc.]
Guidelines:
- Suggest 2-3 time-specific outdoor activities per day
- Include 1-2 indoor backup options
- For precipitation >50%, lead with indoor activities
- All activities must be specific to the location
- Include specific venues, trails, or locations
- Consider activity intensity based on temperature
- Keep descriptions concise but informative
Maintain this exact formatting for consistency, using the emoji and section headers as shown.
`,
})
export { planningAgent }
Define Synthesize Agent
Define a synthesize agent which takes planned indoor and outdoor activities and provides a full report on the day.
import { Agent } from '@mastra/core/agent'
import { openai } from '@ai-sdk/openai'
const llm = openai('gpt-4o')
const synthesizeAgent = new Agent({
name: 'synthesizeAgent',
model: llm,
instructions: `
You are given two different blocks of text, one about indoor activities and one about outdoor activities.
Make this into a full report about the day and the possibilities depending on whether it rains or not.
`,
})
export { synthesizeAgent }
Define Parallel Workflow
Here, we’ll define a workflow which orchestrates a parallel -> sequential flow between the planning steps and the synthesize step.
import { Step, Workflow } from '@mastra/core/workflows'
import { z } from 'zod'
import { activityPlannerAgent } from '../agents'
import { createStep, createWorkflow } from '@mastra/core/workflows/vNext'
const forecastSchema = z.object({
date: z.string(),
maxTemp: z.number(),
minTemp: z.number(),
precipitationChance: z.number(),
condition: z.string(),
location: z.string(),
})
const fetchWeather = createStep({
id: 'fetch-weather',
description: 'Fetches weather forecast for a given city',
inputSchema: z.object({
city: z.string(),
}),
outputSchema: forecastSchema,
execute: async ({ inputData }) => {
if (!inputData) {
throw new Error('Trigger data not found')
}
const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1`
const geocodingResponse = await fetch(geocodingUrl)
const geocodingData = (await geocodingResponse.json()) as {
results: { latitude: number; longitude: number; name: string }[]
}
if (!geocodingData.results?.[0]) {
throw new Error(`Location '${inputData.city}' not found`)
}
const { latitude, longitude, name } = geocodingData.results[0]
const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}¤t=precipitation,weathercode&timezone=auto,&hourly=precipitation_probability,temperature_2m`
const response = await fetch(weatherUrl)
const data = (await response.json()) as {
current: {
time: string
precipitation: number
weathercode: number
}
hourly: {
precipitation_probability: number[]
temperature_2m: number[]
}
}
const forecast = {
date: new Date().toISOString(),
maxTemp: Math.max(...data.hourly.temperature_2m),
minTemp: Math.min(...data.hourly.temperature_2m),
condition: getWeatherCondition(data.current.weathercode),
location: name,
precipitationChance: data.hourly.precipitation_probability.reduce(
(acc, curr) => Math.max(acc, curr),
0
),
}
return forecast
},
})
const planActivities = createStep({
id: 'plan-activities',
description: 'Suggests activities based on weather conditions',
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
console.log('mastra', mastra)
console.log('planActivities', inputData)
const forecast = inputData
if (!forecast) {
throw new Error('Forecast data not found')
}
const prompt = `Based on the following weather forecast for ${forecast.location}, suggest appropriate activities:
${JSON.stringify(forecast, null, 2)}
`
const agent = mastra?.getAgent('planningAgent')
if (!agent) {
throw new Error('Planning agent not found')
}
const response = await agent.stream([
{
role: 'user',
content: prompt,
},
])
let activitiesText = ''
for await (const chunk of response.textStream) {
process.stdout.write(chunk)
activitiesText += chunk
}
console.log('planActivities', activitiesText)
return {
activities: activitiesText,
}
},
})
function getWeatherCondition(code: number): string {
const conditions: Record<number, string> = {
0: 'Clear sky',
1: 'Mainly clear',
2: 'Partly cloudy',
3: 'Overcast',
45: 'Foggy',
48: 'Depositing rime fog',
51: 'Light drizzle',
53: 'Moderate drizzle',
55: 'Dense drizzle',
61: 'Slight rain',
63: 'Moderate rain',
65: 'Heavy rain',
71: 'Slight snow fall',
73: 'Moderate snow fall',
75: 'Heavy snow fall',
95: 'Thunderstorm',
}
return conditions[code] || 'Unknown'
}
const planIndoorActivities = createStep({
id: 'plan-indoor-activities',
description: 'Suggests indoor activities based on weather conditions',
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
console.log('planIndoorActivities', inputData)
const forecast = inputData
if (!forecast) {
throw new Error('Forecast data not found')
}
const prompt = `In case it rains, plan indoor activities for ${forecast.location} on ${forecast.date}`
const agent = mastra?.getAgent('planningAgent')
if (!agent) {
throw new Error('Planning agent not found')
}
const response = await agent.stream([
{
role: 'user',
content: prompt,
},
])
let activitiesText = ''
for await (const chunk of response.textStream) {
activitiesText += chunk
}
console.log('planIndoorActivities', activitiesText)
return {
activities: activitiesText,
}
},
})
const sythesizeStep = createStep({
id: 'sythesize-step',
description: 'Synthesizes the results of the indoor and outdoor activities',
inputSchema: z.object({
'plan-activities': z.object({
activities: z.string(),
}),
'plan-indoor-activities': z.object({
activities: z.string(),
}),
}),
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
console.log('sythesizeStep', inputData)
const indoorActivities = inputData?.['plan-indoor-activities']
const outdoorActivities = inputData?.['plan-activities']
const prompt = `Indoor activtities:
${indoorActivities?.activities}
Outdoor activities:
${outdoorActivities?.activities}
There is a chance of rain so be prepared to do indoor activities if needed.`
const agent = mastra?.getAgent('synthesizeAgent')
if (!agent) {
throw new Error('Planning agent not found')
}
const response = await agent.stream([
{
role: 'user',
content: prompt,
},
])
let activitiesText = ''
for await (const chunk of response.textStream) {
process.stdout.write(chunk)
activitiesText += chunk
}
return {
activities: activitiesText,
}
},
})
const weatherWorkflow = createWorkflow({
id: 'plan-both-workflow',
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
steps: [planActivities, planIndoorActivities, sythesizeStep],
})
// run `planActivities` and `planIndoorActivities` in parallel
// `synthesizeStep` waits for both steps to be completed before executing.
.parallel([planActivities, planIndoorActivities])
.then(sythesizeStep)
.commit()
export { weatherWorkflow }
Register Agent and Workflow instances with Mastra class
Register the agents and workflow with the mastra instance. This is critical for enabling access to the agents within the workflow.
import { Mastra } from '@mastra/core/mastra'
import { createLogger } from '@mastra/core/logger'
import { weatherWorkflow } from './workflows'
import { planningAgent, synthesizeAgent } from './agents'
const mastra = new Mastra({
vnext_workflows: {
weatherWorkflow,
},
agents: {
planningAgent,
synthesizeAgent
},
logger: createLogger({
name: 'Mastra',
level: 'info',
}),
})
export { mastra }
Execute the weather workflow
Here, we’ll get the weather workflow from the mastra instance, then create a run and execute the created run with the required inputData.
import { mastra } from "./"
const workflow = mastra.vnext_getWorkflow('weatherWorkflow')
const run = workflow.createRun()
const result = await run.start({ inputData: { city: 'Ibiza' } })
console.dir(result, { depth: null })