DurableAgent
DurableAgent wraps an existing Agent with durable execution and resumable streams. It runs the agentic loop so that a client can disconnect and reconnect without missing events, and it streams those events over PubSub. Use it when a run must outlive a single request or survive a dropped connection.
Create one with the createDurableAgent factory, or use createEventedAgent for fire-and-forget execution on the built-in workflow engine. For Inngest-powered execution, use createInngestAgent from @mastra/inngest.
Usage exampleDirect link to Usage example
import { Mastra } from '@mastra/core'
import { Agent } from '@mastra/core/agent'
import { createDurableAgent } from '@mastra/core/agent/durable'
const agent = new Agent({
id: 'my-agent',
name: 'My Agent',
instructions: 'You are a helpful assistant',
model: 'openai/gpt-5.5',
})
const durableAgent = createDurableAgent({ agent })
export const mastra = new Mastra({
agents: { myAgent: durableAgent },
})
Stream a response and read the result. The cleanup function unsubscribes from PubSub when you are done with the run:
const { output, runId, cleanup } = await durableAgent.stream('Hello!')
const text = await output.text
cleanup()
createDurableAgent(options)Direct link to createdurableagentoptions
Wraps an Agent with durable execution and resumable streams. This is the recommended way to create a DurableAgent.
import { createDurableAgent } from '@mastra/core/agent/durable'
const durableAgent = createDurableAgent({ agent })
Returns: DurableAgent
ParametersDirect link to Parameters
agent:
id?:
name?:
cache?:
pubsub?:
maxSteps?:
createEventedAgent(options)Direct link to createeventedagentoptions
Wraps an Agent with fire-and-forget durable execution on the built-in workflow engine. Like createDurableAgent, it returns a result you stream from, but the underlying workflow runs non-blocking (via startAsync) instead of running to completion before the stream is wired up. Use it when you want the run to progress independently of the caller. It does not accept id or name overrides.
import { createEventedAgent } from '@mastra/core/agent/durable'
const eventedAgent = createEventedAgent({ agent })
Returns: EventedAgent (a subclass of DurableAgent)
ParametersDirect link to Parameters
agent:
cache?:
pubsub?:
maxSteps?:
Constructor parametersDirect link to Constructor parameters
The DurableAgent class accepts the same options as createDurableAgent, plus cleanupTimeoutMs. Prefer the factory unless you need to subclass.
agent:
id?:
name?:
cache?:
pubsub?:
maxSteps?:
cleanupTimeoutMs?:
MethodsDirect link to Methods
ExecutionDirect link to Execution
stream(messages, options?)Direct link to streammessages-options
Streams a response using durable execution. Returns immediately with a result whose output produces events as the run progresses.
const { output, runId, cleanup } = await durableAgent.stream('Hello!', {
onChunk: chunk => console.log(chunk),
onFinish: result => console.log('done', result),
})
const text = await output.text
cleanup()
Returns: Promise<DurableAgentStreamResult>
resume(runId, resumeData, options?)Direct link to resumerunid-resumedata-options
Resumes a suspended run, for example after a tool approval. Pass the runId from the original stream and the data the run was waiting on. Throws if no registry entry exists for the run.
const { output, cleanup } = await durableAgent.resume(runId, {
approved: true,
})
await output.text
cleanup()
Returns: Promise<DurableAgentStreamResult>
observe(runId, options?)Direct link to observerunid-options
Reconnects to an existing run, replaying cached events before delivering live ones. Use this after a network disconnection. Pass offset to start replay from a known position.
const { output, cleanup } = await durableAgent.observe(runId, {
offset: 0,
onChunk: chunk => console.log(chunk),
})
await output.text
Returns: Promise<DurableAgentStreamResult>
The cleanup() returned by observe() destroys the run's registry entries and cached events. Only call it when you are done with the run. If the run is suspended and you intend to resume later, do not call cleanup() — let the auto-cleanup timer handle it after the run finishes or errors. Auto-cleanup does not fire on suspended events.
Stream optionsDirect link to Stream options
stream() accepts a DurableAgentStreamOptions object. It supports the agent execution options below, plus lifecycle callbacks.
runId?:
instructions?:
context?:
memory?:
requestContext?:
maxSteps?:
toolsets?:
clientTools?:
toolChoice?:
activeTools?:
modelSettings?:
requireToolApproval?:
autoResumeSuspendedTools?:
toolCallConcurrency?:
includeRawChunks?:
maxProcessorRetries?:
structuredOutput?:
versions?:
onChunk?:
onStepFinish?:
onFinish?:
onError?:
onSuspended?:
resume() and observe() accept the same lifecycle callbacks (onChunk, onStepFinish, onFinish, onError, onSuspended). observe() also accepts an offset to control where replay starts.
DurableAgentStreamResultDirect link to DurableAgentStreamResult
The object returned by stream(), resume(), and observe().
interface DurableAgentStreamResult<OUTPUT = undefined> {
output: MastraModelOutput<OUTPUT>
readonly fullStream: ReadableStream<any>
runId: string
threadId?: string
resourceId?: string
cleanup: () => void
}