Skip to Content

Parallel Execution with Steps

When building AI applications, you often need to process multiple independent tasks simultaneously to improve efficiency. We make this functionality a core part of workflows through the .parallel method.

Setup

npm install @ai-sdk/openai @mastra/core

Define Planning Agent

Define a planning agent which leverages an LLM call to plan activities given a location and corresponding weather conditions.

agents/planning-agent.ts
import { Agent } from "@mastra/core/agent"; import { openai } from "@ai-sdk/openai"; const llm = openai("gpt-4o"); // Define the planning agent with specific instructions for formatting // and structuring weather-based activity recommendations const planningAgent = new Agent({ name: "planningAgent", model: llm, instructions: ` You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations. 📅 [Day, Month Date, Year] ═══════════════════════════ 🌡️ WEATHER SUMMARY • Conditions: [brief description] • Temperature: [X°C/Y°F to A°C/B°F] • Precipitation: [X% chance] 🌅 MORNING ACTIVITIES Outdoor: • [Activity Name] - [Brief description including specific location/route] Best timing: [specific time range] Note: [relevant weather consideration] 🌞 AFTERNOON ACTIVITIES Outdoor: • [Activity Name] - [Brief description including specific location/route] Best timing: [specific time range] Note: [relevant weather consideration] 🏠 INDOOR ALTERNATIVES • [Activity Name] - [Brief description including specific venue] Ideal for: [weather condition that would trigger this alternative] ⚠️ SPECIAL CONSIDERATIONS • [Any relevant weather warnings, UV index, wind conditions, etc.] Guidelines: - Suggest 2-3 time-specific outdoor activities per day - Include 1-2 indoor backup options - For precipitation >50%, lead with indoor activities - All activities must be specific to the location - Include specific venues, trails, or locations - Consider activity intensity based on temperature - Keep descriptions concise but informative Maintain this exact formatting for consistency, using the emoji and section headers as shown. `, }); export { planningAgent };

Define Synthesize Agent

Define a synthesize agent which takes planned indoor and outdoor activities and provides a full report on the day.

agents/synthesize-agent.ts
import { Agent } from "@mastra/core/agent"; import { openai } from "@ai-sdk/openai"; const llm = openai("gpt-4o"); // Define the synthesize agent that combines indoor and outdoor activity plans // into a comprehensive report, considering weather conditions and alternatives const synthesizeAgent = new Agent({ name: "synthesizeAgent", model: llm, instructions: ` You are given two different blocks of text, one about indoor activities and one about outdoor activities. Make this into a full report about the day and the possibilities depending on whether it rains or not. `, }); export { synthesizeAgent };

Define Parallel Workflow

Here, we’ll define a workflow which orchestrates a parallel -> sequential flow between the planning steps and the synthesize step.

