
createInngestAgent()

createInngestAgent() wraps an existing Agent with Inngest-powered durable execution. Like createDurableAgent(), it streams events over PubSub and supports resumable streams, but runs the agentic loop on Inngest's execution engine instead of in-process. Use it when a run must survive process restarts or run in a distributed environment.

For in-process durable execution, use createDurableAgent(). For fire-and-forget execution on the built-in workflow engine, use createEventedAgent().

Usage example

Set up the Inngest client, wrap an agent, register it with Mastra, and expose the Inngest serve endpoint:

src/mastra/index.ts
import { Mastra } from '@mastra/core'
import { Agent } from '@mastra/core/agent'
import { createInngestAgent, serve as inngestServe } from '@mastra/inngest'
import { Inngest } from 'inngest'

const inngest = new Inngest({ id: 'my-app' })

const agent = new Agent({
  id: 'my-agent',
  name: 'My Agent',
  instructions: 'You are a helpful assistant',
  model: 'openai/gpt-5.5',
})

const durableAgent = createInngestAgent({ agent, inngest })

export const mastra = new Mastra({
  agents: { myAgent: durableAgent },
  server: {
    apiRoutes: [
      {
        path: '/inngest/api',
        method: 'ALL',
        createHandler: async ({ mastra }) => inngestServe({ mastra, inngest }),
      },
    ],
  },
})

Stream a response and read the result:

const { output, runId, cleanup } = await durableAgent.stream('Hello!')

const text = await output.text

cleanup()

createInngestAgent(options)

Wraps an Agent with Inngest-powered durable execution and resumable streams.

import { createInngestAgent } from '@mastra/inngest'

const durableAgent = createInngestAgent({ agent, inngest })

Returns: InngestAgent

Parameters

agent:

Agent
The Agent to wrap with Inngest durable execution. Agent methods (e.g., `generate()`, `listTools()`, `getMemory()`) delegate to this agent via a Proxy.

inngest:

Inngest
The Inngest client instance. Used to send workflow events and, in SDK v4, publish realtime stream events.

id?:

string
= agent.id
Overrides the ID of the wrapped agent.

name?:

string
= agent.name
Overrides the name of the wrapped agent.

pubsub?:

PubSub
= InngestPubSub
PubSub instance for streaming events. The default `InngestPubSub` uses Inngest Realtime, which works across processes.

cache?:

MastraServerCache
Cache for stored stream events, which enables resumable streams. When provided, the PubSub is automatically wrapped with `CachingPubSub`. If omitted, the agent inherits the cache from the Mastra instance.

mastra?:

Mastra
Mastra instance for observability. Set automatically when the agent is registered with Mastra.

InngestAgent interface

The object returned by createInngestAgent(). It implements the full Agent interface via a Proxy: any property or method not explicitly defined (e.g., generate(), listTools(), getMemory()) is forwarded to the underlying agent.
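
The forwarding behaviour described above can be illustrated with a plain JavaScript Proxy. This is a minimal sketch, not the library's implementation: `wrapWithOverrides`, `baseAgent`, and the `stream` stub are hypothetical names, and the agent shape is reduced to a few fields.

```typescript
// Hypothetical sketch of Proxy-based delegation; not the @mastra/inngest source.
function wrapWithOverrides<T extends object, O extends object>(target: T, overrides: O): T & O {
  return new Proxy(overrides, {
    get(own, prop, receiver) {
      // Members defined on the wrapper itself take precedence.
      if (prop in own) return Reflect.get(own, prop, receiver)
      // Anything else is forwarded to the wrapped object.
      const value = Reflect.get(target, prop)
      // Bind methods so `this` still refers to the wrapped object.
      return typeof value === 'function' ? value.bind(target) : value
    },
    has(own, prop) {
      return prop in own || prop in target
    },
  }) as unknown as T & O
}

// Stand-in for an Agent (hypothetical shape).
const baseAgent = {
  id: 'my-agent',
  name: 'My Agent',
  generate(prompt: string): string {
    return `${this.name} says: ${prompt}`
  },
}

// Stand-in for the durable wrapper: overrides `id` and adds `stream`.
const wrapped = wrapWithOverrides(baseAgent, {
  id: 'my-durable-agent',
  stream(prompt: string): string {
    return `streaming: ${prompt}`
  },
})
```

In this sketch, `wrapped.id` and `wrapped.stream()` come from the wrapper, while `wrapped.name` and `wrapped.generate()` fall through to `baseAgent`.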

Properties

id:

string
The agent ID.

name:

string
The agent name.

agent:

Agent
The underlying Mastra Agent.

inngest:

Inngest
The Inngest client.

cache:

MastraServerCache | undefined
The resolved cache instance, if resumable streams are enabled.

pubsub:

PubSub
The PubSub instance used for streaming events.

Methods

Execution

stream(messages, options?)

Streams a response using Inngest's durable execution engine. The workflow is triggered via an Inngest event after the PubSub subscription is established.

const { output, runId, cleanup } = await durableAgent.stream('Hello!', {
  onChunk: chunk => console.log(chunk),
  onFinish: result => console.log('done', result),
})

const text = await output.text
cleanup()

Returns: Promise<InngestAgentStreamResult>
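
The ordering noted for stream() (subscribe first, then trigger the workflow) presumably exists so that events emitted early in the run are not missed. The sketch below shows that ordering with a hypothetical in-memory `TinyPubSub` that drops events published with no subscriber. `startRun` and `fakeWorkflow` are illustrative names, not part of @mastra/inngest.

```typescript
type Handler = (event: string) => void

// Hypothetical PubSub with no caching: events published with no subscriber are dropped.
class TinyPubSub {
  private topics = new Map<string, Set<Handler>>()

  subscribe(topic: string, handler: Handler): () => void {
    const handlers = this.topics.get(topic) ?? new Set<Handler>()
    handlers.add(handler)
    this.topics.set(topic, handlers)
    // The returned function unsubscribes this handler.
    return () => {
      handlers.delete(handler)
    }
  }

  publish(topic: string, event: string): void {
    this.topics.get(topic)?.forEach(handler => handler(event))
  }
}

// Subscribe before triggering, so the first event cannot slip past the listener.
function startRun(
  pubsub: TinyPubSub,
  runId: string,
  trigger: (runId: string) => void,
): { events: string[]; cleanup: () => void } {
  const events: string[] = []
  const cleanup = pubsub.subscribe(runId, event => {
    events.push(event)
  })
  trigger(runId)
  return { events, cleanup }
}

const pubsub = new TinyPubSub()

// Stand-in for the workflow: it emits as soon as it is triggered.
const fakeWorkflow = (runId: string): void => {
  pubsub.publish(runId, 'start')
  pubsub.publish(runId, 'finish')
}

const run = startRun(pubsub, 'run-1', fakeWorkflow)
run.cleanup()
```

Reversing the two steps inside `startRun` would leave `run.events` empty, because both events would be published before any handler existed.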

resume(runId, resumeData, options?)

Resumes a suspended Inngest run, for example after a tool approval. Loads the workflow snapshot from storage, finds the suspended step, and sends a resume event to Inngest.

const { output, cleanup } = await durableAgent.resume(
  runId,
  {
    approved: true,
  },
  { threadId: 'thread-1', resourceId: 'user-1' },
)

await output.text
cleanup()

The third argument accepts threadId and resourceId in addition to the lifecycle callbacks:

threadId?:

string
Thread ID to associate with the resumed run.

resourceId?:

string
Resource ID to associate with the resumed run.

onChunk?:

(chunk: ChunkType) => void | Promise<void>
Called for each streamed chunk.

onStepFinish?:

(result: AgentStepFinishEventData) => void | Promise<void>
Called when a step in the agentic loop finishes.

onFinish?:

(result: AgentFinishEventData) => void | Promise<void>
Called when the run finishes.

onError?:

(error: Error) => void | Promise<void>
Called when the run errors.

onSuspended?:

(data: AgentSuspendedEventData) => void | Promise<void>
Called when the run suspends.

Returns: Promise<InngestAgentStreamResult>

observe(runId, options?)

Reconnects to an existing run, replaying cached events before delivering live ones. Use this after a network disconnection. Pass offset to start replay from a known position.

const { output, cleanup } = await durableAgent.observe(runId, {
  offset: 0,
  onChunk: chunk => console.log(chunk),
})

await output.text

The result from observe() does not include threadId or resourceId.

Returns: Promise<Omit<InngestAgentStreamResult, 'threadId' | 'resourceId'>>

Warning

The cleanup() returned by observe() destroys the run's registry entries and cached events. Only call it when you are done with the run. If the run is suspended and you intend to resume later, don't call cleanup().
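
The replay-then-live behaviour of observe() can be pictured with a small in-memory model. This is a sketch under stated assumptions: `ReplayLog` is a hypothetical class, `offset` is treated as a zero-based index into the cached events, and the real cache and PubSub layers are not modelled.

```typescript
type Listener<E> = (event: E, index: number) => void

// Hypothetical in-memory event log that caches everything it publishes.
class ReplayLog<E> {
  private cached: E[] = []
  private listeners = new Set<Listener<E>>()

  publish(event: E): void {
    const index = this.cached.length
    this.cached.push(event)
    this.listeners.forEach(listener => listener(event, index))
  }

  // Replay cached events starting at `offset`, then keep delivering live events.
  observe(offset: number, listener: Listener<E>): () => void {
    this.cached.slice(offset).forEach((event, i) => listener(event, offset + i))
    this.listeners.add(listener)
    // The returned function stops live delivery for this listener.
    return () => {
      this.listeners.delete(listener)
    }
  }
}

const log = new ReplayLog<string>()
log.publish('chunk-0')
log.publish('chunk-1')

// Reconnect at offset 1: 'chunk-0' is skipped, 'chunk-1' is replayed.
const seen: string[] = []
const stop = log.observe(1, event => {
  seen.push(event)
})

log.publish('chunk-2') // delivered live
stop()
log.publish('chunk-3') // not delivered, the listener was removed
```

After this runs, `seen` holds `chunk-1` and `chunk-2`. A client that tracks the index of the last event it processed could pass the next index as `offset` to avoid handling events twice.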

prepare(messages, options?)
Direct link to preparemessages-options

Prepares a run for durable execution without triggering it. Returns the serialized workflow input that can be used to manually trigger the Inngest workflow event.

const { runId, messageId, workflowInput, threadId, resourceId } = await durableAgent.prepare(
  'Summarize the document',
  {
    memory: { threadId: 'thread-1', resourceId: 'user-1' },
  },
)

Returns:

interface PrepareResult {
  runId: string
  messageId: string
  workflowInput: any
  threadId?: string
  resourceId?: string
}

Introspection

isInngestAgent(obj)

Type guard that checks whether an object is an InngestAgent.

import { isInngestAgent } from '@mastra/inngest'

if (isInngestAgent(agent)) {
  // agent is InngestAgent
}

Returns: boolean
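
To show how a type guard such as isInngestAgent() narrows a union in TypeScript, here is a self-contained sketch. The interfaces and the `isDurableAgentLike` check are hypothetical stand-ins. How the real guard identifies an InngestAgent is not documented here, so the method check below is only an assumption.

```typescript
// Hypothetical, reduced shapes used only for this illustration.
interface AgentLike {
  id: string
}

interface DurableAgentLike extends AgentLike {
  observe(runId: string): string
}

// A user-defined type guard: the `obj is DurableAgentLike` return type
// tells the compiler what a `true` result implies.
function isDurableAgentLike(obj: unknown): obj is DurableAgentLike {
  return (
    typeof obj === 'object' &&
    obj !== null &&
    typeof (obj as { observe?: unknown }).observe === 'function'
  )
}

function describeAgent(agent: AgentLike | DurableAgentLike): string {
  if (isDurableAgentLike(agent)) {
    // Narrowed: `observe` is available without a cast.
    return agent.observe('run-1')
  }
  return `plain agent ${agent.id}`
}
```

Inside the `if` branch the compiler treats `agent` as the durable variant, which is the same convenience the real guard offers when a function accepts both plain and wrapped agents.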

Stream options

stream() accepts an InngestAgentStreamOptions object. It supports the same agent execution options as DurableAgent.stream(), plus lifecycle callbacks.

runId?:

string
Unique identifier for this run. Use it later with `resume()` or `observe()`.

instructions?:

AgentExecutionOptions['instructions']
Overrides the agent's default instructions for this run.

context?:

ModelMessage[]
Additional context messages to provide to the agent.

memory?:

object
Memory configuration for conversation persistence and retrieval.

requestContext?:

RequestContext
Request context carrying dynamic configuration and state for this run.

maxSteps?:

number
Maximum number of steps to run.

toolsets?:

object
Additional tool sets available for this run.

clientTools?:

object
Client-side tools available during execution.

toolChoice?:

'auto' | 'none' | 'required' | { type: 'tool'; toolName: string }
Tool selection strategy.

modelSettings?:

object
Model-specific settings such as temperature.

requireToolApproval?:

boolean
Require approval for all tool calls, which suspends the run until resumed.

autoResumeSuspendedTools?:

boolean
Automatically resume tools that suspended, instead of waiting for an external `resume()` call.

toolCallConcurrency?:

number
Maximum number of tool calls to execute concurrently.

includeRawChunks?:

boolean
Include raw provider chunks in the stream output.

maxProcessorRetries?:

number
Maximum number of processor retries per generation.

untilIdle?:

boolean | { maxIdleMs?: number }
When set, keeps the stream open across background-task continuations until the agent is idle. Pass `true` for the default 5-minute idle timeout, or `{ maxIdleMs }` to customise.

onChunk?:

(chunk: ChunkType) => void | Promise<void>
Called for each streamed chunk.

onStepFinish?:

(result: AgentStepFinishEventData) => void | Promise<void>
Called when a step in the agentic loop finishes.

onFinish?:

(result: AgentFinishEventData) => void | Promise<void>
Called when the run finishes.

onError?:

(error: Error) => void | Promise<void>
Called when the run errors.

onSuspended?:

(data: AgentSuspendedEventData) => void | Promise<void>
Called when the run suspends, for example for tool approval.

observe() accepts the lifecycle callbacks (onChunk, onStepFinish, onFinish, onError, onSuspended) plus an offset to control where replay starts.
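
The `untilIdle` option listed above takes either a boolean or an object, which is easy to misread. The helper below sketches how the two forms could be normalised into a single timeout. `resolveMaxIdleMs` and `DEFAULT_MAX_IDLE_MS` are hypothetical names, and falling back to the default when `maxIdleMs` is omitted from the object form is an assumption.

```typescript
type UntilIdle = boolean | { maxIdleMs?: number }

// The documented default idle timeout of 5 minutes, in milliseconds.
const DEFAULT_MAX_IDLE_MS = 5 * 60 * 1000

// Hypothetical helper: returns the idle timeout in milliseconds,
// or undefined when the stream should not wait for idleness.
function resolveMaxIdleMs(untilIdle?: UntilIdle): number | undefined {
  if (untilIdle === undefined || untilIdle === false) return undefined
  if (untilIdle === true) return DEFAULT_MAX_IDLE_MS
  // Object form: assume the default applies when maxIdleMs is omitted.
  return untilIdle.maxIdleMs ?? DEFAULT_MAX_IDLE_MS
}
```

With this helper, `resolveMaxIdleMs(true)` yields 300000 and `resolveMaxIdleMs({ maxIdleMs: 60_000 })` yields 60000.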

InngestAgentStreamResult

The object returned by stream() and resume(). The observe() method returns the same shape but omits threadId and resourceId.

interface InngestAgentStreamResult<OUTPUT = undefined> {
  output: MastraModelOutput<OUTPUT>
  readonly fullStream: ReadableStream<any>
  runId: string
  threadId?: string
  resourceId?: string
  cleanup: () => void
}

output:

MastraModelOutput
The streaming output. Await `output.text` for the full text, or consume `output.fullStream`.

fullStream:

ReadableStream
The full event stream, delegating to `output.fullStream`.

runId:

string
The unique run ID. Pass it to `resume()` or `observe()` to reconnect.

threadId?:

string
Thread ID when using memory.

resourceId?:

string
Resource ID when using memory.

cleanup:

() => void
Unsubscribes from PubSub and clears registry entries for the run. Call it when done with the run.

Serving Inngest functions

The @mastra/inngest package provides serve() and createServe() to register Inngest workflow functions with your HTTP framework.

serve(options)

Serves Mastra workflows with Hono (the default framework). Collects all Inngest-backed workflows from Mastra and registers them as Inngest functions.

import { serve } from '@mastra/inngest'

app.use('/inngest/api', async c => {
  return serve({ mastra, inngest })(c)
})

createServe(adapter)

Factory that accepts any Inngest serve adapter (inngest/express, inngest/fastify, inngest/next, etc.) and returns a serve function for that framework.

Express
import { createServe } from '@mastra/inngest'
import { serve } from 'inngest/express'

const serveExpress = createServe(serve)
app.use('/inngest/api', serveExpress({ mastra, inngest }))

Next.js
import { createServe } from '@mastra/inngest'
import { serve } from 'inngest/next'

const serveNext = createServe(serve)
export const { GET, POST, PUT } = serveNext({ mastra, inngest })
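
The factory pattern behind createServe() can be sketched without any framework. The code below is a simplified model, not the package's implementation: `createServeSketch`, `SketchOptions`, `workflowFunctions`, and `listAdapter` are hypothetical names, and the adapter here returns a list of strings rather than an HTTP handler.

```typescript
// Hypothetical, simplified shapes; the real Inngest and Mastra types are richer.
interface FnLike {
  id: string
}

interface AdapterInput {
  clientId: string
  functions: FnLike[]
}

// An adapter turns a client and a list of functions into a framework-specific handler.
type ServeAdapter<H> = (input: AdapterInput) => H

interface SketchOptions {
  clientId: string
  workflowFunctions: FnLike[] // stand-in for functions collected from Mastra
  functions?: FnLike[] // additional user-supplied functions
}

// The factory fixes the adapter once and returns a serve function for that framework.
function createServeSketch<H>(adapter: ServeAdapter<H>): (options: SketchOptions) => H {
  return options =>
    adapter({
      clientId: options.clientId,
      functions: [...options.workflowFunctions, ...(options.functions ?? [])],
    })
}

// Fake adapter for illustration: the "handler" is just the list of registered IDs.
const listAdapter: ServeAdapter<string[]> = input =>
  input.functions.map(fn => `${input.clientId}/${fn.id}`)

const serveList = createServeSketch(listAdapter)
const registered = serveList({
  clientId: 'my-app',
  workflowFunctions: [{ id: 'agent-loop' }],
  functions: [{ id: 'cleanup-job' }],
})
```

Because the adapter is the only framework-specific piece, the same merging of Mastra workflows and extra functions can be reused for any framework that supplies an adapter.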

Serve options

mastra:

Mastra
The Mastra instance containing registered agents and workflows.

inngest:

Inngest
The Inngest client instance.

functions?:

InngestFunction.Like[]
Additional Inngest functions to serve alongside Mastra workflows.

registerOptions?:

RegisterOptions
Options passed to the Inngest registration handler.