# BatchPartsProcessor
The `BatchPartsProcessor` is an output processor that batches multiple stream parts together to reduce the frequency of emissions during streaming. This processor is useful for reducing network overhead, improving user experience by consolidating small text chunks, and optimizing streaming performance by controlling when parts are emitted to the client.
## Usage example
```typescript
import { BatchPartsProcessor } from "@mastra/core/processors";

const processor = new BatchPartsProcessor({
  batchSize: 5,
  maxWaitTime: 100,
  emitOnNonText: true,
});
```
## Constructor parameters
| Parameter | Type | Description |
| --- | --- | --- |
| `options?` | `Options` | Configuration options for batching stream parts. |
### Options
| Option | Type | Description |
| --- | --- | --- |
| `batchSize?` | `number` | Number of parts to batch together before emitting. |
| `maxWaitTime?` | `number` | Maximum time to wait before emitting a batch, in milliseconds. If set, the current batch is emitted even if it hasn't reached `batchSize`. |
| `emitOnNonText?` | `boolean` | Whether to emit immediately when a non-text part is encountered. |
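Together, `batchSize` and `maxWaitTime` trade chunk frequency against latency: a larger batch size means fewer, larger emissions, while the timeout bounds how long a partial batch can sit in the buffer. A sketch of a latency-sensitive configuration (the values here are illustrative, not recommended defaults):

```typescript
import { BatchPartsProcessor } from "@mastra/core/processors";

// Emit once 10 parts accumulate, or after 50 ms, whichever comes first;
// non-text parts bypass the buffer and are emitted immediately.
const lowLatencyBatcher = new BatchPartsProcessor({
  batchSize: 10,
  maxWaitTime: 50,
  emitOnNonText: true,
});
```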
## Returns
| Property | Type | Description |
| --- | --- | --- |
| `name` | `string` | Processor name, set to `'batch-parts'`. |
| `processOutputStream` | `(args: { part: ChunkType; streamParts: ChunkType[]; state: Record<string, any>; abort: (reason?: string) => never }) => Promise<ChunkType \| null>` | Processes streaming output parts to batch them together. |
| `flush` | `(state?: BatchPartsState) => ChunkType \| null` | Force-flushes any remaining batched parts when the stream ends. |
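Judging by the signatures above, `processOutputStream` presumably resolves to `null` while a part is only being buffered and to a combined part once a batch is ready, with `flush` returning whatever remains at the end of the stream. A minimal sketch of driving the processor by hand under that assumption; the chunk literal below is hypothetical, since the actual `ChunkType` shape is defined by `@mastra/core` and not shown here:

```typescript
import { BatchPartsProcessor } from "@mastra/core/processors";

const processor = new BatchPartsProcessor({ batchSize: 3 });
const state: Record<string, any> = {};

// Hypothetical text chunk, for illustration only.
const part = { type: "text-delta", payload: { text: "Hello" } } as any;

const emitted = await processor.processOutputStream({
  part,
  streamParts: [part],
  state,
  abort: (reason?: string): never => {
    throw new Error(reason ?? "aborted");
  },
});
// `emitted` should be null here: one part is buffered, the batch size is 3.

// When the stream ends, flush any parts still sitting in the buffer.
const remaining = processor.flush(state);
```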
## Extended usage example
`src/mastra/agents/batched-agent.ts`

```typescript
import { openai } from "@ai-sdk/openai";
import { Agent } from "@mastra/core/agent";
import { BatchPartsProcessor } from "@mastra/core/processors";

export const agent = new Agent({
  name: "batched-agent",
  instructions: "You are a helpful assistant",
  model: openai("gpt-4o-mini"),
  outputProcessors: [
    new BatchPartsProcessor({
      batchSize: 5,
      maxWaitTime: 100,
      emitOnNonText: true,
    }),
  ],
});
```
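With this configuration, the agent's text parts are delivered in batches of five, or sooner once 100 ms have elapsed, while non-text parts (tool calls, for example) pass through immediately.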