Durable agents
Added in: @mastra/core@1.45.0
Durable agents are currently in beta. APIs may change in future releases.
A durable agent wraps a regular Agent so the agentic loop runs inside a workflow. Events flow through PubSub, which means a client can disconnect and reconnect without missing chunks. The run state is persisted, so it survives process restarts.
When to use durable agents
Use a durable agent when any of the following apply:
- The client may drop and reconnect mid-stream (mobile, spotty networks, long-running calls).
- The agentic loop may outlive a single HTTP request (background research, multi-step tool use).
- You need an observe/reconnect API where a second client picks up a stream that a first client started.
- You want Inngest-powered execution with step memoization, retries, and monitoring.
For short-lived, request-scoped calls where the client stays connected, a regular Agent with stream() or generate() is simpler.
Quickstart
Wrap an existing agent with createDurableAgent() from @mastra/core/agent/durable:
import { Agent } from '@mastra/core/agent'
import { createDurableAgent } from '@mastra/core/agent/durable'
const agent = new Agent({
  id: 'researcher',
  name: 'Researcher',
  instructions: 'You research topics thoroughly.',
  model: 'openai/gpt-5.5',
})
export const durableResearcher = createDurableAgent({ agent })
Register the durable agent with Mastra and call stream():
import { Mastra } from '@mastra/core'
import { durableResearcher } from './agents/researcher'
const mastra = new Mastra({
  agents: { durableResearcher },
})
const { output, runId, cleanup } = await durableResearcher.stream(
  'Research quantum computing advances in 2025',
)
for await (const chunk of output.fullStream) {
  // Process each chunk as it arrives
}
// Release PubSub subscriptions and clear the run from the registry.
// If you skip this, an automatic cleanup timer fires after the stream ends.
cleanup()
The returned runId identifies the execution. Pass it to observe() to reconnect from a different client.
Visit the DurableAgent reference for the full configuration and method API.
How it works
A durable agent adds three layers on top of a regular agent:
- Workflow execution: stream() serializes the messages and options into a workflow input, then triggers the agentic loop inside a durable workflow. The workflow runs the same loop as Agent.stream(), but each step can be memoized and replayed.
- PubSub streaming: As the loop runs, chunks are published to a PubSub topic keyed by the run ID. The caller subscribes to this topic and pipes chunks into a ReadableStream. If the caller disconnects and reconnects, missed chunks are replayed from the cache.
- Cache layer: An optional cache (in-memory by default, Redis or another backend in production) stores published events so that a late subscriber can catch up.
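The PubSub and cache layers can be made concrete with a minimal in-memory sketch. It models a topic keyed by run ID whose cache replays earlier events to a late subscriber. CachedPubSub is a hypothetical class written for illustration. It is not the Mastra PubSub or cache API, and a production cache would live outside the process.

```typescript
type Listener<T> = (event: T) => void

// Hypothetical illustration, not Mastra's PubSub: each topic (one per run ID)
// keeps a cache of every published event plus a list of live listeners.
class CachedPubSub<T> {
  private cache = new Map<string, T[]>()
  private listeners = new Map<string, Listener<T>[]>()

  publish(topic: string, event: T): void {
    // Store the event first so late subscribers can replay it
    const events = this.cache.get(topic) ?? []
    events.push(event)
    this.cache.set(topic, events)
    // Then deliver it to everyone currently subscribed
    for (const listener of this.listeners.get(topic) ?? []) listener(event)
  }

  // Replays cached events, then delivers live ones. Returns an unsubscribe function.
  subscribe(topic: string, listener: Listener<T>): () => void {
    for (const event of this.cache.get(topic) ?? []) listener(event)
    const current = this.listeners.get(topic) ?? []
    this.listeners.set(topic, [...current, listener])
    return () => {
      const remaining = (this.listeners.get(topic) ?? []).filter((l) => l !== listener)
      this.listeners.set(topic, remaining)
    }
  }
}
```

With this model, a client that subscribes after two events have been published still receives both. It then receives live events until it unsubscribes.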
Execution variants
Mastra provides three factory functions that produce durable agents. They differ in how the workflow is executed:
| Factory | Package | Best for |
|---|---|---|
| createDurableAgent() | @mastra/core | Local development and single-process servers. You get a stream you can await directly. |
| createEventedAgent() | @mastra/core | Background execution. The workflow starts without blocking, and you consume chunks through PubSub. |
| createInngestAgent() | @mastra/inngest | Production deployments. Inngest adds step memoization, retries, and a monitoring dashboard. |
All three return an object you register with Mastra the same way as a regular agent. createDurableAgent() and createEventedAgent() return class instances that extend Agent. createInngestAgent() returns a Proxy-backed object that forwards Agent methods to the underlying agent.
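The Proxy-backed approach can be pictured with a small, self-contained sketch. A JavaScript Proxy intercepts one method (here stream) and forwards every other property read to the wrapped object. BaseAgent and wrapWithProxy are hypothetical names for illustration. This sketch is not how createInngestAgent() is implemented internally; it shows only the general forwarding pattern.

```typescript
// Hypothetical agent shape for illustration; not Mastra's Agent class.
interface AgentLike {
  id: string
  describe(): string
  stream(prompt: string): string
}

class BaseAgent implements AgentLike {
  constructor(public id: string) {}
  describe(): string {
    return `agent ${this.id}`
  }
  stream(prompt: string): string {
    return `direct:${prompt}`
  }
}

// Intercepts `stream` and forwards every other property to the wrapped agent.
function wrapWithProxy(agent: AgentLike, durableStream: (prompt: string) => string): AgentLike {
  return new Proxy(agent, {
    get(target, prop, receiver) {
      if (prop === 'stream') return durableStream
      const value = Reflect.get(target, prop, receiver)
      // Bind methods so `this` inside them still refers to the wrapped agent
      return typeof value === 'function' ? value.bind(target) : value
    },
  })
}
```

Forwarding through a Proxy means the wrapper does not have to subclass the agent. By contrast, createDurableAgent() and createEventedAgent() return instances that extend Agent and inherit its methods directly.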
In-process with createDurableAgent()
Wrap your agent and call stream(). You get a DurableAgentStreamResult back in the same process. No external infrastructure is required, so this is the fastest way to get started:
import { Agent } from '@mastra/core/agent'
import { createDurableAgent } from '@mastra/core/agent/durable'
const agent = new Agent({
  id: 'helper',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5.5',
})
export const durableHelper = createDurableAgent({ agent })
Fire-and-forget with createEventedAgent()
The workflow starts in the background without blocking the caller. You still receive chunks through PubSub, so stream() returns a result you can consume. The HTTP handler that triggered the run does not need to wait for the workflow to finish:
import { Agent } from '@mastra/core/agent'
import { createEventedAgent } from '@mastra/core/agent/durable'
const agent = new Agent({
  id: 'writer',
  instructions: 'You write articles.',
  model: 'openai/gpt-5.5',
})
export const eventedWriter = createEventedAgent({ agent })
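At its core, fire-and-forget means scheduling the work and returning immediately. The sketch below assumes nothing about Mastra internals. It defers a hypothetical list of steps with setTimeout and returns the run ID at once. Chunks arrive later through a publish callback that stands in for PubSub. triggerInBackground and Chunk are illustrative names only.

```typescript
interface Chunk {
  runId: string
  text: string
}

// Hypothetical fire-and-forget trigger: schedules the work and returns at once.
function triggerInBackground(
  runId: string,
  steps: string[],
  publish: (chunk: Chunk) => void,
): { runId: string } {
  // setTimeout defers the loop, so the caller (for example an HTTP handler)
  // gets the run ID back before any step has run
  setTimeout(() => {
    for (const text of steps) publish({ runId, text })
  }, 0)
  return { runId }
}
```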
Inngest-powered with createInngestAgent()
Run the workflow on the Inngest platform. Each tool call becomes a memoized step that Inngest can retry independently, and you get a dashboard for monitoring runs:
import { Agent } from '@mastra/core/agent'
import { createInngestAgent } from '@mastra/inngest'
import { Inngest } from 'inngest'
const inngest = new Inngest({ id: 'my-app' })
const agent = new Agent({
  id: 'analyst',
  instructions: 'You analyze data.',
  model: 'openai/gpt-5.5',
})
export const inngestAnalyst = createInngestAgent({ agent, inngest })
Visit the createInngestAgent() reference for the full API, including Inngest-specific options like PubSub and cache configuration.
Resumable streams
Durable agents support resumable streams through PubSub and an event cache. When a client disconnects mid-stream, the cache continues storing events. That client, or a different one, can reconnect by calling observe() with the runId:
// runId is the value returned by an earlier stream() call
const { output, cleanup } = await durableResearcher.observe(runId)
for await (const chunk of output.fullStream) {
  // Chunks from the run, including any missed while disconnected
}
cleanup()
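A reconnecting client might receive chunks it already handled before the disconnect. In that case, it can skip them by remembering its position. The sketch assumes each chunk carries a monotonically increasing seq number. That field and the ResumingConsumer class are hypothetical and are not part of the observe() API described here.

```typescript
// Hypothetical chunk shape: `seq` is an assumed, increasing sequence number
interface SequencedChunk {
  seq: number
  text: string
}

// Remembers the last sequence number it processed so that a replay after
// reconnecting does not process the same chunk twice.
class ResumingConsumer {
  private lastSeq = -1
  readonly processed: string[] = []

  handle(chunk: SequencedChunk): void {
    if (chunk.seq <= this.lastSeq) return // already seen before the disconnect
    this.lastSeq = chunk.seq
    this.processed.push(chunk.text)
  }
}
```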
createDurableAgent() and createEventedAgent() use an in-memory cache by default, which means resumable streams work within a single process. For production, provide a persistent cache backend (e.g., Redis) so cached events survive process restarts:
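import { Agent } from '@mastra/core/agent'
import { createDurableAgent } from '@mastra/core/agent/durable'
import { RedisServerCache } from '@mastra/redis'
const agent = new Agent({
  id: 'researcher',
  instructions: 'You research topics thoroughly.',
  model: 'openai/gpt-5.5',
})
// Persist cached events in Redis so they survive process restarts
const cache = new RedisServerCache({ url: 'redis://localhost:6379' })
export const durableAgent = createDurableAgent({
  agent,
  cache,
})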
createInngestAgent() does not enable caching by default. Pass a cache option or register the agent with a Mastra instance that has a serverCache configured to enable resumable streams.
Streaming with background tasks
Durable agents support the same untilIdle option as regular agents. When untilIdle is set, stream() keeps the connection open across background-task continuations until the agent is idle:
const { output, cleanup } = await durableAgent.stream('Research and summarize the topic', {
  untilIdle: true,
  memory: { thread: 'thread-1', resource: 'user-1' },
})
for await (const chunk of output.fullStream) {
  // Chunks from the initial turn AND any follow-up turns triggered by
  // background task completions
}
cleanup()
Pass an object with maxIdleMs instead of true to customize the idle timeout in milliseconds (defaults to 5 minutes):
await durableAgent.stream('Research topic', {
  untilIdle: { maxIdleMs: 30_000 },
  memory: { thread: 'thread-1', resource: 'user-1' },
})
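An idle timeout like maxIdleMs can be modeled by tracking the time of the last activity. The run is treated as idle once that gap reaches the limit. The sketch below takes explicit timestamps so it is deterministic. IdleTracker is hypothetical, and it does not model how Mastra decides whether background tasks are still pending.

```typescript
// Documented default for untilIdle: 5 minutes, in milliseconds
const DEFAULT_MAX_IDLE_MS = 5 * 60 * 1000

// Hypothetical tracker: the run counts as idle once no activity has been
// recorded for maxIdleMs. Timestamps are passed in to keep it deterministic.
class IdleTracker {
  private lastActivity: number

  constructor(
    private readonly maxIdleMs: number = DEFAULT_MAX_IDLE_MS,
    now: number = Date.now(),
  ) {
    this.lastActivity = now
  }

  // Record activity, such as a new chunk or a background task finishing
  touch(now: number = Date.now()): void {
    this.lastActivity = now
  }

  isIdle(now: number = Date.now()): boolean {
    return now - this.lastActivity >= this.maxIdleMs
  }
}
```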
Visit Background tasks for the full background task guide, including configuration, subagents, and suspend/resume.
Cleanup
Every stream() and observe() call returns a cleanup function. Calling it unsubscribes from PubSub and removes the run from the internal registry. If you forget to call it, an automatic timer fires after the stream ends, but calling cleanup() yourself frees resources immediately.
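The cleanup contract combines an explicit cleanup() with an automatic timer that fires after the stream ends. It can be sketched as a small registry. RunRegistry, markEnded, and the autoCleanupMs delay are hypothetical, and the document does not state the real timer duration.

```typescript
interface RunEntry {
  unsubscribe: () => void
  timer?: ReturnType<typeof setTimeout>
}

// Hypothetical registry: tracks active runs and their PubSub unsubscribe hooks.
class RunRegistry {
  private runs = new Map<string, RunEntry>()

  constructor(private readonly autoCleanupMs: number) {}

  // Registers a run and returns its cleanup function
  register(runId: string, unsubscribe: () => void): () => void {
    this.runs.set(runId, { unsubscribe })
    return () => this.cleanup(runId)
  }

  // Called when the stream ends: schedules automatic cleanup as a fallback
  markEnded(runId: string): void {
    const entry = this.runs.get(runId)
    if (!entry) return
    entry.timer = setTimeout(() => this.cleanup(runId), this.autoCleanupMs)
  }

  // Unsubscribes and forgets the run; safe to call more than once
  cleanup(runId: string): void {
    const entry = this.runs.get(runId)
    if (!entry) return
    if (entry.timer !== undefined) clearTimeout(entry.timer)
    entry.unsubscribe()
    this.runs.delete(runId)
  }

  has(runId: string): boolean {
    return this.runs.has(runId)
  }
}
```

Calling cleanup() early clears the pending timer, so the automatic pass never unsubscribes a second time.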
Tool approval
Durable agents support tool approval (human-in-the-loop). When a tool call requires approval, the workflow suspends, invokes the onSuspended callback, and waits for the caller to call resume():
const { output, runId, cleanup } = await durableAgent.stream('Delete the old records', {
  requireToolApproval: true,
  onSuspended: ({ toolCallId, toolName, args }) => {
    // Notify the user and ask for approval
  },
})
Resume the suspended run after approval:
await durableAgent.resume(runId, { approved: true })
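The suspend/resume handshake can be illustrated with an in-memory sketch. Suspending creates a pending promise keyed by run ID, and resume() settles it. ApprovalGate and runToolWithApproval are hypothetical. A durable agent keeps the suspended run in its workflow state rather than holding a promise in memory, so treat this only as a model of the control flow.

```typescript
interface ApprovalDecision {
  approved: boolean
}

// Hypothetical in-memory gate: one pending approval per run ID.
class ApprovalGate {
  private pending = new Map<string, (decision: ApprovalDecision) => void>()

  // Suspends the run: the returned promise settles when resume() is called
  suspend(runId: string): Promise<ApprovalDecision> {
    return new Promise((resolve) => {
      this.pending.set(runId, resolve)
    })
  }

  // Resumes a suspended run; returns false if nothing was waiting
  resume(runId: string, decision: ApprovalDecision): boolean {
    const resolve = this.pending.get(runId)
    if (!resolve) return false
    this.pending.delete(runId)
    resolve(decision)
    return true
  }
}

// Hypothetical tool step that only executes after approval
async function runToolWithApproval(
  gate: ApprovalGate,
  runId: string,
  onSuspended: () => void,
  execute: () => string,
): Promise<string> {
  // Register the pending approval before notifying, so an immediate resume() works
  const decision = gate.suspend(runId)
  onSuspended()
  const { approved } = await decision
  return approved ? execute() : 'tool call declined'
}
```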