Agent.stream()
The .stream() method enables real-time streaming of responses from an agent with enhanced capabilities and format flexibility. This method accepts messages and optional streaming options, providing a next-generation streaming experience with support for both Mastra's native format and AI SDK v5+ compatibility.
Usage example
const stream = await agent.stream('message for agent')
Model Compatibility: This method is designed for V2 models. V1 models should use the .streamLegacy() method. The framework automatically detects your model version and will throw an error if there's a mismatch.
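For an agent configured with a V1 model, the call looks the same apart from the method name. A minimal sketch, assuming a `legacyAgent` built on a V1 model:

```typescript
// Hypothetical agent backed by a V1 model; calling .stream() on it would throw.
const legacyStream = await legacyAgent.streamLegacy('message for agent')

for await (const chunk of legacyStream.textStream) {
  console.log(chunk)
}
```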
Parameters
messages:
options?:
Options
maxSteps?:
scorers?:
scorer:
sampling?:
type:
rate?:
onIterationComplete?:
context.iteration:
context.maxIterations:
context.text:
context.isFinal:
context.finishReason:
context.toolCalls:
context.messages:
return.continue?:
return.feedback?:
isTaskComplete?:
scorers:
strategy?:
onComplete?:
parallel?:
timeout?:
delegation?:
onDelegationStart?:
onDelegationComplete?:
messageFilter?:
tracingContext?:
returnScorerData?:
onChunk?:
onError?:
onAbort?:
abortSignal?:
activeTools?:
prepareStep?:
context?:
structuredOutput?:
schema:
model?:
errorStrategy?:
fallbackValue?:
instructions?:
jsonPromptInjection?:
providerOptions?:
outputProcessors?:
includeRawChunks?:
inputProcessors?:
instructions?:
system?:
output?:
memory?:
thread:
resource:
options?:
onFinish?:
onStepFinish?:
telemetry?:
isEnabled?:
recordInputs?:
recordOutputs?:
functionId?:
modelSettings?:
temperature?:
maxOutputTokens?:
maxRetries?:
topP?:
topK?:
presencePenalty?:
frequencyPenalty?:
stopSequences?:
toolChoice?:
'auto':
'none':
'required':
{ type: 'tool'; toolName: string }:
toolsets?:
clientTools?:
savePerStep?:
requireToolApproval?:
autoResumeSuspendedTools?:
toolCallConcurrency?:
providerOptions?:
openai?:
anthropic?:
google?:
[providerName]?:
runId?:
requestContext?:
tracingContext?:
currentSpan?:
tracingOptions?:
metadata?:
requestContextKeys?:
traceId?:
parentSpanId?:
tags?:
Returns
stream:
traceId?:
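A sketch of consuming the return value, assuming `traceId` is exposed on the returned stream as the Returns list suggests (it is only populated when tracing is enabled):

```typescript
const stream = await agent.stream('Tell me a story')

// Stream text as it arrives
for await (const chunk of stream.textStream) {
  process.stdout.write(chunk)
}

// Present when tracing is configured; undefined otherwise
console.log('trace:', stream.traceId)
```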
Extended usage example
Mastra Format (Default)
import { stepCountIs } from 'ai'
const stream = await agent.stream('Tell me a story', {
stopWhen: stepCountIs(3), // Stop after 3 steps
modelSettings: {
temperature: 0.7,
},
})
// Access text stream
for await (const chunk of stream.textStream) {
console.log(chunk)
}
// or access full stream
for await (const chunk of stream.fullStream) {
console.log(chunk)
}
// Get full text after streaming
const fullText = await stream.text
AI SDK v5+ Format
To use the stream with AI SDK v5 (and later), convert it with the toAISdkStream utility from @mastra/ai-sdk.
import { stepCountIs, createUIMessageStreamResponse } from 'ai'
import { toAISdkStream } from '@mastra/ai-sdk'
const stream = await agent.stream('Tell me a story', {
stopWhen: stepCountIs(3), // Stop after 3 steps
modelSettings: {
temperature: 0.7,
},
})
// In an API route for frontend integration
return createUIMessageStreamResponse({
stream: toAISdkStream(stream, { from: 'agent' }),
})
Using Callbacks
All callback functions are available as top-level properties of the options object.
const stream = await agent.stream('Tell me a story', {
onFinish: result => {
console.log('Streaming finished:', result)
},
onStepFinish: step => {
console.log('Step completed:', step)
},
onChunk: chunk => {
console.log('Received chunk:', chunk)
},
onError: ({ error }) => {
console.error('Streaming error:', error)
},
onAbort: event => {
console.log('Stream aborted:', event)
},
})
// Process the stream
for await (const chunk of stream.textStream) {
console.log(chunk)
}
Advanced Example with Options
import { z } from 'zod'
import { stepCountIs } from 'ai'
import { ModerationProcessor, BatchPartsProcessor } from '@mastra/core/processors'
await agent.stream('message for agent', {
stopWhen: stepCountIs(3), // Stop after 3 steps
modelSettings: {
temperature: 0.7,
},
memory: {
thread: 'user-123',
resource: 'test-app',
},
toolChoice: 'auto',
// Structured output with better DX
structuredOutput: {
schema: z.object({
sentiment: z.enum(['positive', 'negative', 'neutral']),
confidence: z.number(),
}),
model: 'openai/gpt-5.1',
errorStrategy: 'warn',
},
// Output processors for streaming response validation
outputProcessors: [
new ModerationProcessor({ model: 'openrouter/openai/gpt-oss-safeguard-20b' }),
new BatchPartsProcessor({ maxBatchSize: 3, maxWaitTime: 100 }),
],
})
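Beyond 'auto', 'none', and 'required', toolChoice can force a specific tool via the object form listed in the parameters above. A sketch, where 'weatherTool' is a hypothetical tool id registered on the agent:

```typescript
const stream = await agent.stream("What's the weather in Berlin?", {
  // Force the model to call this specific tool ('weatherTool' is hypothetical)
  toolChoice: { type: 'tool', toolName: 'weatherTool' },
})
```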
OpenAI WebSocket Transport
Opt into OpenAI Responses WebSocket streaming via providerOptions.openai.transport. This only applies to streaming calls and is currently supported for direct OpenAI models (for example, openai/gpt-4o). If WebSocket streaming is unavailable, Mastra falls back to HTTP streaming. By default, Mastra closes the WebSocket when the stream finishes.
const stream = await agent.stream('Hello', {
providerOptions: {
openai: {
transport: 'websocket', // 'websocket' | 'fetch' | 'auto'
websocket: {
url: 'wss://api.openai.com/v1/responses',
closeOnFinish: true, // default
},
},
},
})
To keep the connection open after the stream finishes, set closeOnFinish: false and close it manually.
const stream = await agent.stream('Hello', {
providerOptions: {
openai: {
transport: 'websocket',
websocket: { closeOnFinish: false },
},
},
})
// Later, when you're done with the connection:
stream.transport?.close()
Related
- Generating responses
- Streaming responses
- Agent Approval
- Agent Networks - Using the supervisor pattern for multi-agent coordination
- Migration: .network() to Supervisor Pattern
- Guide: Research Coordinator