workflows/parallel-workflow.ts
import { z } from 'zod' import { createStep, createWorkflow } from '@mastra/core/workflows/vNext' const forecastSchema = z.object({ date: z.string(), maxTemp: z.number(), minTemp: z.number(), precipitationChance: z.number(), condition: z.string(), location: z.string(), }); // Step to fetch weather data for a given city // Makes API calls to get current weather conditions and forecast const fetchWeather = createStep({ id: "fetch-weather", description: "Fetches weather forecast for a given city", inputSchema: z.object({ city: z.string(), }), outputSchema: forecastSchema, execute: async ({ inputData }) => { if (!inputData) { throw new Error("Trigger data not found"); } const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1` const geocodingResponse = await fetch(geocodingUrl) const geocodingData = (await geocodingResponse.json()) as { results: { latitude: number; longitude: number; name: string }[] } if (!geocodingData.results?.[0]) { throw new Error(`Location '${inputData.city}' not found`); } const { latitude, longitude, name } = geocodingData.results[0] const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}&current=precipitation,weathercode&timezone=auto,&hourly=precipitation_probability,temperature_2m` const response = await fetch(weatherUrl) const data = (await response.json()) as { current: { time: string; precipitation: number; weathercode: number; }; hourly: { precipitation_probability: number[] temperature_2m: number[] } } const forecast = { date: new Date().toISOString(), maxTemp: Math.max(...data.hourly.temperature_2m), minTemp: Math.min(...data.hourly.temperature_2m), condition: getWeatherCondition(data.current.weathercode), location: name, precipitationChance: data.hourly.precipitation_probability.reduce( (acc, curr) => Math.max(acc, curr), 0, ), } return forecast }, }); // Step to plan outdoor activities based on weather conditions // Uses the planning agent to generate activity recommendations const planActivities = createStep({ id: "plan-activities", description: "Suggests activities based on weather conditions", inputSchema: forecastSchema, outputSchema: z.object({ activities: z.string(), }), execute: async ({ inputData, mastra }) => { const forecast = inputData if (!forecast) { throw new Error("Forecast data not found"); } const prompt = `Based on the following weather forecast for ${forecast.location}, suggest appropriate activities: ${JSON.stringify(forecast, null, 2)} ` const agent = mastra?.getAgent('planningAgent') if (!agent) { throw new Error("Planning agent not found"); } const response = await agent.stream([ { role: "user", content: prompt, }, ]) let activitiesText = '' for await (const chunk of response.textStream) { process.stdout.write(chunk); activitiesText += chunk; } return { activities: activitiesText, }; }, }) // Helper function to convert weather codes to human-readable conditions // Maps numeric codes from the weather API to descriptive strings function getWeatherCondition(code: number): string { const conditions: Record<number, string> = { 0: "Clear sky", 1: "Mainly clear", 2: "Partly cloudy", 3: "Overcast", 45: "Foggy", 48: "Depositing rime fog", 51: "Light drizzle", 53: "Moderate drizzle", 55: "Dense drizzle", 61: "Slight rain", 63: "Moderate rain", 65: "Heavy rain", 71: "Slight snow fall", 73: "Moderate snow fall", 75: "Heavy snow fall", 95: "Thunderstorm", }; return conditions[code] || "Unknown"; } // Step to plan indoor activities as backup options // Generates alternative indoor activities in case of bad weather const planIndoorActivities = createStep({ id: "plan-indoor-activities", description: "Suggests indoor activities based on weather conditions", inputSchema: forecastSchema, outputSchema: z.object({ activities: z.string(), }), execute: async ({ inputData, mastra }) => { const forecast = inputData if (!forecast) { throw new Error("Forecast data not found"); } const prompt = `In case it rains, plan indoor activities for ${forecast.location} on ${forecast.date}` const agent = mastra?.getAgent('planningAgent') if (!agent) { throw new Error("Planning agent not found"); } const response = await agent.stream([ { role: "user", content: prompt, }, ]) let activitiesText = '' for await (const chunk of response.textStream) { activitiesText += chunk; } return { activities: activitiesText, }; }, }); // Step to synthesize and combine indoor/outdoor activity plans // Creates a comprehensive plan that considers both options const synthesizeStep = createStep({ id: 'sythesize-step', description: 'Synthesizes the results of the indoor and outdoor activities', inputSchema: z.object({ "plan-activities": z.object({ activities: z.string(), }), "plan-indoor-activities": z.object({ activities: z.string(), }), }), outputSchema: z.object({ activities: z.string(), }), execute: async ({ inputData, mastra }) => { const indoorActivities = inputData?.['plan-indoor-activities'] const outdoorActivities = inputData?.['plan-activities'] const prompt = `Indoor activities: ${indoorActivities?.activities} Outdoor activities: ${outdoorActivities?.activities} There is a chance of rain so be prepared to do indoor activities if needed.` const agent = mastra?.getAgent('synthesizeAgent') if (!agent) { throw new Error('Synthesize agent not found') } const response = await agent.stream([ { role: "user", content: prompt, }, ]) let activitiesText = '' for await (const chunk of response.textStream) { process.stdout.write(chunk); activitiesText += chunk; } return { activities: activitiesText, }; }, }) const activityPlanningWorkflow = createWorkflow({ id: 'plan-both-workflow', inputSchema: z.object({ city: z.string(), }), outputSchema: z.object({ activities: z.string(), }), steps: [fetchWeather, planActivities, planIndoorActivities, synthesizeStep] }) .then(fetchWeather) .parallel([planActivities, planIndoorActivities]) .then(synthesizeStep) .commit() export { activityPlanningWorkflow }

Register Agent and Workflow instances with Mastra class

Register the agents and workflow with the mastra instance. This is critical for enabling access to the agents within the workflow.

index.ts
import { Mastra } from '@mastra/core/mastra' import { PinoLogger } from '@mastra/loggers' import { activityPlanningWorkflow } from './workflows/parallel-workflow' import { planningAgent } from './agents/planning-agent' import { synthesizeAgent } from './agents/synthesize-agent' // Initialize Mastra with required agents and workflows // This setup enables agent access within the workflow steps const mastra = new Mastra({ vnext_workflows: { activityPlanningWorkflow, }, agents: { planningAgent, synthesizeAgent, }, logger: new PinoLogger({ name: "Mastra", level: "info", }), }) export { mastra }

Execute the activity planning workflow

Here, we’ll get the weather workflow from the mastra instance, then create a run and execute the created run with the required inputData.

exec.ts
import { mastra } from "./"; const workflow = mastra.vnext_getWorkflow('activityPlanningWorkflow') const run = workflow.createRun() // Execute the workflow with a specific city // This will run through all steps and generate activity recommendations const result = await run.start({ inputData: { city: 'Ibiza' } }) console.dir(result, { depth: null })