Inngest Workflow
This example demonstrates how to build an Inngest workflow with Mastra.
Setup
npm install @mastra/inngest @mastra/core @mastra/deployer @hono/node-server @ai-sdk/openai inngest @inngest/realtime zod
docker run --rm -p 8288:8288 \
inngest/inngest \
inngest dev -u http://host.docker.internal:3000/api/inngest
Alternatively, you can use the Inngest CLI for local development by following the official Inngest Dev Server guide.
Define the Planning Agent
Define a planning agent which leverages an LLM call to plan activities given a location and corresponding weather conditions.
import { Agent } from '@mastra/core/agent'
import { openai } from '@ai-sdk/openai'
// Create a new planning agent that uses the OpenAI model
const planningAgent = new Agent({
name: 'planningAgent',
model: openai('gpt-4o'),
instructions: `
You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations.
📅 [Day, Month Date, Year]
═══════════════════════════
🌡️ WEATHER SUMMARY
• Conditions: [brief description]
• Temperature: [X°C/Y°F to A°C/B°F]
• Precipitation: [X% chance]
🌅 MORNING ACTIVITIES
Outdoor:
• [Activity Name] - [Brief description including specific location/route]
Best timing: [specific time range]
Note: [relevant weather consideration]
🌞 AFTERNOON ACTIVITIES
Outdoor:
• [Activity Name] - [Brief description including specific location/route]
Best timing: [specific time range]
Note: [relevant weather consideration]
🏠 INDOOR ALTERNATIVES
• [Activity Name] - [Brief description including specific venue]
Ideal for: [weather condition that would trigger this alternative]
⚠️ SPECIAL CONSIDERATIONS
• [Any relevant weather warnings, UV index, wind conditions, etc.]
Guidelines:
- Suggest 2-3 time-specific outdoor activities per day
- Include 1-2 indoor backup options
- For precipitation >50%, lead with indoor activities
- All activities must be specific to the location
- Include specific venues, trails, or locations
- Consider activity intensity based on temperature
- Keep descriptions concise but informative
Maintain this exact formatting for consistency, using the emoji and section headers as shown.
`,
})
export { planningAgent }
Define the Activity Planner Workflow
Define the activity planner workflow with 3 steps: one to fetch the weather via a network call, one to plan activities, and another to plan only indoor activities.
import { init } from '@mastra/inngest'
import { Inngest } from 'inngest'
import { z } from 'zod'
const { createWorkflow, createStep } = init(
new Inngest({
id: 'mastra',
baseUrl: `http://localhost:8288`,
})
)
// Helper function to convert weather codes to human-readable descriptions
function getWeatherCondition(code: number): string {
const conditions: Record<number, string> = {
0: 'Clear sky',
1: 'Mainly clear',
2: 'Partly cloudy',
3: 'Overcast',
45: 'Foggy',
48: 'Depositing rime fog',
51: 'Light drizzle',
53: 'Moderate drizzle',
55: 'Dense drizzle',
61: 'Slight rain',
63: 'Moderate rain',
65: 'Heavy rain',
71: 'Slight snow fall',
73: 'Moderate snow fall',
75: 'Heavy snow fall',
95: 'Thunderstorm',
}
return conditions[code] || 'Unknown'
}
const forecastSchema = z.object({
date: z.string(),
maxTemp: z.number(),
minTemp: z.number(),
precipitationChance: z.number(),
condition: z.string(),
location: z.string(),
})
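The helper above maps only part of the WMO weather-code table that Open-Meteo reports, so any code missing from the table falls through to 'Unknown'. A minimal sketch of that fallback, using an abbreviated copy of the table; the name describeWeatherCode is hypothetical, and code 80 is used here on the assumption that it is a valid WMO code (rain showers) that the table does not list:

```typescript
// Abbreviated copy of the lookup table, for illustration only.
const sampleConditions: Record<number, string> = {
  0: 'Clear sky',
  61: 'Slight rain',
  95: 'Thunderstorm',
}

// Hypothetical name; mirrors the fallback used in getWeatherCondition.
function describeWeatherCode(code: number): string {
  return sampleConditions[code] || 'Unknown'
}

const known = describeWeatherCode(61) // present in the table
const unmapped = describeWeatherCode(80) // absent, so the fallback applies
console.log(known, '|', unmapped)
```

If the agent should see a more precise condition, the table in getWeatherCondition can be extended with the missing codes.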
Step 1: Fetch weather data for a given city
const fetchWeather = createStep({
id: 'fetch-weather',
description: 'Fetches weather forecast for a given city',
inputSchema: z.object({
city: z.string(),
}),
outputSchema: forecastSchema,
execute: async ({ inputData }) => {
if (!inputData) {
throw new Error('Trigger data not found')
}
// Get latitude and longitude for the city
const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1`
const geocodingResponse = await fetch(geocodingUrl)
const geocodingData = (await geocodingResponse.json()) as {
results: { latitude: number; longitude: number; name: string }[]
}
if (!geocodingData.results?.[0]) {
throw new Error(`Location '${inputData.city}' not found`)
}
const { latitude, longitude, name } = geocodingData.results[0]
// Fetch weather data using the coordinates
const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}&current=precipitation,weathercode&timezone=auto&hourly=precipitation_probability,temperature_2m`
const response = await fetch(weatherUrl)
const data = (await response.json()) as {
current: {
time: string
precipitation: number
weathercode: number
}
hourly: {
precipitation_probability: number[]
temperature_2m: number[]
}
}
const forecast = {
date: new Date().toISOString(),
maxTemp: Math.max(...data.hourly.temperature_2m),
minTemp: Math.min(...data.hourly.temperature_2m),
condition: getWeatherCondition(data.current.weathercode),
location: name,
precipitationChance: data.hourly.precipitation_probability.reduce(
(acc, curr) => Math.max(acc, curr),
0
),
}
return forecast
},
})
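The summary built at the end of this step can be read as a pure function over the two hourly arrays. The sketch below isolates it under that assumption. The names summarizeHourly, HourlyData, and HourlySummary are hypothetical, and the empty-array guard is an addition that the step above does not have (without it, Math.max on an empty array returns -Infinity).

```typescript
// Hypothetical shape: the two hourly arrays the step reads from the response.
interface HourlyData {
  temperature_2m: number[]
  precipitation_probability: number[]
}

interface HourlySummary {
  maxTemp: number
  minTemp: number
  precipitationChance: number
}

// Hypothetical helper: reduces hourly readings to the three numbers the forecast uses.
function summarizeHourly(hourly: HourlyData): HourlySummary {
  if (hourly.temperature_2m.length === 0) {
    // Assumption: an empty forecast is treated as an error, not a valid result.
    throw new Error('No hourly temperature data')
  }
  return {
    maxTemp: Math.max(...hourly.temperature_2m),
    minTemp: Math.min(...hourly.temperature_2m),
    // Highest hourly probability, defaulting to 0 when the array is empty.
    precipitationChance: hourly.precipitation_probability.reduce(
      (acc, curr) => Math.max(acc, curr),
      0
    ),
  }
}

const summary = summarizeHourly({
  temperature_2m: [12.5, 18.1, 15.0],
  precipitation_probability: [10, 65, 30],
})
console.log(summary)
```

Keeping the aggregation separate from the network calls makes it possible to check the numbers without calling the weather API.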
Step 2: Suggest activities (indoor or outdoor) based on weather
const planActivities = createStep({
id: 'plan-activities',
description: 'Suggests activities based on weather conditions',
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
const forecast = inputData
if (!forecast) {
throw new Error('Forecast data not found')
}
const prompt = `Based on the following weather forecast for ${forecast.location}, suggest appropriate activities:
${JSON.stringify(forecast, null, 2)}
`
const agent = mastra?.getAgent('planningAgent')
if (!agent) {
throw new Error('Planning agent not found')
}
const response = await agent.stream([
{
role: 'user',
content: prompt,
},
])
let activitiesText = ''
for await (const chunk of response.textStream) {
process.stdout.write(chunk)
activitiesText += chunk
}
return {
activities: activitiesText,
}
},
})
Step 3: Suggest indoor activities only (for rainy weather)
const planIndoorActivities = createStep({
id: 'plan-indoor-activities',
description: 'Suggests indoor activities based on weather conditions',
inputSchema: forecastSchema,
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
const forecast = inputData
if (!forecast) {
throw new Error('Forecast data not found')
}
const prompt = `In case it rains, plan indoor activities for ${forecast.location} on ${forecast.date}`
const agent = mastra?.getAgent('planningAgent')
if (!agent) {
throw new Error('Planning agent not found')
}
const response = await agent.stream([
{
role: 'user',
content: prompt,
},
])
let activitiesText = ''
for await (const chunk of response.textStream) {
process.stdout.write(chunk)
activitiesText += chunk
}
return {
activities: activitiesText,
}
},
})
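Steps 2 and 3 repeat the same loop to drain the agent's text stream. One way to factor it out is sketched below, assuming textStream can be consumed as an AsyncIterable of strings, as the for await loops above do. The names collectText and fakeTextStream are hypothetical, and the fake stream stands in for a real agent response so the sketch runs without an LLM call.

```typescript
// Hypothetical helper: concatenates every chunk of an async text stream.
async function collectText(
  stream: AsyncIterable<string>,
  onChunk?: (chunk: string) => void
): Promise<string> {
  let text = ''
  for await (const chunk of stream) {
    if (onChunk) onChunk(chunk) // e.g. echo each chunk while streaming
    text += chunk
  }
  return text
}

// Stand-in for response.textStream; yields two fixed chunks.
async function* fakeTextStream(): AsyncGenerator<string> {
  yield 'Visit '
  yield 'a museum'
}

const collected: Promise<string> = collectText(fakeTextStream())
collected.then((text) => console.log(text))
```

Inside a step, the loop would then shrink to a single call such as collectText(response.textStream, (chunk) => process.stdout.write(chunk)).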
Define the activity planner workflow
const activityPlanningWorkflow = createWorkflow({
id: 'activity-planning-workflow-step2-if-else',
inputSchema: z.object({
city: z.string().describe('The city to get the weather for'),
}),
outputSchema: z.object({
activities: z.string(),
}),
})
.then(fetchWeather)
.branch([
[
// If precipitation chance is greater than 50%, suggest indoor activities
async ({ inputData }) => {
return inputData?.precipitationChance > 50
},
planIndoorActivities,
],
[
// Otherwise, suggest a mix of activities
async ({ inputData }) => {
return inputData?.precipitationChance <= 50
},
planActivities,
],
])
activityPlanningWorkflow.commit()
export { activityPlanningWorkflow }
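The two branch conditions are meant to be complementary, so exactly one planner matches for any numeric precipitation chance. The sketch below reproduces only the predicate logic in plain TypeScript, not Mastra's branch implementation; selectPlanners and the Forecast type are hypothetical. It also shows an edge case: if precipitationChance were missing, both comparisons would be false and neither condition would match.

```typescript
// Hypothetical, reduced forecast type: only the field the conditions read.
type Forecast = { precipitationChance?: number }

// Same comparisons as the two branch conditions. The explicit undefined check
// satisfies strict type checking; at runtime `undefined > 50` is also false.
const wantsIndoor = (f: Forecast): boolean =>
  f.precipitationChance !== undefined && f.precipitationChance > 50
const wantsMixed = (f: Forecast): boolean =>
  f.precipitationChance !== undefined && f.precipitationChance <= 50

// Hypothetical helper: returns the ids of the steps whose condition matches.
function selectPlanners(f: Forecast): string[] {
  const matched: string[] = []
  if (wantsIndoor(f)) matched.push('plan-indoor-activities')
  if (wantsMixed(f)) matched.push('plan-activities')
  return matched
}

console.log(selectPlanners({ precipitationChance: 80 })) // indoor planner only
console.log(selectPlanners({ precipitationChance: 50 })) // mixed planner only
console.log(selectPlanners({})) // no condition matches
```

Because fetchWeather always returns a numeric precipitationChance (the reduce starts from 0), the empty case should not occur in this workflow as written.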
Register Agent and Workflow instances with Mastra class
Register the agent and the workflow with the mastra instance. Registration is what lets the workflow steps look up the agent through mastra.getAgent('planningAgent').
import { Mastra } from '@mastra/core/mastra'
import { serve as inngestServe } from '@mastra/inngest'
import { createLogger } from '@mastra/core/logger'
import { Inngest } from 'inngest'
import { activityPlanningWorkflow } from './workflows/inngest-workflow'
import { planningAgent } from './agents/planning-agent'
import { realtimeMiddleware } from '@inngest/realtime'
// Create an Inngest instance for workflow orchestration and event handling
const inngest = new Inngest({
id: 'mastra',
baseUrl: `http://localhost:8288`, // URL of your local Inngest server
isDev: true,
middleware: [realtimeMiddleware()], // Enable real-time updates in the Inngest dashboard
})
// Create and configure the main Mastra instance
export const mastra = new Mastra({
vnext_workflows: {
activityPlanningWorkflow,
},
agents: {
planningAgent,
},
server: {
host: '0.0.0.0',
apiRoutes: [
{
path: '/api/inngest', // API endpoint for Inngest to send events to
method: 'ALL',
createHandler: async ({ mastra }) => inngestServe({ mastra, inngest }),
},
],
},
logger: createLogger({
name: 'Mastra',
level: 'info',
}),
})
Execute the activity planner workflow
Start a local server so Inngest can reach the app, get the activity planner workflow from the mastra instance, create a run, and start that run with the required inputData.
import { mastra } from './'
import { serve } from '@hono/node-server'
import { createHonoServer } from '@mastra/deployer/server'
const app = await createHonoServer(mastra)
// Start the server on port 3000 so Inngest can send events to it
const srv = serve({
fetch: app.fetch,
port: 3000,
})
const workflow = mastra.vnext_getWorkflow('activityPlanningWorkflow')
const run = workflow.createRun({})
// Start the workflow with the required input data (city name)
// This will trigger the workflow steps and stream the result to the console
const result = await run.start({ inputData: { city: 'New York' } })
console.dir(result, { depth: null })
// Close the server after the workflow run is complete
srv.close()
After running the workflow, you can view and monitor your workflow runs in real time using the Inngest dashboard at http://localhost:8288.