Inngest workflow
Inngest is a developer platform for building and running background workflows, without managing infrastructure.
For a complete example with advanced flow control features, see the Inngest workflow example.
How Inngest works with MastraDirect link to How Inngest works with Mastra
Inngest and Mastra integrate by aligning their workflow models: Inngest organizes logic into functions composed of steps, and Mastra workflows defined using createWorkflow() and createStep() map directly onto this paradigm. Each Mastra workflow becomes an Inngest function with a unique identifier, and each step within the workflow maps to an Inngest step.
The serve() function bridges the two systems by registering Mastra workflows as Inngest functions and setting up the necessary event handlers for execution and monitoring.
When an event triggers a workflow, Inngest executes it step by step, memoizing each step's result. This means if a workflow is retried or resumed, completed steps are skipped, ensuring efficient and reliable execution. Control flow primitives in Mastra, such as loops, conditionals, and nested workflows are seamlessly translated into the same Inngest's function/step model, preserving advanced workflow features like composition, branching, and suspension.
Real-time monitoring, suspend/resume, and step-level observability are enabled via Inngest's publish-subscribe system and dashboard. As each step executes, its state and output are tracked using Mastra storage and can be resumed as needed.
SetupDirect link to Setup
Install the required packages:
npm install @mastra/inngest@beta inngest @inngest/realtime
Building an Inngest workflowDirect link to Building an Inngest workflow
This guide walks through creating a workflow with Inngest and Mastra, demonstrating a counter application that increments a value until it reaches 10.
Inngest initializationDirect link to Inngest initialization
Initialize the Inngest integration to obtain Mastra-compatible workflow helpers. The createWorkflow() and createStep() functions are used to create workflow and step objects that are compatible with Mastra and inngest.
In development:
import { Inngest } from "inngest";
import { realtimeMiddleware } from "@inngest/realtime/middleware";
export const inngest = new Inngest({
id: "mastra",
baseUrl: "http://localhost:8288",
isDev: true,
middleware: [realtimeMiddleware()],
});
In production:
import { Inngest } from "inngest";
import { realtimeMiddleware } from "@inngest/realtime/middleware";
export const inngest = new Inngest({
id: "mastra",
middleware: [realtimeMiddleware()],
});
Creating stepsDirect link to Creating steps
Define the individual steps that will compose your workflow:
import { z } from "zod";
import { inngest } from "../inngest";
import { init } from "@mastra/inngest";
// Initialize Inngest with Mastra, pointing to your local Inngest server
const { createWorkflow, createStep } = init(inngest);
// Step: Increment the counter value
const incrementStep = createStep({
id: "increment",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
execute: async ({ inputData }) => {
return { value: inputData.value + 1 };
},
});
Creating the workflowDirect link to Creating the workflow
Compose the steps into a workflow using the dountil loop pattern. The createWorkflow() function creates a function on Inngest server that is invocable.
// workflow that is registered as a function on inngest server
const workflow = createWorkflow({
id: "increment-workflow",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
}).then(incrementStep);
workflow.commit();
export { workflow as incrementWorkflow };
Configuring the Mastra instanceDirect link to Configuring the Mastra instance
Register the workflow with Mastra and configure the Inngest API endpoint:
import { Mastra } from "@mastra/core";
import { serve } from "@mastra/inngest";
import { incrementWorkflow } from "./workflows";
import { inngest } from "./inngest";
import { PinoLogger } from "@mastra/loggers";
export const mastra = new Mastra({
workflows: { incrementWorkflow },
server: {
host: "0.0.0.0",
apiRoutes: [
{
path: "/api/inngest",
method: "ALL",
createHandler: async ({ mastra }) => {
return serve({ mastra, inngest });
},
},
],
},
logger: new PinoLogger({ name: "Mastra", level: "info" }),
});
Running workflowsDirect link to Running workflows
Running locallyDirect link to Running locally
-
Run
npx mastra devto start the Mastra server locally on port 4111 -
Start the Inngest Dev Server. In a new terminal, run:
npx inngest-cli@latest dev -u http://localhost:4111/api/inngestnoteThe URL after
-utells the Inngest dev server where to find your Mastra/api/inngestendpoint -
Open the Inngest Dashboard at http://localhost:8288 and go to the Apps section in the sidebar to verify your Mastra workflow is registered
-
Invoke the workflow by going to Functions, selecting your workflow, and clicking Invoke with the following input:
{
"data": {
"inputData": {
"value": 5
}
}
} -
Monitor the workflow execution in the Runs tab to see step-by-step execution progress
Running in productionDirect link to Running in production
Before you begin, make sure you have:
- Vercel account and Vercel CLI installed (
npm i -g vercel) - Inngest account
- Vercel token
-
Set your Vercel token in your environment:
.envexport VERCEL_TOKEN=your_vercel_token -
Add
VercelDeployerto Mastra instancesrc/mastra/index.tsimport { VercelDeployer } from "@mastra/deployer-vercel";
export const mastra = new Mastra({
deployer: new VercelDeployer({
teamSlug: "your_team_slug",
projectName: "your_project_name",
// you can get your vercel token from the vercel dashboard by clicking on the user icon in the top right corner
// and then clicking on "Account Settings" and then clicking on "Tokens" on the left sidebar.
token: process.env.VERCEL_TOKEN,
}),
}); -
Build the mastra instance
npx mastra build -
Deploy to Vercel
cd .mastra/output
vercel login
vercel --prod -
Sync with the Inngest dashboard by clicking Sync new app with Vercel and following the instructions
-
Invoke the workflow by going to Functions, selecting
workflow.increment-workflow, clicking All actions > Invoke, and providing the following input:{
"data": {
"inputData": {
"value": 5
}
}
} -
Monitor execution in the Runs tab to see step-by-step progress
Adding custom Inngest functionsDirect link to Adding custom Inngest functions
You can serve additional Inngest functions alongside your Mastra workflows by using the optional functions parameter in serve().
Creating custom functionsDirect link to Creating custom functions
First, create your custom Inngest functions:
import { inngest } from "../inngest";
// Define custom Inngest functions
export const customEmailFunction = inngest.createFunction(
{ id: "send-welcome-email" },
{ event: "user/registered" },
async ({ event }) => {
// Custom email logic here
console.log(`Sending welcome email to ${event.data.email}`);
return { status: "email_sent" };
},
);
export const customWebhookFunction = inngest.createFunction(
{ id: "process-webhook" },
{ event: "webhook/received" },
async ({ event }) => {
// Custom webhook processing
console.log(`Processing webhook: ${event.data.type}`);
return { processed: true };
},
);
Serving custom functions with workflowsDirect link to Serving custom functions with workflows
Update your Mastra configuration to import and include the custom functions. The highlighted lines show the additions:
import { Mastra } from "@mastra/core";
import { serve } from "@mastra/inngest";
import { incrementWorkflow } from "./workflows";
import { inngest } from "./inngest";
import {
customEmailFunction,
customWebhookFunction,
} from "./inngest/custom-functions";
import { PinoLogger } from "@mastra/loggers";
export const mastra = new Mastra({
workflows: { incrementWorkflow },
server: {
host: "0.0.0.0",
apiRoutes: [
{
path: "/api/inngest",
method: "ALL",
createHandler: async ({ mastra }) => {
return serve({
mastra,
inngest,
functions: [customEmailFunction, customWebhookFunction],
});
},
},
],
},
logger: new PinoLogger({ name: "Mastra", level: "info" }),
});
Function registrationDirect link to Function registration
When you include custom functions:
- Mastra workflows are automatically converted to Inngest functions with IDs like
workflow.${workflowId} - Custom functions retain their specified IDs (e.g.,
send-welcome-email,process-webhook) - All functions are served together on the same
/api/inngestendpoint
This allows you to combine Mastra's workflow orchestration with your existing Inngest functions.
Flow controlDirect link to Flow control
Inngest workflows support flow control features including concurrency limits, rate limiting, throttling, debouncing, and priority queuing. These options are configured in the createWorkflow() call and help manage workflow execution at scale.
ConcurrencyDirect link to Concurrency
Control how many workflow instances can run simultaneously:
const workflow = createWorkflow({
id: "user-processing-workflow",
inputSchema: z.object({ userId: z.string() }),
outputSchema: z.object({ result: z.string() }),
steps: [processUserStep],
// Limit to 10 concurrent executions, scoped by user ID
concurrency: {
limit: 10,
key: "event.data.userId",
},
});
Rate limitingDirect link to Rate limiting
Limit the number of workflow executions within a time period:
const workflow = createWorkflow({
id: "api-sync-workflow",
inputSchema: z.object({ endpoint: z.string() }),
outputSchema: z.object({ status: z.string() }),
steps: [apiSyncStep],
// Maximum 1000 executions per hour
rateLimit: {
period: "1h",
limit: 1000,
},
});
ThrottlingDirect link to Throttling
Ensure minimum time between workflow executions:
const workflow = createWorkflow({
id: "email-notification-workflow",
inputSchema: z.object({ organizationId: z.string(), message: z.string() }),
outputSchema: z.object({ sent: z.boolean() }),
steps: [sendEmailStep],
// Only one execution per 10 seconds per organization
throttle: {
period: "10s",
limit: 1,
key: "event.data.organizationId",
},
});
DebouncingDirect link to Debouncing
Delay execution until no new events arrive within a time window:
const workflow = createWorkflow({
id: "search-index-workflow",
inputSchema: z.object({ documentId: z.string() }),
outputSchema: z.object({ indexed: z.boolean() }),
steps: [indexDocumentStep],
// Wait 5 seconds of no updates before indexing
debounce: {
period: "5s",
key: "event.data.documentId",
},
});
PriorityDirect link to Priority
Set execution priority for workflows:
const workflow = createWorkflow({
id: "order-processing-workflow",
inputSchema: z.object({
orderId: z.string(),
priority: z.number().optional(),
}),
outputSchema: z.object({ processed: z.boolean() }),
steps: [processOrderStep],
// Higher priority orders execute first
priority: {
run: "event.data.priority ?? 50",
},
});
Combining flow control optionsDirect link to Combining flow control options
Multiple flow control options can be combined in a single workflow:
const workflow = createWorkflow({
id: "comprehensive-workflow",
inputSchema: z.object({
userId: z.string(),
organizationId: z.string(),
priority: z.number().optional(),
}),
outputSchema: z.object({ result: z.string() }),
steps: [comprehensiveStep],
concurrency: {
limit: 5,
key: "event.data.userId",
},
rateLimit: {
period: "1m",
limit: 100,
},
throttle: {
period: "10s",
limit: 1,
key: "event.data.organizationId",
},
priority: {
run: "event.data.priority ?? 0",
},
});
All flow control options are optional. If not specified, workflows run with Inngest's default behavior. For more information, see the Inngest flow control documentation.
Cron schedulingDirect link to Cron scheduling
Inngest workflows can be automatically triggered on a schedule using cron expressions. This allows you to run workflows at regular intervals, such as daily reports, hourly data syncs, or maintenance tasks.
Basic cron schedulingDirect link to Basic cron scheduling
Configure a workflow to run on a schedule by adding a cron property:
const workflow = createWorkflow({
id: "daily-report-workflow",
inputSchema: z.object({ reportType: z.string() }),
outputSchema: z.object({ generated: z.boolean() }),
steps: [generateReportStep],
// Run daily at midnight
cron: "0 0 * * *",
});
Cron schedule formatDirect link to Cron schedule format
The cron property accepts standard cron expressions in the format: minute hour day month dayOfWeek
- minute: 0-59
- hour: 0-23
- day: 1-31
- month: 1-12 or JAN-DEC
- dayOfWeek: 0-6 (Sunday = 0) or SUN-SAT
Common cron patterns:
// Every 15 minutes
cron: "*/15 * * * *"
// Every hour at minute 0
cron: "0 * * * *"
// Every 6 hours
cron: "0 */6 * * *"
// Daily at midnight
cron: "0 0 * * *"
// Daily at 9 AM
cron: "0 9 * * *"
// Every weekday at 9 AM
cron: "0 9 * * 1-5"
// First day of every month at midnight
cron: "0 0 1 * *"
// Every Monday at 8 AM
cron: "0 8 * * 1"
Providing input data for scheduled runsDirect link to Providing input data for scheduled runs
You can provide static input data that will be used for each scheduled execution:
const workflow = createWorkflow({
id: "scheduled-data-sync",
inputSchema: z.object({
source: z.string(),
destination: z.string(),
}),
outputSchema: z.object({ synced: z.boolean() }),
steps: [syncDataStep],
cron: "0 */6 * * *", // Every 6 hours
// Input data provided to each scheduled run
inputData: {
source: "production-db",
destination: "analytics-warehouse",
},
});
Providing initial state for scheduled runsDirect link to Providing initial state for scheduled runs
You can also set an initial state for scheduled workflow runs:
const workflow = createWorkflow({
id: "scheduled-aggregation",
inputSchema: z.object({ date: z.string() }),
outputSchema: z.object({ aggregated: z.boolean() }),
stateSchema: z.object({
processedCount: z.number(),
lastProcessedDate: z.string(),
}),
steps: [aggregateDataStep],
cron: "0 0 * * *", // Daily at midnight
inputData: {
date: new Date().toISOString().split("T")[0], // Today's date
},
initialState: {
processedCount: 0,
lastProcessedDate: "",
},
});
Combining cron with flow controlDirect link to Combining cron with flow control
Cron scheduling can be combined with flow control options:
const workflow = createWorkflow({
id: "scheduled-api-sync",
inputSchema: z.object({ endpoint: z.string() }),
outputSchema: z.object({ synced: z.boolean() }),
steps: [syncApiStep],
cron: "*/30 * * * *", // Every 30 minutes
inputData: {
endpoint: "https://api.example.com/data",
},
// Limit concurrent executions even for scheduled runs
concurrency: {
limit: 5,
},
// Rate limit scheduled executions
rateLimit: {
period: "1h",
limit: 100,
},
});
How cron functions workDirect link to How cron functions work
When you configure a workflow with a cron property:
- A separate Inngest function is automatically created with the ID
workflow.${workflowId}.cron - This function is registered with Inngest and will trigger at the specified schedule
- Each scheduled execution creates a new workflow run with the provided
inputDataandinitialState - The cron function and the main workflow function are both served together when you call
serve()
You can monitor scheduled executions in the Inngest dashboard under Functions and Runs sections. The cron function will appear as a separate function alongside your main workflow function.
For more information on cron scheduling, see the Inngest cron documentation.