Workflow with Conditional Branching
Workflows often need to follow different paths depending on a condition evaluated at run time.
This example shows how to use the branch construct (the .branch() method)
to create conditional flows within your workflows.
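Before looking at the Mastra code, it may help to see the underlying pattern in plain TypeScript. The sketch below is not Mastra's implementation; `Branch`, `runBranches`, and the `Forecast` shape are hypothetical names introduced only for illustration. It is synchronous for brevity and runs every handler whose condition is true, so how a real workflow engine treats several matching branches should be checked against that engine's documentation.

```typescript
// A branch pairs a condition with the handler to run when the condition holds.
type Branch<In, Out> = [(input: In) => boolean, (input: In) => Out];

// Evaluate the conditions in order and run each handler whose condition is true.
// Returns the handler outputs in branch order (empty if nothing matched).
function runBranches<In, Out>(input: In, branches: Branch<In, Out>[]): Out[] {
  const outputs: Out[] = [];
  for (const [condition, handler] of branches) {
    if (condition(input)) {
      outputs.push(handler(input));
    }
  }
  return outputs;
}

// Hypothetical input shape, loosely modelled on the forecast used in this example.
interface Forecast {
  location: string;
  precipitationChance: number;
}

// Two mutually exclusive conditions that together cover every possible value.
const branches: Branch<Forecast, string>[] = [
  [(f) => f.precipitationChance > 50, (f) => `indoor plan for ${f.location}`],
  [(f) => f.precipitationChance <= 50, (f) => `outdoor plan for ${f.location}`],
];
```

Because the two conditions are complementary, exactly one handler runs for any forecast. If the conditions overlapped or left a gap, zero or several handlers could run, which is worth keeping in mind when writing branch conditions.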
Setup
npm install @ai-sdk/openai @mastra/core @mastra/loggers zod
Define Planning Agent
Define a planning agent that uses an LLM to plan activities for a given location and its weather conditions.
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
const llm = openai("gpt-4o");
// Define the planning agent that generates activity recommendations
// based on weather conditions and location
const planningAgent = new Agent({
name: "planningAgent",
model: llm,
instructions: `
You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations.
📅 [Day, Month Date, Year]
═══════════════════════════
🌡️ WEATHER SUMMARY
• Conditions: [brief description]
• Temperature: [X°C/Y°F to A°C/B°F]
• Precipitation: [X% chance]
🌅 MORNING ACTIVITIES
Outdoor:
• [Activity Name] - [Brief description including specific location/route]
Best timing: [specific time range]
Note: [relevant weather consideration]
🌞 AFTERNOON ACTIVITIES
Outdoor:
• [Activity Name] - [Brief description including specific location/route]
Best timing: [specific time range]
Note: [relevant weather consideration]
🏠 INDOOR ALTERNATIVES
• [Activity Name] - [Brief description including specific venue]
Ideal for: [weather condition that would trigger this alternative]
⚠️ SPECIAL CONSIDERATIONS
• [Any relevant weather warnings, UV index, wind conditions, etc.]
Guidelines:
- Suggest 2-3 time-specific outdoor activities per day
- Include 1-2 indoor backup options
- For precipitation >50%, lead with indoor activities
- All activities must be specific to the location
- Include specific venues, trails, or locations
- Consider activity intensity based on temperature
- Keep descriptions concise but informative
Maintain this exact formatting for consistency, using the emoji and section headers as shown.
`,
});
export { planningAgent };
Define Activity Planning Workflow
Define the planning workflow with three steps: one to fetch the weather via a network call, one to plan activities, and one to plan only indoor activities. Both planning steps use the planning agent.
import { z } from 'zod'
import { createWorkflow, createStep } from "@mastra/core/workflows/vNext";
// Helper function to convert weather codes to human-readable conditions
function getWeatherCondition(code: number): string {
const conditions: Record<number, string> = {
0: "Clear sky",
1: "Mainly clear",
2: "Partly cloudy",
3: "Overcast",
45: "Foggy",
48: "Depositing rime fog",
51: "Light drizzle",
53: "Moderate drizzle",
55: "Dense drizzle",
61: "Slight rain",
63: "Moderate rain",
65: "Heavy rain",
71: "Slight snow fall",
73: "Moderate snow fall",
75: "Heavy snow fall",
95: "Thunderstorm",
};
return conditions[code] || "Unknown";
}
const forecastSchema = z.object({
date: z.string(),
maxTemp: z.number(),
minTemp: z.number(),
precipitationChance: z.number(),
condition: z.string(),
location: z.string(),
});
// Step to fetch weather data for a given city
// Makes API calls to get current weather conditions and forecast
const fetchWeather = createStep({
id: "fetch-weather",
description: "Fetches weather forecast for a given city",
inputSchema: z.object({
city: z.string(),
}),
outputSchema: forecastSchema,
execute: async ({ inputData }) => {
if (!inputData) {
throw new Error("Input data not found");
}
const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1`;
const geocodingResponse = await fetch(geocodingUrl);
const geocodingData = (await geocodingResponse.json()) as {
results: { latitude: number; longitude: number; name: string }[];
};
if (!geocodingData.results?.[0]) {
throw new Error(`Location '${inputData.city}' not found`);
}
const { latitude, longitude, name } = geocodingData.results[0];
const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}&current=precipitation,weathercode&timezone=auto&hourly=precipitation_probability,temperature_2m`;
const response = await fetch(weatherUrl);
const data = (await response.json()) as {
current: {
time: string;
precipitation: number;
weathercode: number;
};
hourly: {
precipitation_probability: number[];
temperature_2m: number[];
};
};
const forecast = {
date: new Date().toISOString(),
maxTemp: Math.max(...data.hourly.temperature_2m),
minTemp: Math.min(...data.hourly.temperature_2m),
condition: getWeatherCondition(data.current.weathercode),
location: name,
precipitationChance: data.hourly.precipitation_probability.reduce(
(acc, curr) => Math.max(acc, curr),
0,
),
};
return forecast;
},
});
// Step to plan activities based on weather conditions
// Uses the planning agent to generate activity recommendations
const planActivities = createStep({
id: "plan-activities",
description: "Suggests activities based on weather conditions",
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
const forecast = inputData
if (!forecast) {
throw new Error("Forecast data not found");
}
const prompt = `Based on the following weather forecast for ${forecast.location}, suggest appropriate activities:
${JSON.stringify(forecast, null, 2)}
`;
const agent = mastra?.getAgent("planningAgent");
if (!agent) {
throw new Error("Planning agent not found");
}
const response = await agent.stream([
{
role: "user",
content: prompt,
},
]);
let activitiesText = "";
for await (const chunk of response.textStream) {
process.stdout.write(chunk);
activitiesText += chunk;
}
return {
activities: activitiesText,
};
},
});
// Step to plan indoor activities only
// Used when precipitation chance is high
const planIndoorActivities = createStep({
id: "plan-indoor-activities",
description: "Suggests indoor activities based on weather conditions",
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
const forecast = inputData
if (!forecast) {
throw new Error("Forecast data not found");
}
const prompt = `In case it rains, plan indoor activities for ${forecast.location} on ${forecast.date}`;
const agent = mastra?.getAgent("planningAgent");
if (!agent) {
throw new Error("Planning agent not found");
}
const response = await agent.stream([
{
role: "user",
content: prompt,
},
]);
let activitiesText = "";
for await (const chunk of response.textStream) {
process.stdout.write(chunk);
activitiesText += chunk;
}
return {
activities: activitiesText,
};
},
});
// Main workflow that:
// 1. Fetches weather data
// 2. Branches based on precipitation chance:
// - If >50%: Plans indoor activities
// - If <=50%: Plans outdoor activities
const activityPlanningWorkflow = createWorkflow({
id: 'activity-planning-workflow-step2-if-else',
inputSchema: z.object({
city: z.string().describe("The city to get the weather for"),
}),
outputSchema: z.object({
activities: z.string(),
}),
})
.then(fetchWeather)
.branch([
// Branch for high precipitation (indoor activities)
[
async ({ inputData }) => {
return inputData?.precipitationChance > 50;
},
planIndoorActivities,
],
// Branch for low precipitation (outdoor activities)
[
async ({ inputData }) => {
return inputData?.precipitationChance <= 50;
},
planActivities,
],
]);
activityPlanningWorkflow.commit()
export { activityPlanningWorkflow }
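The aggregation done inside the fetch-weather step and the two branch conditions can be checked without any network call or LLM. The following is a minimal sketch under stated assumptions: `summarizeHourly`, `needsIndoorPlan`, and `allowsOutdoorPlan` are hypothetical helpers that mirror the logic above and are not part of Mastra, and the guard against an empty temperature series is an addition that the step itself does not have.

```typescript
// The three forecast fields derived from hourly data (a subset of forecastSchema).
interface HourlySummary {
  maxTemp: number;
  minTemp: number;
  precipitationChance: number;
}

// Hypothetical helper: reduce the hourly series to the numbers used downstream.
function summarizeHourly(hourly: {
  temperature_2m: number[];
  precipitation_probability: number[];
}): HourlySummary {
  // Added guard (assumption): Math.max() of an empty list would be -Infinity.
  if (hourly.temperature_2m.length === 0) {
    throw new Error("No hourly temperature data");
  }
  return {
    maxTemp: Math.max(...hourly.temperature_2m),
    minTemp: Math.min(...hourly.temperature_2m),
    // Highest hourly probability; 0 when the series is empty.
    precipitationChance: hourly.precipitation_probability.reduce(
      (acc, curr) => Math.max(acc, curr),
      0,
    ),
  };
}

// The two branch conditions from the workflow, written as plain functions.
const needsIndoorPlan = (f: { precipitationChance: number }): boolean =>
  f.precipitationChance > 50;
const allowsOutdoorPlan = (f: { precipitationChance: number }): boolean =>
  f.precipitationChance <= 50;

// Made-up sample data, not real API output.
const sample = summarizeHourly({
  temperature_2m: [12.5, 18, 9],
  precipitation_probability: [10, 65, 40],
});
```

With this sample the peak precipitation probability is above the threshold, so only the indoor condition holds. A value of exactly 50 falls to the second condition, which matches the `>` and `<=` comparisons used in the workflow.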
Register Agent and Workflow instances with Mastra class
Register the agent and workflow with the Mastra instance. Registration is what lets the workflow steps look up the agent through mastra.getAgent("planningAgent").
import { Mastra } from '@mastra/core'
import { PinoLogger } from '@mastra/loggers'
import { activityPlanningWorkflow } from './workflows/conditional-workflow'
import { planningAgent } from './agents/planning-agent'
// Initialize Mastra with the activity planning workflow
// This enables the workflow to be executed and access the planning agent
const mastra = new Mastra({
vnext_workflows: {
activityPlanningWorkflow,
},
agents: {
planningAgent,
},
logger: new PinoLogger({
name: "Mastra",
level: "info",
}),
})
export { mastra }
Execute the activity planning workflow
Get the activity planning workflow from the mastra instance, create a run, and start it with the required inputData.
import { mastra } from "./";
const workflow = mastra.vnext_getWorkflow('activityPlanningWorkflow')
const run = workflow.createRun()
// Start the workflow with a city
// This will fetch weather and plan activities based on conditions
const result = await run.start({ inputData: { city: 'New York' } })
console.dir(result, { depth: null })