Skip to main content

Agent.streamUntilIdle()

Added in: @mastra/core@1.28.0

streamUntilIdle() streams an agent's response and keeps the stream open until every background task dispatched during the run completes. When a task finishes, its result is written to memory and the agentic loop re-enters automatically so the LLM can react to it. The stream closes once no tasks are running and no completions are queued.

Use it when the agent dispatches background tasks (typically long-running tools or subagents) and you want a single stream that spans the initial response plus every continuation triggered by a task completion. For foreground-only runs or if you prefer to manage the continuation manually (manually prompt agent to process the result), use Agent.stream().

Usage example
Direct link to Usage example

const stream = await agent.streamUntilIdle('Research solana for me', {
memory: { thread: 't1', resource: 'u1' },
})

for await (const chunk of stream.fullStream) {
// chunks from the initial turn AND any continuation turns triggered by
// background task completions flow through here
}
info

streamUntilIdle() requires both a BackgroundTaskManager and a memory backend. Without either, it falls through to a plain agent.stream() call.

Parameters
Direct link to Parameters

messages:

string | string[] | CoreMessage[] | AiMessageType[] | UIMessageWithMetadata[]
The messages to send to the agent. Can be a single string, array of strings, or structured message objects.

options?:

AgentExecutionOptions<Output> & { maxIdleMs?: number }
Accepts every option that Agent.stream() accepts, plus maxIdleMs. See the Agent.stream() reference for the full list.

maxIdleMs?:

number
Closes the outer stream after this many ms of idleness between turns. The timer only runs while the wrapper is between turns, so a slow first token does not close the stream. Default: 5 minutes.

memory?:

{ thread?: string | { id: string }; resource?: string }
Memory thread and resource for the run. Required for continuations to write background task results back into the conversation.

structuredOutput?:

PublicStructuredOutputOptions<Output>
Schema-based structured output. Same shape as Agent.stream(). Note that aggregate properties resolve against the first turn only.

For every other option (maxSteps, modelSettings, toolChoice, outputProcessors, onFinish, onChunk, etc.), see the Agent.stream() parameters. streamUntilIdle() forwards them to the initial turn.

Returns
Direct link to Returns

stream:

MastraModelOutput<Output>
A MastraModelOutput where fullStream spans the initial turn plus every auto-continuation. Aggregate properties (text, toolCalls, toolResults, finishReason, messageList, getFullOutput()) resolve against the first turn only.

Aggregate properties caveat
Direct link to Aggregate properties caveat

streamUntilIdle() returns a proxy over the first turn's MastraModelOutput. Only fullStream is replaced with a combined stream that spans every continuation. Every other property — text, toolCalls, toolResults, finishReason, messageList, getFullOutput() — resolves against the first turn's internal buffer.

If you need an aggregate view across all continuations, consume fullStream yourself and accumulate.

Continuation behavior
Direct link to Continuation behavior

Internally, streamUntilIdle():

  1. Runs the initial turn via agent.stream(...) and pipes its fullStream into the outer stream.
  2. Subscribes to background-task completion events for the resolved memory scope.
  3. Queues each terminal event (background-task-completed, background-task-failed, background-task-cancelled) and, when the outer wrapper is idle between turns, re-invokes agent.stream([], ...) with a directive listing the just-completed toolCallIds. The continuation turn flows into the same outer stream.
  4. Closes the outer stream once no tasks are running and no completions are queued.

Extended usage example
Direct link to Extended usage example

Cap idle time between turns
Direct link to Cap idle time between turns

index.ts
const stream = await agent.streamUntilIdle('Kick off the long jobs', {
memory: { thread: 't1', resource: 'u1' },
maxIdleMs: 60_000, // close the stream after 1 minute of idleness between turns
})

for await (const chunk of stream.fullStream) {
if (chunk.type === 'background-task-completed') {
console.log('Task complete:', chunk.payload.taskId)
}
}

Aggregate text across continuations
Direct link to Aggregate text across continuations

index.ts
const stream = await agent.streamUntilIdle('Research and summarize', {
memory: { thread: 't1', resource: 'u1' },
})

let fullText = ''
for await (const chunk of stream.fullStream) {
if (chunk.type === 'text-delta') {
fullText += chunk.payload.text
}
}