createInngestAgent()
createInngestAgent() wraps an existing Agent with Inngest-powered durable execution. Like createDurableAgent(), it streams events over PubSub and supports resumable streams, but runs the agentic loop on Inngest's execution engine instead of in-process. Use it when a run must survive process restarts or run in a distributed environment.
For in-process durable execution, use createDurableAgent(). For fire-and-forget execution on the built-in workflow engine, use createEventedAgent().
Usage example
Set up the Inngest client, wrap an agent, register it with Mastra, and expose the Inngest serve endpoint:
import { Mastra } from '@mastra/core'
import { Agent } from '@mastra/core/agent'
import { createInngestAgent, serve as inngestServe } from '@mastra/inngest'
import { Inngest } from 'inngest'

const inngest = new Inngest({ id: 'my-app' })

const agent = new Agent({
  id: 'my-agent',
  name: 'My Agent',
  instructions: 'You are a helpful assistant',
  model: 'openai/gpt-5.5',
})

const durableAgent = createInngestAgent({ agent, inngest })

export const mastra = new Mastra({
  agents: { myAgent: durableAgent },
  server: {
    apiRoutes: [
      {
        path: '/inngest/api',
        method: 'ALL',
        createHandler: async ({ mastra }) => inngestServe({ mastra, inngest }),
      },
    ],
  },
})
Stream a response and read the result:
const { output, runId, cleanup } = await durableAgent.stream('Hello!')
const text = await output.text
cleanup()
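If reading the output throws, the cleanup() call above is skipped. The stream-then-cleanup pattern can be wrapped in try/finally to avoid that. The following is a minimal sketch, not part of @mastra/inngest: readTextAndCleanup and StreamResultLike are hypothetical names, and the interface mirrors only the two fields used above (output.text and cleanup).

```typescript
// Hypothetical structural type: only the fields this helper touches.
interface StreamResultLike {
  output: { text: Promise<string> }
  cleanup: () => void
}

// Awaits the final text and always calls cleanup(), even if reading fails.
async function readTextAndCleanup(result: StreamResultLike): Promise<string> {
  try {
    return await result.output.text
  } finally {
    result.cleanup()
  }
}
```

Assuming the result of stream() is structurally compatible, it could be used as: const text = await readTextAndCleanup(await durableAgent.stream('Hello!')). For a run that may suspend and be resumed later, calling cleanup() this eagerly is probably not what you want.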
createInngestAgent(options)
Wraps an Agent with Inngest-powered durable execution and resumable streams.
import { createInngestAgent } from '@mastra/inngest'
const durableAgent = createInngestAgent({ agent, inngest })
Returns: InngestAgent
Parameters
agent:
inngest:
id?:
name?:
pubsub?:
cache?:
mastra?:
InngestAgent interface
The object returned by createInngestAgent(). It implements the full Agent interface via a Proxy: any property or method not explicitly defined (e.g., generate(), listTools(), getMemory()) is forwarded to the underlying agent.
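The forwarding behavior described above can be illustrated with a plain JavaScript Proxy. This is a sketch of the general technique only, not Mastra's actual implementation: wrapWithOverrides, plainAgent, and the string-returning stream() and generate() are hypothetical stand-ins.

```typescript
// Returns a proxy that serves properties from `overrides` first and forwards
// everything else to `target`. Illustrative only; not Mastra's implementation.
function wrapWithOverrides<T extends object, O extends object>(target: T, overrides: O): T & O {
  const handler: ProxyHandler<T> = {
    get(obj, prop) {
      if (prop in overrides) {
        return Reflect.get(overrides, prop)
      }
      const value = Reflect.get(obj, prop)
      // Bind methods so `this` inside them is the wrapped object, not the proxy.
      return typeof value === 'function' ? value.bind(obj) : value
    },
    has(obj, prop) {
      return prop in overrides || prop in obj
    },
  }
  return new Proxy(target, handler) as T & O
}

// Hypothetical stand-in for an agent with one ordinary method.
const plainAgent = {
  name: 'My Agent',
  generate(prompt: string) {
    return `${this.name} answered: ${prompt}`
  },
}

const wrapped = wrapWithOverrides(plainAgent, {
  stream: (prompt: string) => `durable stream for: ${prompt}`,
})

console.log(wrapped.stream('Hello!')) // defined on the wrapper
console.log(wrapped.generate('Hello!')) // forwarded to plainAgent
```

With this shape the wrapper only defines what it changes, and any method the underlying object gains later is reachable without further work.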
Properties
id:
name:
agent:
inngest:
cache:
pubsub:
Methods
Execution
stream(messages, options?)
Streams a response using Inngest's durable execution engine. The workflow is triggered via an Inngest event after the PubSub subscription is established.
const { output, runId, cleanup } = await durableAgent.stream('Hello!', {
  onChunk: chunk => console.log(chunk),
  onFinish: result => console.log('done', result),
})

const text = await output.text
cleanup()
Returns: Promise<InngestAgentStreamResult>
resume(runId, resumeData, options?)
Resumes a suspended Inngest run, for example after a tool approval. Loads the workflow snapshot from storage, finds the suspended step, and sends a resume event to Inngest.
const { output, cleanup } = await durableAgent.resume(
  runId,
  {
    approved: true,
  },
  { threadId: 'thread-1', resourceId: 'user-1' },
)

await output.text
cleanup()
The third argument accepts threadId and resourceId in addition to the lifecycle callbacks:
threadId?:
resourceId?:
onChunk?:
onStepFinish?:
onFinish?:
onError?:
onSuspended?:
Returns: Promise<InngestAgentStreamResult>
observe(runId, options?)
Reconnects to an existing run, replaying cached events before delivering live ones. Use this after a network disconnection. Pass offset to start replay from a known position.
const { output, cleanup } = await durableAgent.observe(runId, {
  offset: 0,
  onChunk: chunk => console.log(chunk),
})

await output.text
The result from observe() does not include threadId or resourceId.
Returns: Promise<Omit<InngestAgentStreamResult, 'threadId' | 'resourceId'>>
The cleanup() returned by observe() destroys the run's registry entries and cached events. Only call it when you are done with the run. If the run is suspended and you intend to resume later, don't call cleanup().
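A reconnect loop built on observe() and offset might look like the sketch below. It rests on assumptions the reference does not confirm: that offset is the zero-based index of the first event to replay, that a dropped connection surfaces as a rejected promise, and that output.text settles when the run ends. collectChunksWithReconnect and the *Like interfaces are hypothetical names covering only the fields used here. The sketch deliberately never calls cleanup(), so cached events remain available for the next attempt.

```typescript
// Hypothetical structural types covering only what this sketch uses.
interface ObserveOptionsLike {
  offset?: number
  onChunk?: (chunk: unknown) => void
}
interface ObserveResultLike {
  output: { text: Promise<string> }
}
interface ObservableAgentLike {
  observe(runId: string, options?: ObserveOptionsLike): Promise<ObserveResultLike>
}

// Collects every chunk of a run, reconnecting after failures.
// ASSUMPTION: `offset` is the zero-based index of the first event to replay,
// so passing the number of chunks already received avoids duplicates.
async function collectChunksWithReconnect(
  agent: ObservableAgentLike,
  runId: string,
  maxAttempts = 3,
): Promise<unknown[]> {
  const chunks: unknown[] = []
  let lastError: unknown
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const { output } = await agent.observe(runId, {
        offset: chunks.length,
        onChunk: chunk => {
          chunks.push(chunk)
        },
      })
      // ASSUMPTION: this promise settles once the run has finished.
      await output.text
      return chunks
    } catch (error) {
      // Remember the failure and try again from the current offset.
      lastError = error
    }
  }
  throw lastError
}
```

Once the chunks are collected and the run will not be resumed, the caller can call observe() one last time and invoke its cleanup().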
prepare(messages, options?)
Prepares a run for durable execution without triggering it. Returns the serialized workflow input that can be used to manually trigger the Inngest workflow event.
const { runId, messageId, workflowInput, threadId, resourceId } = await durableAgent.prepare(
  'Summarize the document',
  {
    memory: { threadId: 'thread-1', resourceId: 'user-1' },
  },
)
Returns:
interface PrepareResult {
  runId: string
  messageId: string
  workflowInput: any
  threadId?: string
  resourceId?: string
}
Introspection
isInngestAgent(obj)
Type guard that checks whether an object is an InngestAgent.
import { isInngestAgent } from '@mastra/inngest'

if (isInngestAgent(agent)) {
  // agent is InngestAgent
}
Returns: boolean
Stream options
stream() accepts an InngestAgentStreamOptions object. It supports the same agent execution options as DurableAgent.stream(), plus lifecycle callbacks.
runId?:
instructions?:
context?:
memory?:
requestContext?:
maxSteps?:
toolsets?:
clientTools?:
toolChoice?:
modelSettings?:
requireToolApproval?:
autoResumeSuspendedTools?:
toolCallConcurrency?:
includeRawChunks?:
maxProcessorRetries?:
untilIdle?:
onChunk?:
onStepFinish?:
onFinish?:
onError?:
onSuspended?:
observe() accepts the lifecycle callbacks (onChunk, onStepFinish, onFinish, onError, onSuspended) plus an offset to control where replay starts.
InngestAgentStreamResult
The object returned by stream() and resume(). The observe() method returns the same shape but omits threadId and resourceId.
interface InngestAgentStreamResult<OUTPUT = undefined> {
  output: MastraModelOutput<OUTPUT>
  readonly fullStream: ReadableStream<any>
  runId: string
  threadId?: string
  resourceId?: string
  cleanup: () => void
}
output:
fullStream:
runId:
threadId?:
resourceId?:
cleanup:
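Since fullStream is typed as a standard ReadableStream, it can be consumed with the Web Streams reader API. The sketch below shows one way to read such a stream to completion. drainStream is a hypothetical helper, and the shape of the individual chunks is not specified in this reference, so they are left generic.

```typescript
// Reads a ReadableStream to completion and returns every chunk in order.
async function drainStream<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const chunks: T[] = []
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      chunks.push(value as T)
    }
  } finally {
    // Release the lock so the stream is not left locked if reading fails.
    reader.releaseLock()
  }
  return chunks
}
```

Under these assumptions, usage would look like: const chunks = await drainStream(result.fullStream), where result comes from stream(), resume(), or observe().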
Serving Inngest functions
The @mastra/inngest package provides serve() and createServe() to register Inngest workflow functions with your HTTP framework.
serve(options)
Serves Mastra workflows with Hono (the default framework). Collects all Inngest-backed workflows from Mastra and registers them as Inngest functions.
import { serve } from '@mastra/inngest'

app.use('/inngest/api', async c => {
  return serve({ mastra, inngest })(c)
})
createServe(adapter)
Factory that accepts any Inngest serve adapter (inngest/express, inngest/fastify, inngest/next, etc.) and returns a serve function for that framework.
Express:

import { createServe } from '@mastra/inngest'
import { serve } from 'inngest/express'

const serveExpress = createServe(serve)
app.use('/inngest/api', serveExpress({ mastra, inngest }))

Next.js:

import { createServe } from '@mastra/inngest'
import { serve } from 'inngest/next'

const serveNext = createServe(serve)
export const { GET, POST, PUT } = serveNext({ mastra, inngest })