Skip to Content

Inngest Workflow

This example demonstrates how to build an Inngest workflow with Mastra.

Setup

npm install @mastra/inngest @mastra/core @mastra/deployer @hono/node-server @ai-sdk/openai docker run --rm -p 8288:8288 \ inngest/inngest \ inngest dev -u http://host.docker.internal:3000/inngest/api

Alternatively, you can use the Inngest CLI for local development by following the official Inngest Dev Server guide.

Define the Planning Agent

Define a planning agent which leverages an LLM call to plan activities given a location and corresponding weather conditions.

agents/planning-agent.ts
import { Agent } from '@mastra/core/agent' import { openai } from '@ai-sdk/openai' // Create a new planning agent that uses the OpenAI model const planningAgent = new Agent({ name: 'planningAgent', model: openai('gpt-4o'), instructions: ` You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations. 📅 [Day, Month Date, Year] ═══════════════════════════ 🌡️ WEATHER SUMMARY • Conditions: [brief description] • Temperature: [X°C/Y°F to A°C/B°F] • Precipitation: [X% chance] 🌅 MORNING ACTIVITIES Outdoor: • [Activity Name] - [Brief description including specific location/route] Best timing: [specific time range] Note: [relevant weather consideration] 🌞 AFTERNOON ACTIVITIES Outdoor: • [Activity Name] - [Brief description including specific location/route] Best timing: [specific time range] Note: [relevant weather consideration] 🏠 INDOOR ALTERNATIVES • [Activity Name] - [Brief description including specific venue] Ideal for: [weather condition that would trigger this alternative] ⚠️ SPECIAL CONSIDERATIONS • [Any relevant weather warnings, UV index, wind conditions, etc.] Guidelines: - Suggest 2-3 time-specific outdoor activities per day - Include 1-2 indoor backup options - For precipitation >50%, lead with indoor activities - All activities must be specific to the location - Include specific venues, trails, or locations - Consider activity intensity based on temperature - Keep descriptions concise but informative Maintain this exact formatting for consistency, using the emoji and section headers as shown. `, }) export { planningAgent }

Define the Activity Planner Workflow

Define the activity planner workflow with 3 steps: one to fetch the weather via a network call, one to plan activities, and another to plan only indoor activities.

workflows/inngest-workflow.ts
import { z } from 'zod' const { createWorkflow, createStep } = init( new Inngest({ id: 'mastra', baseUrl: `http://localhost:8288`, }) ) // Helper function to convert weather codes to human-readable descriptions function getWeatherCondition(code: number): string { const conditions: Record<number, string> = { 0: 'Clear sky', 1: 'Mainly clear', 2: 'Partly cloudy', 3: 'Overcast', 45: 'Foggy', 48: 'Depositing rime fog', 51: 'Light drizzle', 53: 'Moderate drizzle', 55: 'Dense drizzle', 61: 'Slight rain', 63: 'Moderate rain', 65: 'Heavy rain', 71: 'Slight snow fall', 73: 'Moderate snow fall', 75: 'Heavy snow fall', 95: 'Thunderstorm', } return conditions[code] || 'Unknown' } const forecastSchema = z.object({ date: z.string(), maxTemp: z.number(), minTemp: z.number(), precipitationChance: z.number(), condition: z.string(), location: z.string(), })

Step 1: Fetch weather data for a given city

