Agent.stream()

The .stream() method streams an agent's response in real time. It accepts messages and an optional options object, and returns output in Mastra's native format, which can also be converted for use with AI SDK v5.

Usage example

index.ts
const stream = await agent.stream("message for agent");

Model Compatibility: This method is designed for V2 models. V1 models should use the .streamLegacy() method. The framework automatically detects your model version and will throw an error if there's a mismatch.

Parameters

messages:

string | string[] | CoreMessage[] | AiMessageType[] | UIMessageWithMetadata[]
The messages to send to the agent. Can be a single string, array of strings, or structured message objects.
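
The accepted input shapes can be sketched as follows. `SimpleMessage` is a simplified, hypothetical stand-in for the structured message types (such as `CoreMessage`), reduced to `role` and `content`. The real types carry more fields, and `describeInput` exists only to show how the shapes differ at runtime.

```typescript
// Hypothetical, simplified stand-in for structured message types such as CoreMessage.
type SimpleMessage = {
  role: "system" | "user" | "assistant";
  content: string;
};

type MessageInput = string | string[] | SimpleMessage[];

// A single string.
const single: MessageInput = "Summarize the release notes";

// An array of strings.
const several: MessageInput = [
  "Here are the release notes.",
  "Summarize them in three bullet points.",
];

// Structured message objects with explicit roles.
const structured: MessageInput = [
  { role: "user", content: "What changed in the last release?" },
  { role: "assistant", content: "Mostly bug fixes." },
  { role: "user", content: "List the three most important ones." },
];

// Illustrative helper (not part of Mastra): classify an input by its shape.
function describeInput(input: MessageInput): "string" | "string[]" | "messages" {
  if (typeof input === "string") return "string";
  const items: Array<string | SimpleMessage> = input;
  return items.every((item) => typeof item === "string") ? "string[]" : "messages";
}

// Any of the three values could be passed as the first argument, for example:
// const stream = await agent.stream(structured);
```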

options?:

AgentExecutionOptions<Output, Format>
Optional configuration for the streaming process.

Options

maxSteps?:

number
Maximum number of steps to run during execution.

scorers?:

MastraScorers | Record<string, { scorer: MastraScorer['name']; sampling?: ScoringSamplingConfig }>
Evaluation scorers to run on the execution results.

scorer:

string
Name of the scorer to use.

sampling?:

ScoringSamplingConfig
Sampling configuration for the scorer.

tracingContext?:

TracingContext
Tracing context for span hierarchy and metadata.

returnScorerData?:

boolean
Whether to return detailed scoring data in the response.

onChunk?:

(chunk: ChunkType) => Promise<void> | void
Callback function called for each chunk during streaming.

onError?:

({ error }: { error: Error | string }) => Promise<void> | void
Callback function called when an error occurs during streaming.

onAbort?:

(event: any) => Promise<void> | void
Callback function called when the stream is aborted.

abortSignal?:

AbortSignal
Signal object that allows you to abort the agent's execution. When the signal is aborted, all ongoing operations will be terminated.
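
A minimal sketch of how an abort signal might be created, assuming the standard `AbortController` API available in Node.js and browsers. `createAbortHandle` is a hypothetical helper, not part of Mastra. It aborts automatically after a timeout and also exposes a manual `cancel`.

```typescript
// Hypothetical helper: an AbortSignal that fires after `timeoutMs`,
// plus a function to abort earlier by hand.
function createAbortHandle(timeoutMs: number): {
  signal: AbortSignal;
  cancel: () => void;
} {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  return {
    signal: controller.signal,
    cancel: () => {
      clearTimeout(timer); // avoid a dangling timer after a manual cancel
      controller.abort();
    },
  };
}

// Usage sketch (assumes an existing `agent`):
// const { signal, cancel } = createAbortHandle(10_000);
// const stream = await agent.stream("Tell me a story", {
//   abortSignal: signal,
//   onAbort: (event) => console.log("Stream aborted:", event),
// });
```

If the stream finishes normally, calling `cancel()` afterwards is one simple way to clear the pending timer.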

activeTools?:

Array<keyof ToolSet> | undefined
Array of active tool names that can be used during execution.

prepareStep?:

PrepareStepFunction<any>
Callback function called before each step of multi-step execution.

context?:

ModelMessage[]
Additional context messages to provide to the agent.

structuredOutput?:

StructuredOutputOptions<S extends ZodTypeAny = ZodTypeAny>
Options to fine-tune structured output generation.

schema:

z.ZodSchema<S>
Zod schema defining the expected output structure.

model?:

MastraLanguageModel
Language model to use for structured output generation. If provided, the agent can respond in multiple steps with tool calls, text, and structured output.

errorStrategy?:

'strict' | 'warn' | 'fallback'
Strategy for handling schema validation errors. 'strict' throws errors, 'warn' logs warnings, 'fallback' uses fallback values.

fallbackValue?:

<S extends ZodTypeAny>
Fallback value to use when schema validation fails and errorStrategy is 'fallback'.

instructions?:

string
Additional instructions for the structured output model.

jsonPromptInjection?:

boolean
Injects a system prompt into the main agent instructing it to return structured output. Useful when a model does not natively support structured outputs.

providerOptions?:

ProviderOptions
Provider-specific options passed to the internal structuring agent. Use this to control model behavior like reasoning effort for thinking models (e.g., `{ openai: { reasoningEffort: 'low' } }`).
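
The three `errorStrategy` values can be illustrated with a small standalone function. This is a sketch of the semantics described above, not Mastra's implementation. The `Validation` type and `resolveOutput` helper are hypothetical, and returning `undefined` after a warning is an assumption.

```typescript
type ErrorStrategy = "strict" | "warn" | "fallback";

// Hypothetical result of validating model output against a schema.
type Validation<T> =
  | { kind: "valid"; value: T }
  | { kind: "invalid"; message: string };

// Illustrates the documented strategies; not Mastra's actual code.
function resolveOutput<T>(
  validation: Validation<T>,
  strategy: ErrorStrategy,
  fallbackValue?: T,
  warn: (message: string) => void = console.warn,
): T | undefined {
  if (validation.kind === "valid") return validation.value;
  switch (strategy) {
    case "strict":
      // 'strict' throws on a validation failure.
      throw new Error(`Schema validation failed: ${validation.message}`);
    case "warn":
      // 'warn' logs the problem; what is returned here is an assumption.
      warn(`Schema validation failed: ${validation.message}`);
      return undefined;
    case "fallback":
      // 'fallback' substitutes the configured fallbackValue.
      return fallbackValue;
  }
}
```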

outputProcessors?:

Processor[]
Overrides the output processors set on the agent. Output processors can modify or validate messages from the agent before they are returned to the user. Each must implement `processOutputResult`, `processOutputStream`, or both.

includeRawChunks?:

boolean
Whether to include raw chunks in the stream output (not available on all model providers).

inputProcessors?:

Processor[]
Overrides the input processors set on the agent. Input processors can modify or validate messages before the agent processes them. Each must implement the `processInput` function.

instructions?:

string
Custom instructions that override the agent's default instructions for this specific generation. Useful for dynamically modifying agent behavior without creating a new agent instance.

system?:

string | string[] | CoreSystemMessage | SystemModelMessage | CoreSystemMessage[] | SystemModelMessage[]
Custom system message(s) to include in the prompt. Can be a single string, message object, or array of either. System messages provide additional context or behavior instructions that supplement the agent's main instructions.

output?:

Zod schema | JsonSchema7
**Deprecated.** Use structuredOutput without a model to achieve the same thing. Defines the expected structure of the output. Can be a JSON Schema object or a Zod schema.

memory?:

object
Configuration for memory. This is the preferred way to manage memory.

thread:

string | { id: string; metadata?: Record<string, any>, title?: string }
The conversation thread, as a string ID or an object with an `id` and optional `metadata` and `title`.

resource:

string
Identifier for the user or resource associated with the thread.

options?:

MemoryConfig
Configuration for memory behavior, like message history and semantic recall.
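
The `memory` option shape can be sketched as plain data. The types below mirror the fields listed above (with `metadata` typed slightly more strictly, and the optional `options` field omitted). `memoryFor` and its ID scheme are hypothetical, shown only to contrast the string and object forms of `thread`.

```typescript
// Mirrors the documented shape of `thread`: a string ID or an object.
type ThreadRef =
  | string
  | { id: string; metadata?: Record<string, unknown>; title?: string };

type MemoryOption = { thread: ThreadRef; resource: string };

// Hypothetical helper: one thread per user session, with the user as the resource.
function memoryFor(userId: string, sessionId: string, title?: string): MemoryOption {
  const id = `${userId}:${sessionId}`;
  return {
    // Use the object form only when there is extra information to attach.
    thread: title === undefined ? id : { id, title, metadata: { sessionId } },
    resource: userId,
  };
}

// Usage sketch (assumes an existing `agent`):
// const stream = await agent.stream("Hello again", {
//   memory: memoryFor("user-123", "session-1", "Support chat"),
// });
```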

onFinish?:

StreamTextOnFinishCallback<any> | StreamObjectOnFinishCallback<OUTPUT>
Callback function called when streaming completes. Receives the final result.

onStepFinish?:

StreamTextOnStepFinishCallback<any> | never
Callback function called after each execution step. Receives step details as a JSON string. Unavailable for structured output.

resourceId?:

string
**Deprecated.** Use `memory.resource` instead. Identifier for the user or resource interacting with the agent. Must be provided if threadId is provided.

telemetry?:

TelemetrySettings
Settings for OTLP telemetry collection during streaming (not Tracing).

isEnabled?:

boolean
Enable or disable telemetry. Disabled by default while experimental.

recordInputs?:

boolean
Enable or disable input recording. Enabled by default. You might want to disable input recording to avoid recording sensitive information.

recordOutputs?:

boolean
Enable or disable output recording. Enabled by default. You might want to disable output recording to avoid recording sensitive information.

functionId?:

string
Identifier for this function. Used to group telemetry data by function.

modelSettings?:

CallSettings
Model-specific settings like temperature, maxOutputTokens, topP, etc. These settings control how the language model generates responses.

temperature?:

number
Controls randomness in generation (0-2). Higher values make output more random.

maxOutputTokens?:

number
Maximum number of tokens to generate in the response. Note: Use maxOutputTokens (not maxTokens) as per AI SDK v5 convention.

maxRetries?:

number
Maximum number of retry attempts for failed requests.

topP?:

number
Nucleus sampling parameter (0-1). Controls diversity of generated text.

topK?:

number
Top-k sampling parameter. Limits vocabulary to k most likely tokens.

presencePenalty?:

number
Penalty for token presence (-2 to 2). Reduces repetition.

frequencyPenalty?:

number
Penalty for token frequency (-2 to 2). Reduces repetition of frequent tokens.

stopSequences?:

string[]
Stop sequences. If set, the model will stop generating text when one of the stop sequences is generated.
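
The numeric ranges listed above can be checked before a call. The following is a minimal sketch. `checkModelSettings` is a hypothetical helper, not a Mastra API, and it only covers the four settings whose ranges are stated in this section.

```typescript
// Ranges as documented above: [min, max], inclusive.
const RANGES = {
  temperature: [0, 2],
  topP: [0, 1],
  presencePenalty: [-2, 2],
  frequencyPenalty: [-2, 2],
} as const;

type RangedKey = keyof typeof RANGES;
type RangedSettings = Partial<Record<RangedKey, number>>;

// Hypothetical helper: return a list of human-readable problems, empty when all is well.
function checkModelSettings(settings: RangedSettings): string[] {
  const problems: string[] = [];
  for (const key of Object.keys(RANGES) as RangedKey[]) {
    const value = settings[key];
    if (value === undefined) continue; // unset settings are fine
    const [min, max] = RANGES[key];
    if (Number.isNaN(value) || value < min || value > max) {
      problems.push(`${key} must be between ${min} and ${max}, got ${value}`);
    }
  }
  return problems;
}

// Usage sketch (assumes an existing `agent`):
// const modelSettings = { temperature: 0.7, topP: 0.9 };
// if (checkModelSettings(modelSettings).length === 0) {
//   await agent.stream("Tell me a story", { modelSettings });
// }
```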

threadId?:

string
**Deprecated.** Use `memory.thread` instead. Identifier for the conversation thread. Allows for maintaining context across multiple interactions. Must be provided if resourceId is provided.

toolChoice?:

'auto' | 'none' | 'required' | { type: 'tool'; toolName: string }
Default: 'auto'
Controls how the agent uses tools during streaming.

'auto':

string
Let the model decide whether to use tools (default).

'none':

string
Do not use any tools.

'required':

string
Require the model to use at least one tool.

{ type: 'tool'; toolName: string }:

object
Require the model to use a specific tool by name.
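
The four `toolChoice` forms can be written as a union type. The selection logic in `pickToolChoice` is a hypothetical policy for illustration, not behavior built into Mastra.

```typescript
// The four documented forms of toolChoice.
type ToolChoice =
  | "auto"
  | "none"
  | "required"
  | { type: "tool"; toolName: string };

// Hypothetical policy: force a specific tool when it is requested and registered,
// disable tools when none are registered, otherwise let the model decide.
function pickToolChoice(requested: string | undefined, available: string[]): ToolChoice {
  if (available.length === 0) return "none";
  if (requested !== undefined && available.includes(requested)) {
    return { type: "tool", toolName: requested };
  }
  return "auto";
}

// Usage sketch (assumes an existing `agent` with a tool named "search"):
// await agent.stream("Find the latest release", {
//   toolChoice: pickToolChoice("search", ["search", "fetch"]),
// });
```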

toolsets?:

ToolsetsInput
Additional toolsets to make available to the agent during streaming.

clientTools?:

ToolsInput
Tools that are executed on the 'client' side of the request. These tools do not have execute functions in the definition.

savePerStep?:

boolean
Save messages incrementally after each stream step completes (default: false).

providerOptions?:

Record<string, Record<string, JSONValue>>
Additional provider-specific options that are passed through to the underlying LLM provider. The structure is `{ providerName: { optionKey: value } }`. For example: `{ openai: { reasoningEffort: 'high' }, anthropic: { maxTokens: 1000 } }`.

openai?:

Record<string, JSONValue>
OpenAI-specific options. Example: `{ reasoningEffort: 'high' }`

anthropic?:

Record<string, JSONValue>
Anthropic-specific options. Example: `{ maxTokens: 1000 }`

google?:

Record<string, JSONValue>
Google-specific options. Example: `{ safetySettings: [...] }`

[providerName]?:

Record<string, JSONValue>
Other provider-specific options. The key is the provider name and the value is a record of provider-specific options.
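
The `{ providerName: { optionKey: value } }` structure lends itself to layering per-call overrides on shared defaults. A minimal sketch follows. `mergeProviderOptions` is a hypothetical helper, and the `JSONValue` type is declared locally rather than imported.

```typescript
// Local declaration of a JSON-compatible value type.
type JSONValue =
  | string
  | number
  | boolean
  | null
  | JSONValue[]
  | { [key: string]: JSONValue };

type ProviderOptions = Record<string, Record<string, JSONValue>>;

// Hypothetical helper: merge overrides into defaults, provider by provider,
// without mutating either input.
function mergeProviderOptions(
  base: ProviderOptions,
  overrides: ProviderOptions,
): ProviderOptions {
  const merged: ProviderOptions = { ...base };
  for (const [provider, options] of Object.entries(overrides)) {
    merged[provider] = { ...merged[provider], ...options };
  }
  return merged;
}

// Usage sketch (assumes an existing `agent`):
// const defaults = { openai: { reasoningEffort: "low" } };
// await agent.stream("Solve this carefully", {
//   providerOptions: mergeProviderOptions(defaults, {
//     openai: { reasoningEffort: "high" },
//   }),
// });
```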

runId?:

string
Unique ID for this generation run. Useful for tracking and debugging purposes.

requestContext?:

RequestContext
Request Context for dependency injection and contextual information.

tracingContext?:

TracingContext
Tracing context for creating child spans and adding metadata. Automatically injected when using Mastra's tracing system.

currentSpan?:

Span
Current span for creating child spans and adding metadata. Use this to create custom child spans or update span attributes during execution.

tracingOptions?:

TracingOptions
Options for Tracing configuration.

metadata?:

Record<string, any>
Metadata to add to the root trace span. Useful for adding custom attributes like user IDs, session IDs, or feature flags.

requestContextKeys?:

string[]
Additional RequestContext keys to extract as metadata for this trace. Supports dot notation for nested values (e.g., 'user.id').

traceId?:

string
Trace ID to use for this execution (1-32 hexadecimal characters). If provided, this trace will be part of the specified trace.

parentSpanId?:

string
Parent span ID to use for this execution (1-16 hexadecimal characters). If provided, the root span will be created as a child of this span.

tags?:

string[]
Tags to apply to this trace. String labels for categorizing and filtering traces.
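
When continuing a trace started elsewhere, the ID formats described above (1-32 hexadecimal characters for `traceId`, 1-16 for `parentSpanId`) can be checked first. The helper below is a hypothetical sketch, and its `TracingOptions` type covers only a subset of the documented fields.

```typescript
// Subset of the tracingOptions fields documented above.
type TracingOptions = {
  metadata?: Record<string, unknown>;
  traceId?: string;
  parentSpanId?: string;
  tags?: string[];
};

const TRACE_ID = /^[0-9a-f]{1,32}$/i; // 1-32 hexadecimal characters
const SPAN_ID = /^[0-9a-f]{1,16}$/i; // 1-16 hexadecimal characters

// Hypothetical helper: attach upstream IDs only when they are well formed.
function tracingFromUpstream(
  traceId: string | undefined,
  parentSpanId: string | undefined,
  tags: string[] = [],
): TracingOptions {
  const options: TracingOptions = { tags };
  if (traceId !== undefined && TRACE_ID.test(traceId)) {
    options.traceId = traceId;
    // A parent span only makes sense inside a known trace.
    if (parentSpanId !== undefined && SPAN_ID.test(parentSpanId)) {
      options.parentSpanId = parentSpanId;
    }
  }
  return options;
}

// Usage sketch (assumes an existing `agent` and IDs read from request headers):
// await agent.stream("Hello", {
//   tracingOptions: tracingFromUpstream(incomingTraceId, incomingSpanId, ["chat"]),
// });
```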

Returns

stream:

MastraModelOutput<Output>
Returns a MastraModelOutput instance that provides access to the streaming output.

traceId?:

string
The trace ID associated with this execution when Tracing is enabled. Use this to correlate logs and debug execution flow.

Extended usage example

Mastra Format (Default)

index.ts
import { stepCountIs } from "ai";

const stream = await agent.stream("Tell me a story", {
  stopWhen: stepCountIs(3), // Stop after 3 steps
  modelSettings: {
    temperature: 0.7,
  },
});

// Access the text stream
for await (const chunk of stream.textStream) {
  console.log(chunk);
}

// Or access the full stream
for await (const chunk of stream.fullStream) {
  console.log(chunk);
}

// Get the full text after streaming
const fullText = await stream.text;
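
To display chunks as they arrive while also keeping the accumulated text, the `for await` loop can be wrapped in a small helper. This is a sketch. `collectText` and `fakeTextStream` are hypothetical names, and the generator stands in for `stream.textStream` so the example runs without an agent.

```typescript
// Accumulate chunks from any async iterable of strings,
// optionally reporting each chunk as it arrives.
async function collectText(
  chunks: AsyncIterable<string>,
  onChunk?: (chunk: string) => void,
): Promise<string> {
  let text = "";
  for await (const chunk of chunks) {
    if (onChunk !== undefined) onChunk(chunk);
    text += chunk;
  }
  return text;
}

// Stand-in for stream.textStream (an assumption made to keep this runnable).
async function* fakeTextStream(): AsyncGenerator<string> {
  yield "Once ";
  yield "upon ";
  yield "a time";
}

// With a real agent, the call might look like:
// const text = await collectText(stream.textStream, (chunk) => console.log(chunk));
```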

AI SDK v5 Format

To use the stream with AI SDK v5, convert it with the toAISdkStream utility function from @mastra/ai-sdk.

index.ts
import { stepCountIs, createUIMessageStreamResponse } from "ai";
import { toAISdkStream } from "@mastra/ai-sdk";

const stream = await agent.stream("Tell me a story", {
  stopWhen: stepCountIs(3), // Stop after 3 steps
  modelSettings: {
    temperature: 0.7,
  },
});

// In an API route for frontend integration
return createUIMessageStreamResponse({
  stream: toAISdkStream(stream, { from: "agent" }),
});

Using Callbacks

All callback functions are available as top-level properties of the options object.

index.ts
const stream = await agent.stream("Tell me a story", {
  onFinish: (result) => {
    console.log("Streaming finished:", result);
  },
  onStepFinish: (step) => {
    console.log("Step completed:", step);
  },
  onChunk: (chunk) => {
    console.log("Received chunk:", chunk);
  },
  onError: ({ error }) => {
    console.error("Streaming error:", error);
  },
  onAbort: (event) => {
    console.log("Stream aborted:", event);
  },
});

// Process the stream
for await (const chunk of stream.textStream) {
  console.log(chunk);
}

Advanced Example with Options

index.ts
import { z } from "zod";
import { stepCountIs } from "ai";
// Processor classes used below; the import path is assumed to be the core processors entry point
import { ModerationProcessor, BatchPartsProcessor } from "@mastra/core/processors";

await agent.stream("message for agent", {
  stopWhen: stepCountIs(3), // Stop after 3 steps
  modelSettings: {
    temperature: 0.7,
  },
  memory: {
    thread: "user-123",
    resource: "test-app",
  },
  toolChoice: "auto",
  // Structured output validated against a Zod schema
  structuredOutput: {
    schema: z.object({
      sentiment: z.enum(["positive", "negative", "neutral"]),
      confidence: z.number(),
    }),
    model: "openai/gpt-5.1",
    errorStrategy: "warn",
  },
  // Output processors for streaming response validation
  outputProcessors: [
    new ModerationProcessor({ model: "openrouter/openai/gpt-oss-safeguard-20b" }),
    new BatchPartsProcessor({ maxBatchSize: 3, maxWaitTime: 100 }),
  ],
});