DurableAgent

DurableAgent wraps an existing Agent with durable execution and resumable streams. It runs the agentic loop so that a client can disconnect and reconnect without missing events, and it streams those events over PubSub. Use it when a run must outlive a single request or survive a dropped connection.

Create one with the createDurableAgent factory, or use createEventedAgent for fire-and-forget execution on the built-in workflow engine. For Inngest-powered execution, use createInngestAgent from @mastra/inngest.

Usage example

src/mastra/index.ts
import { Mastra } from '@mastra/core'
import { Agent } from '@mastra/core/agent'
import { createDurableAgent } from '@mastra/core/agent/durable'

const agent = new Agent({
  id: 'my-agent',
  name: 'My Agent',
  instructions: 'You are a helpful assistant',
  model: 'openai/gpt-5.5',
})

const durableAgent = createDurableAgent({ agent })

export const mastra = new Mastra({
  agents: { myAgent: durableAgent },
})

Stream a response and read the result. The cleanup function unsubscribes from PubSub when you are done with the run:

const { output, runId, cleanup } = await durableAgent.stream('Hello!')

const text = await output.text

cleanup()
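
Because cleanup() should run even when reading the output throws, one option is to wrap the read in try/finally. The sketch below is an illustration only, not part of the Mastra API: `StreamLike` and `readTextAndCleanup` are hypothetical names, and the type models just the `output.text` and `cleanup` fields shown above, assuming `output.text` behaves like a promise.

```typescript
// Hypothetical structural type: only the stream-result fields used here.
interface StreamLike {
  output: { text: Promise<string> }
  cleanup: () => void
}

// Awaits the full text and always calls cleanup() afterwards,
// whether reading the text succeeded or threw.
async function readTextAndCleanup(
  start: () => Promise<StreamLike>,
): Promise<string> {
  const result = await start()
  try {
    return await result.output.text
  } finally {
    result.cleanup()
  }
}
```

A call might look like `readTextAndCleanup(() => durableAgent.stream('Hello!'))`. This pattern fits runs you do not intend to resume or observe afterwards, since cleanup() also clears the registry entries for the run.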

createDurableAgent(options)

Wraps an Agent with durable execution and resumable streams. This is the recommended way to create a DurableAgent.

import { createDurableAgent } from '@mastra/core/agent/durable'

const durableAgent = createDurableAgent({ agent })

Returns: DurableAgent

Parameters

agent:

Agent
The Agent to wrap with durable execution capabilities. Agent methods delegate to this agent.

id?:

string
= agent.id
ID override.

name?:

string
= agent.name
Name override.

cache?:

MastraServerCache | false
Cache for stored stream events, which enables resumable streams. If omitted, the agent inherits the cache from the Mastra instance, or uses an InMemoryServerCache. Set to `false` to disable caching, which makes streams non-resumable.

pubsub?:

PubSub
= EventEmitterPubSub
PubSub instance for streaming events.

maxSteps?:

number
Maximum number of steps for the agentic loop.

createEventedAgent(options)

Wraps an Agent with fire-and-forget durable execution on the built-in workflow engine. Like createDurableAgent, it returns a result you stream from, but the underlying workflow runs non-blocking (via startAsync) instead of running to completion before the stream is wired up. Use it when you want the run to progress independently of the caller. It does not accept id or name overrides.

import { createEventedAgent } from '@mastra/core/agent/durable'

const eventedAgent = createEventedAgent({ agent })

Returns: EventedAgent (a subclass of DurableAgent)

Parameters

agent:

Agent
The Agent to wrap with evented durable execution capabilities.

cache?:

MastraServerCache | false
Cache for stored stream events, which enables resumable streams. If omitted, the agent inherits the cache from the Mastra instance, or uses an InMemoryServerCache. Set to `false` to disable caching.

pubsub?:

PubSub
= EventEmitterPubSub
PubSub instance for streaming events.

maxSteps?:

number
Maximum number of steps for the agentic loop.

Constructor parameters

The DurableAgent class accepts the same options as createDurableAgent, plus cleanupTimeoutMs. Prefer the factory unless you need to subclass.

agent:

Agent
The Agent to wrap with durable execution capabilities.

id?:

string
= agent.id
ID override.

name?:

string
= agent.name
Name override.

cache?:

MastraServerCache | false
Cache for stored stream events. If omitted, inherits from the Mastra instance or uses an InMemoryServerCache. Set to `false` to disable caching.

pubsub?:

PubSub
= EventEmitterPubSub
PubSub instance for streaming events.

maxSteps?:

number
Maximum number of steps for the agentic loop.

cleanupTimeoutMs?:

number
= 30000
Grace period in milliseconds before registry entries are cleaned up automatically after a stream finishes or errors. Set to 0 to disable auto-cleanup and require a manual `cleanup()` call. Auto-cleanup does not fire on suspended events.
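
The cleanupTimeoutMs rules can be modeled in a few lines. This is a simplified sketch of the documented behavior, not Mastra's implementation: `RunEvent` and `scheduleAutoCleanup` are hypothetical names, and the real class may track timers differently.

```typescript
// Hypothetical model of the terminal events a run can report.
type RunEvent = 'finish' | 'error' | 'suspended'

// Schedules cleanup after the grace period and returns a cancel function,
// or returns undefined when no cleanup was scheduled.
function scheduleAutoCleanup(
  event: RunEvent,
  cleanupTimeoutMs: number,
  cleanup: () => void,
): (() => void) | undefined {
  // Suspended runs may be resumed later, so nothing is scheduled for them.
  if (event === 'suspended') return undefined
  // 0 disables auto-cleanup; the caller must call cleanup() manually.
  if (cleanupTimeoutMs === 0) return undefined
  const timer = setTimeout(cleanup, cleanupTimeoutMs)
  return () => clearTimeout(timer)
}
```

With the default of 30000, a finished or errored run would keep its entries for 30 seconds, which leaves a window for a client to reconnect before they are removed.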

Methods

Execution

stream(messages, options?)

Streams a response using durable execution. Returns immediately with a result whose output produces events as the run progresses.

const { output, runId, cleanup } = await durableAgent.stream('Hello!', {
  onChunk: chunk => console.log(chunk),
  onFinish: result => console.log('done', result),
})

const text = await output.text
cleanup()

Returns: Promise<DurableAgentStreamResult>

resume(runId, resumeData, options?)

Resumes a suspended run, for example after a tool approval. Pass the runId from the original stream and the data the run was waiting on. Throws if no registry entry exists for the run.

const { output, cleanup } = await durableAgent.resume(runId, {
  approved: true,
})

await output.text
cleanup()

Returns: Promise<DurableAgentStreamResult>
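
The approval flow connects stream() and resume() through the runId. The sketch below shows one possible way to wire them together. It is a sketch under assumptions: `ApprovalAgent`, `RunResult`, `RunCallbacks`, `runWithApproval`, and `decide` are hypothetical names that model only the signatures on this page, the `{ approved }` payload is taken from the example above, and only a single suspension per run is handled.

```typescript
// Hypothetical structural types covering only what this sketch uses.
interface RunResult {
  output: { text: Promise<string> }
  runId: string
  cleanup: () => void
}
interface RunCallbacks {
  onSuspended?: (data: unknown) => void | Promise<void>
  onFinish?: (result: unknown) => void | Promise<void>
  onError?: (error: Error) => void | Promise<void>
}
interface ApprovalAgent {
  stream(
    messages: string,
    options?: RunCallbacks & { requireToolApproval?: boolean },
  ): Promise<RunResult>
  resume(runId: string, resumeData: unknown, options?: RunCallbacks): Promise<RunResult>
}

type FirstEvent =
  | { kind: 'suspended' }
  | { kind: 'finished' }
  | { kind: 'error'; error: Error }

async function runWithApproval(
  agent: ApprovalAgent,
  prompt: string,
  decide: (runId: string) => Promise<boolean>, // hypothetical approval source
): Promise<string> {
  // Resolves with whichever lifecycle event the run reports first.
  let report: (event: FirstEvent) => void = () => {}
  const firstEvent = new Promise<FirstEvent>(resolve => { report = resolve })

  const first = await agent.stream(prompt, {
    requireToolApproval: true,
    onSuspended: () => report({ kind: 'suspended' }),
    onFinish: () => report({ kind: 'finished' }),
    onError: error => report({ kind: 'error', error }),
  })

  const event = await firstEvent
  if (event.kind === 'error') {
    first.cleanup()
    throw event.error
  }
  if (event.kind === 'finished') {
    try { return await first.output.text } finally { first.cleanup() }
  }

  // Suspended: first.cleanup() is not called here, because it would clear
  // the registry entry that resume() needs.
  const approved = await decide(first.runId)
  const resumed = await agent.resume(first.runId, { approved })
  try { return await resumed.output.text } finally { resumed.cleanup() }
}
```

Reporting errors as a resolved value rather than a rejection avoids an unhandled rejection if onError fires before the caller starts awaiting.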

observe(runId, options?)

Reconnects to an existing run, replaying cached events before delivering live ones. Use this after a network disconnection. Pass offset to start replay from a known position.

const { output, cleanup } = await durableAgent.observe(runId, {
  offset: 0,
  onChunk: chunk => console.log(chunk),
})

await output.text

Returns: Promise<DurableAgentStreamResult>

warning

The cleanup() returned by observe() destroys the run's registry entries and cached events, so call it only when you are done with the run. If the run is suspended and you intend to resume it later, do not call cleanup(). Auto-cleanup does not fire on suspended events, so the entries stay available for resume(), and the auto-cleanup timer removes them after the run finishes or errors.
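
To reconnect without replaying events a client already has, the client needs to remember its position. The sketch below tracks that position with a counter. It assumes that `offset` is the number of events already delivered, which this page does not state explicitly, and `RunCursor`, `ObservableAgent`, and `reconnect` are hypothetical names that model only the observe() signature shown above.

```typescript
// Hypothetical structural types covering only what this sketch uses.
interface ObserveResult {
  output: { text: Promise<string> }
  cleanup: () => void
}
interface ObservableAgent {
  observe(
    runId: string,
    options?: {
      offset?: number
      onChunk?: (chunk: unknown) => void | Promise<void>
    },
  ): Promise<ObserveResult>
}

// Counts the chunks a client has received for one run.
class RunCursor {
  private seen = 0
  constructor(readonly runId: string) {}

  // Pass this as onChunk to stream() or observe() to advance the cursor.
  readonly onChunk = (): void => {
    this.seen += 1
  }

  // Assumption: offset means "number of events already delivered".
  get offset(): number {
    return this.seen
  }
}

// Reconnects from the cursor position. cleanup() is deliberately left to the
// caller, in line with the warning above.
async function reconnect(
  agent: ObservableAgent,
  cursor: RunCursor,
): Promise<ObserveResult> {
  return agent.observe(cursor.runId, {
    offset: cursor.offset,
    onChunk: cursor.onChunk,
  })
}
```

Keeping the counter in one object means the same `onChunk` can be passed to the original stream() call and to every later observe() call, so the position survives repeated disconnects.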

Stream options

stream() accepts a DurableAgentStreamOptions object. It supports the agent execution options below, plus lifecycle callbacks.

runId?:

string
Unique identifier for this run. Use it later with `resume()` or `observe()`.

instructions?:

AgentExecutionOptions['instructions']
Overrides the agent's default instructions for this run. Accepts a static string or the same dynamic instructions value the agent supports.

context?:

ModelMessage[]
Additional context messages to provide to the agent.

memory?:

object
Memory configuration for conversation persistence and retrieval.

requestContext?:

RequestContext
Request context carrying dynamic configuration and state for this run.

maxSteps?:

number
Maximum number of steps to run for this stream.

toolsets?:

object
Additional tool sets available for this run.

clientTools?:

object
Client-side tools available during execution.

toolChoice?:

'auto' | 'none' | 'required' | { type: 'tool'; toolName: string }
Tool selection strategy.

activeTools?:

string[]
Restricts execution to the named subset of the agent's tools.

modelSettings?:

object
Model-specific settings such as temperature.

requireToolApproval?:

boolean
Require approval for all tool calls, which suspends the run until resumed.

autoResumeSuspendedTools?:

boolean
Automatically resume tools that suspended, instead of waiting for an external `resume()` call.

toolCallConcurrency?:

number
Maximum number of tool calls to execute concurrently.

includeRawChunks?:

boolean
Include raw provider chunks in the stream output.

maxProcessorRetries?:

number
Maximum number of processor retries per generation.

structuredOutput?:

object
Structured output configuration.

versions?:

object
Version overrides for sub-agent delegation.

onChunk?:

(chunk: ChunkType) => void | Promise<void>
Called for each streamed chunk.

onStepFinish?:

(result: AgentStepFinishEventData) => void | Promise<void>
Called when a step in the agentic loop finishes.

onFinish?:

(result: AgentFinishEventData) => void | Promise<void>
Called when the run finishes.

onError?:

(error: Error) => void | Promise<void>
Called when the run errors.

onSuspended?:

(data: AgentSuspendedEventData) => void | Promise<void>
Called when the run suspends, for example for tool approval.

resume() and observe() accept the same lifecycle callbacks (onChunk, onStepFinish, onFinish, onError, onSuspended). observe() also accepts an offset to control where replay starts.
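
Since all three methods share the same lifecycle callbacks, a single set of handlers can be reused across them. The sketch below records each callback invocation in order. `LifecycleEvent`, `LifecycleCallbacks`, and `createLifecycleRecorder` are hypothetical names, and payloads are typed `unknown` because the event data shapes are not defined on this page.

```typescript
// Hypothetical event record, one variant per lifecycle callback.
type LifecycleEvent =
  | { kind: 'chunk'; payload: unknown }
  | { kind: 'step-finish'; payload: unknown }
  | { kind: 'finish'; payload: unknown }
  | { kind: 'error'; payload: Error }
  | { kind: 'suspended'; payload: unknown }

interface LifecycleCallbacks {
  onChunk: (chunk: unknown) => void
  onStepFinish: (result: unknown) => void
  onFinish: (result: unknown) => void
  onError: (error: Error) => void
  onSuspended: (data: unknown) => void
}

// Builds a callback set that appends every lifecycle event to one array.
function createLifecycleRecorder(): {
  callbacks: LifecycleCallbacks
  events: LifecycleEvent[]
} {
  const events: LifecycleEvent[] = []
  const callbacks: LifecycleCallbacks = {
    onChunk: chunk => { events.push({ kind: 'chunk', payload: chunk }) },
    onStepFinish: result => { events.push({ kind: 'step-finish', payload: result }) },
    onFinish: result => { events.push({ kind: 'finish', payload: result }) },
    onError: error => { events.push({ kind: 'error', payload: error }) },
    onSuspended: data => { events.push({ kind: 'suspended', payload: data }) },
  }
  return { callbacks, events }
}
```

The callbacks could then be spread into the options of any of the three methods, for example `durableAgent.stream('Hello!', { ...callbacks })`, and `events` inspected once the run ends.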

DurableAgentStreamResult

The object returned by stream(), resume(), and observe().

interface DurableAgentStreamResult<OUTPUT = undefined> {
  output: MastraModelOutput<OUTPUT>
  readonly fullStream: ReadableStream<any>
  runId: string
  threadId?: string
  resourceId?: string
  cleanup: () => void
}

output:

MastraModelOutput
The streaming output. Await `output.text` for the full text, or consume `output.fullStream`.

fullStream:

ReadableStream
The full event stream, delegating to `output.fullStream`.

runId:

string
The unique run ID. Pass it to `resume()` or `observe()` to reconnect.

threadId?:

string
Thread ID when using memory.

resourceId?:

string
Resource ID when using memory.

cleanup:

() => void
Unsubscribes from PubSub and clears registry entries for the run. Call it when done with the run.
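
When you need every event rather than only the final text, fullStream can be read with the standard ReadableStream reader API. The helper below is a generic sketch: `collectEvents` is a hypothetical name, and it makes no assumption about the event shape, which the interface above types as `any`.

```typescript
// Reads a ReadableStream to completion and returns its events in order.
async function collectEvents<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const events: T[] = []
  try {
    while (true) {
      const result = await reader.read()
      if (result.done) break
      events.push(result.value)
    }
  } finally {
    // Release the lock so the stream is not left locked if reading throws.
    reader.releaseLock()
  }
  return events
}
```

A call might look like `const events = await collectEvents(result.fullStream)`. A ReadableStream allows only one active reader at a time, so pick a single consumer for the stream.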