workflows/inngest-workflow.ts
const fetchWeather = createStep({ id: 'fetch-weather', description: 'Fetches weather forecast for a given city', inputSchema: z.object({ city: z.string(), }), outputSchema: forecastSchema, execute: async ({ inputData }) => { if (!inputData) { throw new Error('Trigger data not found') } // Get latitude and longitude for the city const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1` const geocodingResponse = await fetch(geocodingUrl) const geocodingData = (await geocodingResponse.json()) as { results: { latitude: number; longitude: number; name: string }[] } if (!geocodingData.results?.[0]) { throw new Error(`Location '${inputData.city}' not found`) } const { latitude, longitude, name } = geocodingData.results[0] // Fetch weather data using the coordinates const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}&current=precipitation,weathercode&timezone=auto,&hourly=precipitation_probability,temperature_2m` const response = await fetch(weatherUrl) const data = (await response.json()) as { current: { time: string precipitation: number weathercode: number } hourly: { precipitation_probability: number[] temperature_2m: number[] } } const forecast = { date: new Date().toISOString(), maxTemp: Math.max(...data.hourly.temperature_2m), minTemp: Math.min(...data.hourly.temperature_2m), condition: getWeatherCondition(data.current.weathercode), location: name, precipitationChance: data.hourly.precipitation_probability.reduce( (acc, curr) => Math.max(acc, curr), 0 ), } return forecast }, })

Step 2: Suggest activities (indoor or outdoor) based on weather

workflows/inngest-workflow.ts
const planActivities = createStep({ id: 'plan-activities', description: 'Suggests activities based on weather conditions', inputSchema: forecastSchema, outputSchema: z.object({ activities: z.string(), }), execute: async ({ inputData, mastra }) => { const forecast = inputData if (!forecast) { throw new Error('Forecast data not found') } const prompt = `Based on the following weather forecast for ${forecast.location}, suggest appropriate activities: ${JSON.stringify(forecast, null, 2)} ` const agent = mastra?.getAgent('planningAgent') if (!agent) { throw new Error('Planning agent not found') } const response = await agent.stream([ { role: 'user', content: prompt, }, ]) let activitiesText = '' for await (const chunk of response.textStream) { process.stdout.write(chunk) activitiesText += chunk } return { activities: activitiesText, } }, })

Step 3: Suggest indoor activities only (for rainy weather)

workflows/inngest-workflow.ts
const planIndoorActivities = createStep({ id: 'plan-indoor-activities', description: 'Suggests indoor activities based on weather conditions', inputSchema: forecastSchema, outputSchema: z.object({ activities: z.string(), }), execute: async ({ inputData, mastra }) => { const forecast = inputData if (!forecast) { throw new Error('Forecast data not found') } const prompt = `In case it rains, plan indoor activities for ${forecast.location} on ${forecast.date}` const agent = mastra?.getAgent('planningAgent') if (!agent) { throw new Error('Planning agent not found') } const response = await agent.stream([ { role: 'user', content: prompt, }, ]) let activitiesText = '' for await (const chunk of response.textStream) { process.stdout.write(chunk) activitiesText += chunk } return { activities: activitiesText, } }, })

Define the activity planner workflow

workflows/inngest-workflow.ts
const activityPlanningWorkflow = createWorkflow({ id: 'activity-planning-workflow-step2-if-else', inputSchema: z.object({ city: z.string().describe('The city to get the weather for'), }), outputSchema: z.object({ activities: z.string(), }), }) .then(fetchWeather) .branch([ [ // If precipitation chance is greater than 50%, suggest indoor activities async ({ inputData }) => { return inputData?.precipitationChance > 50 }, planIndoorActivities, ], [ // Otherwise, suggest a mix of activities async ({ inputData }) => { return inputData?.precipitationChance <= 50 }, planActivities, ], ]) activityPlanningWorkflow.commit() export { activityPlanningWorkflow }

Register Agent and Workflow instances with Mastra class

Register the agents and workflow with the mastra instance. This allows access to the agents within the workflow.

index.ts
import { Mastra } from '@mastra/core/mastra' import { serve as inngestServe } from '@mastra/inngest' import { createLogger } from '@mastra/core/logger' import { Inngest } from 'inngest' import { activityPlanningWorkflow } from './workflows/inngest-workflow' import { planningAgent } from './agents/planning-agent' import { realtimeMiddleware } from '@inngest/realtime' // Create an Inngest instance for workflow orchestration and event handling const inngest = new Inngest({ id: 'mastra', baseUrl: `http://localhost:8288`, // URL of your local Inngest server isDev: true, middleware: [realtimeMiddleware()], // Enable real-time updates in the Inngest dashboard }) // Create and configure the main Mastra instance export const mastra = new Mastra({ vnext_workflows: { activityPlanningWorkflow, }, agents: { planningAgent, }, server: { host: '0.0.0.0', apiRoutes: [ { path: '/api/inngest', // API endpoint for Inngest to send events to method: 'ALL', createHandler: async ({ mastra }) => inngestServe({ mastra, inngest }), }, ], }, logger: createLogger({ name: 'Mastra', level: 'info', }), })

Execute the activity planner workflow

Here, we’ll get the activity planner workflow from the mastra instance, then create a run and execute the created run with the required inputData.

exec.ts
import { mastra } from './' import { serve } from '@hono/node-server' import { createHonoServer } from '@mastra/deployer/server' const app = await createHonoServer(mastra) // Start the server on port 3000 so Inngest can send events to it const srv = serve({ fetch: app.fetch, port: 3000, }) const workflow = mastra.vnext_getWorkflow('activityPlanningWorkflow') const run = workflow.createRun({}) // Start the workflow with the required input data (city name) // This will trigger the workflow steps and stream the result to the console const result = await run.start({ inputData: { city: 'New York' } }) console.dir(result, { depth: null }) // Close the server after the workflow run is complete srv.close()

After running the workflow, you can view and monitor your workflow runs in real time using the Inngest dashboard at http://localhost:8288.