# DurableAgent

`DurableAgent` wraps an existing [`Agent`](https://mastra.ai/reference/agents/agent) with durable execution and resumable streams. It runs the agentic loop so that a client can disconnect and reconnect without missing events, and it streams those events over [PubSub](https://mastra.ai/docs/server/pubsub). Use it when a run must outlive a single request or survive a dropped connection.

Create one with the [`createDurableAgent`](#createdurableagentoptions) factory, or use [`createEventedAgent`](#createeventedagentoptions) for fire-and-forget execution on the built-in workflow engine. For Inngest-powered execution, use `createInngestAgent` from `@mastra/inngest`.

## Usage example

```typescript
import { Mastra } from '@mastra/core'
import { Agent } from '@mastra/core/agent'
import { createDurableAgent } from '@mastra/core/agent/durable'

const agent = new Agent({
  id: 'my-agent',
  name: 'My Agent',
  instructions: 'You are a helpful assistant',
  model: 'openai/gpt-5.5',
})

const durableAgent = createDurableAgent({ agent })

export const mastra = new Mastra({
  agents: { myAgent: durableAgent },
})
```

Stream a response and read the result. The `cleanup` function unsubscribes from PubSub when you are done with the run:

```typescript
const { output, runId, cleanup } = await durableAgent.stream('Hello!')

const text = await output.text
cleanup()
```

## `createDurableAgent(options)`

Wraps an `Agent` with durable execution and resumable streams. This is the recommended way to create a `DurableAgent`.

```typescript
import { createDurableAgent } from '@mastra/core/agent/durable'

const durableAgent = createDurableAgent({ agent })
```

Returns: `DurableAgent`

### Parameters

**agent** (`Agent`): The Agent to wrap with durable execution capabilities. Agent methods delegate to this agent.

**id** (`string`): ID override. (Default: `agent.id`)

**name** (`string`): Name override.
(Default: `agent.name`)

**cache** (`MastraServerCache | false`): Cache for stored stream events, which enables resumable streams. If omitted, the agent inherits the cache from the Mastra instance, or uses an `InMemoryServerCache`. Set to `false` to disable caching, which makes streams non-resumable.

**pubsub** (`PubSub`): PubSub instance for streaming events. (Default: `EventEmitterPubSub`)

**maxSteps** (`number`): Maximum number of steps for the agentic loop.

## `createEventedAgent(options)`

Wraps an `Agent` with fire-and-forget durable execution on the built-in workflow engine. Like `createDurableAgent`, it returns a result you stream from, but the underlying workflow runs non-blocking (via `startAsync`) instead of running to completion before the stream is wired up. Use it when you want the run to progress independently of the caller. It does not accept `id` or `name` overrides.

```typescript
import { createEventedAgent } from '@mastra/core/agent/durable'

const eventedAgent = createEventedAgent({ agent })
```

Returns: `EventedAgent` (a subclass of `DurableAgent`)

### Parameters

**agent** (`Agent`): The Agent to wrap with evented durable execution capabilities.

**cache** (`MastraServerCache | false`): Cache for stored stream events, which enables resumable streams. If omitted, the agent inherits the cache from the Mastra instance, or uses an `InMemoryServerCache`. Set to `false` to disable caching.

**pubsub** (`PubSub`): PubSub instance for streaming events. (Default: `EventEmitterPubSub`)

**maxSteps** (`number`): Maximum number of steps for the agentic loop.

## Constructor parameters

The `DurableAgent` class accepts the same options as `createDurableAgent`, plus `cleanupTimeoutMs`. Prefer the factory unless you need to subclass.

**agent** (`Agent`): The Agent to wrap with durable execution capabilities.

**id** (`string`): ID override. (Default: `agent.id`)

**name** (`string`): Name override.
(Default: `agent.name`)

**cache** (`MastraServerCache | false`): Cache for stored stream events. If omitted, inherits from the Mastra instance or uses an `InMemoryServerCache`. Set to `false` to disable caching.

**pubsub** (`PubSub`): PubSub instance for streaming events. (Default: `EventEmitterPubSub`)

**maxSteps** (`number`): Maximum number of steps for the agentic loop.

**cleanupTimeoutMs** (`number`): Grace period in milliseconds before registry entries are cleaned up automatically after a stream finishes or errors. Set to `0` to disable auto-cleanup and require a manual `cleanup()` call. Auto-cleanup does not fire on suspended events. (Default: `30000`)

## Methods

### Execution

#### `stream(messages, options?)`

Streams a response using durable execution. Returns immediately with a result whose `output` produces events as the run progresses.

```typescript
const { output, runId, cleanup } = await durableAgent.stream('Hello!', {
  onChunk: chunk => console.log(chunk),
  onFinish: result => console.log('done', result),
})

const text = await output.text
cleanup()
```

Returns: [`Promise<DurableAgentStreamResult>`](#durableagentstreamresult)

#### `resume(runId, resumeData, options?)`

Resumes a suspended run, for example after a tool approval. Pass the `runId` from the original stream and the data the run was waiting on. Throws if no registry entry exists for the run.

```typescript
const { output, cleanup } = await durableAgent.resume(runId, {
  approved: true,
})

await output.text
cleanup()
```

Returns: [`Promise<DurableAgentStreamResult>`](#durableagentstreamresult)

#### `observe(runId, options?)`

Reconnects to an existing run, replaying cached events before delivering live ones. Use this after a network disconnection. Pass `offset` to start replay from a known position.
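To make the replay-then-live ordering concrete, here is a minimal in-memory sketch. It is not Mastra's implementation: `ReplayLog`, `publish`, and the event strings are hypothetical names used only for illustration, and the real cache and PubSub layers are assumed to behave analogously.

```typescript
// Hypothetical model of replay-then-live delivery. None of these names
// come from Mastra; they only illustrate the ordering that observe() describes.
type Listener<T> = (event: T, index: number) => void

class ReplayLog<T> {
  private events: T[] = []
  private listeners: Listener<T>[] = []

  // Store the event in the cache, then fan it out to live subscribers.
  publish(event: T): void {
    const index = this.events.length
    this.events.push(event)
    this.listeners.forEach(listener => listener(event, index))
  }

  // Replay cached events starting at `offset`, then subscribe for live ones.
  // Returns a function that unsubscribes the listener.
  observe(offset: number, listener: Listener<T>): () => void {
    this.events.slice(offset).forEach((event, i) => listener(event, offset + i))
    this.listeners.push(listener)
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener)
    }
  }
}

const log = new ReplayLog<string>()
log.publish('step-1')
log.publish('step-2')

// A client that already saw the first event reconnects at offset 1.
const seen: string[] = []
const unsubscribe = log.observe(1, event => {
  seen.push(event)
})

log.publish('step-3') // delivered live
unsubscribe()
log.publish('step-4') // not delivered: the client has unsubscribed

console.log(seen) // ['step-2', 'step-3']
```

In this model an offset of `0` replays everything, while a larger offset skips events the client has already processed. With a real `DurableAgent`, the same reconnect is a single `observe()` call: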

```typescript
const { output, cleanup } = await durableAgent.observe(runId, {
  offset: 0,
  onChunk: chunk => console.log(chunk),
})

await output.text
```

Returns: `Promise<DurableAgentStreamResult>`

> **Warning:** The `cleanup()` returned by `observe()` destroys the run's registry entries and cached events. Only call it when you are done with the run. If the run is suspended and you intend to resume later, do not call `cleanup()`. Instead, let the auto-cleanup timer handle it after the run finishes or errors. Auto-cleanup does not fire on suspended events.

## Stream options

`stream()` accepts a `DurableAgentStreamOptions` object. It supports the agent execution options below, plus lifecycle callbacks.

**runId** (`string`): Unique identifier for this run. Use it later with `resume()` or `observe()`.

**instructions** (`AgentExecutionOptions['instructions']`): Overrides the agent's default instructions for this run. Accepts a static string or the same dynamic instructions value the agent supports.

**context** (`ModelMessage[]`): Additional context messages to provide to the agent.

**memory** (`object`): Memory configuration for conversation persistence and retrieval.

**requestContext** (`RequestContext`): Request context carrying dynamic configuration and state for this run.

**maxSteps** (`number`): Maximum number of steps to run for this stream.

**toolsets** (`object`): Additional tool sets available for this run.

**clientTools** (`object`): Client-side tools available during execution.

**toolChoice** (`'auto' | 'none' | 'required' | { type: 'tool'; toolName: string }`): Tool selection strategy.

**activeTools** (`string[]`): Restricts execution to the named subset of the agent's tools.

**modelSettings** (`object`): Model-specific settings such as temperature.

**requireToolApproval** (`boolean`): Require approval for all tool calls, which suspends the run until resumed.

**autoResumeSuspendedTools** (`boolean`): Automatically resume tools that suspended, instead of waiting for an external `resume()` call.

**toolCallConcurrency** (`number`): Maximum number of tool calls to execute concurrently.

**includeRawChunks** (`boolean`): Include raw provider chunks in the stream output.

**maxProcessorRetries** (`number`): Maximum number of processor retries per generation.

**structuredOutput** (`object`): Structured output configuration.

**versions** (`object`): Version overrides for sub-agent delegation.

**onChunk** (`(chunk: ChunkType) => void | Promise<void>`): Called for each streamed chunk.

**onStepFinish** (`(result: AgentStepFinishEventData) => void | Promise<void>`): Called when a step in the agentic loop finishes.

**onFinish** (`(result: AgentFinishEventData) => void | Promise<void>`): Called when the run finishes.

**onError** (`(error: Error) => void | Promise<void>`): Called when the run errors.

**onSuspended** (`(data: AgentSuspendedEventData) => void | Promise<void>`): Called when the run suspends, for example for tool approval.

`resume()` and `observe()` accept the same lifecycle callbacks (`onChunk`, `onStepFinish`, `onFinish`, `onError`, `onSuspended`). `observe()` also accepts an `offset` to control where replay starts.

## DurableAgentStreamResult

The object returned by `stream()`, `resume()`, and `observe()`.

```typescript
interface DurableAgentStreamResult {
  output: MastraModelOutput
  readonly fullStream: ReadableStream
  runId: string
  threadId?: string
  resourceId?: string
  cleanup: () => void
}
```

**output** (`MastraModelOutput`): The streaming output. Await `output.text` for the full text, or consume `output.fullStream`.

**fullStream** (`ReadableStream`): The full event stream, delegating to `output.fullStream`.

**runId** (`string`): The unique run ID. Pass it to `resume()` or `observe()` to reconnect.

**threadId** (`string`): Thread ID when using memory.

**resourceId** (`string`): Resource ID when using memory.

**cleanup** (`() => void`): Unsubscribes from PubSub and clears registry entries for the run. Call it when done with the run.

## Related

- [Agent class](https://mastra.ai/reference/agents/agent)
- [PubSub](https://mastra.ai/docs/server/pubsub)
- [`.getMemory()`](https://mastra.ai/reference/agents/getMemory)
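
The advice to call `cleanup()` when you are done with a run can be wrapped in a small helper so that it also runs when reading the output throws. The sketch below is an assumption-laden illustration, not part of the Mastra API: `RunLike` is a hypothetical structural stand-in for the fields of `DurableAgentStreamResult` it touches, `readTextAndCleanup` is a hypothetical helper name, and `fakeRun` replaces a real agent so the example runs on its own.

```typescript
// Hypothetical structural stand-in for the parts of DurableAgentStreamResult
// used here. It assumes output.text is awaitable and resolves to a string.
interface RunLike {
  output: { text: PromiseLike<string> }
  cleanup: () => void
}

// Hypothetical helper: read the full text, then always release the run,
// whether the read succeeds or throws.
async function readTextAndCleanup(run: RunLike): Promise<string> {
  try {
    return await run.output.text
  } finally {
    run.cleanup()
  }
}

// Stand-in run so the sketch executes without a model or a Mastra instance.
let cleanupCalls = 0
const fakeRun: RunLike = {
  output: { text: Promise.resolve('Hello!') },
  cleanup: () => {
    cleanupCalls += 1
  },
}

const demo = readTextAndCleanup(fakeRun)
demo.then(text => console.log(text, cleanupCalls))
```

A helper like this suits runs you do not intend to resume. For a run that may suspend and be resumed later, skipping the `finally` block and relying on the auto-cleanup timer is the safer choice, since `cleanup()` clears the registry entries that `resume()` needs.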