BatchPartsProcessor

The BatchPartsProcessor is an output processor that batches multiple stream parts together so that fewer, larger parts are emitted during streaming. It is useful for reducing network overhead, consolidating small text chunks for a smoother user experience, and controlling when parts are emitted to the client.

Usage example

import { BatchPartsProcessor } from "@mastra/core/processors";

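const processor = new BatchPartsProcessor({
  batchSize: 5, // emit once 5 parts have been batched
  maxWaitTime: 100, // or after 100 ms, even if the batch is not full
  emitOnNonText: true, // emit immediately when a non-text part arrives
});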

Constructor parameters

options?:

Options
Configuration options for batching stream parts

Options

batchSize?:

number
Number of parts to batch together before emitting

maxWaitTime?:

number
Maximum time to wait before emitting a batch (in milliseconds). If set, the current batch is emitted once this time has elapsed, even if it hasn't reached batchSize

emitOnNonText?:

boolean
Whether to emit immediately when a non-text part is encountered
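
To make the interaction between batchSize and emitOnNonText concrete, here is a minimal, self-contained sketch of the batching idea. It does not use or reproduce Mastra's implementation: the Part type, the SketchOptions interface, and the BatchSketch class are hypothetical names invented for illustration, and maxWaitTime is left out to keep the example synchronous.

```typescript
// Hypothetical, simplified part shape; Mastra's real ChunkType is richer.
type Part =
  | { type: "text-delta"; text: string }
  | { type: "tool-call"; toolName: string };

// Hypothetical options, mirroring two of the documented option names.
interface SketchOptions {
  batchSize: number;
  emitOnNonText: boolean;
}

// Illustrative only: a stand-in for the batching idea, not Mastra's code.
class BatchSketch {
  private buffer: Part[] = [];
  private readonly options: SketchOptions;

  constructor(options: SketchOptions) {
    this.options = options;
  }

  // Returns the parts released by this call; an empty array means "still buffering".
  push(part: Part): Part[] {
    if (part.type !== "text-delta" && this.options.emitOnNonText) {
      // Release buffered parts first so ordering is preserved, then the non-text part.
      return [...this.flush(), part];
    }
    this.buffer.push(part);
    return this.buffer.length >= this.options.batchSize ? this.flush() : [];
  }

  // Empties the buffer, merging adjacent text deltas into a single part.
  flush(): Part[] {
    const merged: Part[] = [];
    for (const part of this.buffer) {
      const last = merged[merged.length - 1];
      if (part.type === "text-delta" && last !== undefined && last.type === "text-delta") {
        merged[merged.length - 1] = { type: "text-delta", text: last.text + part.text };
      } else {
        merged.push(part);
      }
    }
    this.buffer = [];
    return merged;
  }
}

const batcher = new BatchSketch({ batchSize: 3, emitOnNonText: true });
const released: Part[][] = [
  batcher.push({ type: "text-delta", text: "a" }),
  batcher.push({ type: "text-delta", text: "b" }),
  batcher.push({ type: "text-delta", text: "c" }), // third part fills the batch
  batcher.push({ type: "text-delta", text: "d" }),
  batcher.push({ type: "tool-call", toolName: "lookup" }), // non-text part forces a release
];
```

In this sketch the first two calls release nothing, the third releases one merged text part ("abc"), and the tool call releases the pending "d" followed by the tool call itself. How the real processor merges parts and orders them around non-text parts may differ.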

Returns

name:

string
Processor name set to 'batch-parts'

processOutputStream:

(args: { part: ChunkType; streamParts: ChunkType[]; state: Record<string, any>; abort: (reason?: string) => never }) => Promise<ChunkType | null>
Processes streaming output parts to batch them together

flush:

(state?: BatchPartsState) => ChunkType | null
Force flush any remaining batched parts when the stream ends
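
The contract described above (return null while a batch is still filling, return a part when it is emitted, and flush whatever remains at the end of the stream) can be sketched as follows. This is an illustration under stated assumptions, not the library's code: TextPart, BatchState, step, and the injected now timestamp are all hypothetical, and this sketch only checks maxWaitTime when a new part arrives rather than using a timer.

```typescript
// Hypothetical minimal shapes; Mastra's ChunkType and BatchPartsState are richer.
type TextPart = { type: "text-delta"; text: string };

interface BatchState {
  batch: TextPart[];
  startedAt: number | null; // timestamp of the first part in the current batch
}

// Combines and clears the current batch; returns null when nothing is buffered.
function flush(state: BatchState): TextPart | null {
  if (state.batch.length === 0) return null;
  const text = state.batch.map((p) => p.text).join("");
  state.batch = [];
  state.startedAt = null;
  return { type: "text-delta", text };
}

// Stand-in for a processOutputStream-style step: null means "still buffering".
function step(
  part: TextPart,
  state: BatchState,
  opts: { batchSize: number; maxWaitTime: number },
  now: number, // injected clock (ms) so the example is deterministic
): TextPart | null {
  if (state.batch.length === 0) state.startedAt = now;
  state.batch.push(part);
  const full = state.batch.length >= opts.batchSize;
  const waitedTooLong =
    state.startedAt !== null && now - state.startedAt >= opts.maxWaitTime;
  return full || waitedTooLong ? flush(state) : null;
}

// Drive the sketch with made-up arrival times (in milliseconds).
const state: BatchState = { batch: [], startedAt: null };
const opts = { batchSize: 5, maxWaitTime: 100 };
const inputs: Array<[string, number]> = [
  ["Hel", 0],
  ["lo", 40],
  [", ", 120], // 120 ms since the batch started, so the batch is emitted early
  ["wor", 130],
  ["ld", 150],
];

const emitted: string[] = [];
for (const [text, at] of inputs) {
  const out = step({ type: "text-delta", text }, state, opts, at);
  if (out !== null) emitted.push(out.text);
}

// End of stream: flush whatever is still buffered.
const tail = flush(state);
if (tail !== null) emitted.push(tail.text);
```

With these inputs, emitted ends up as ["Hello, ", "world"]: the first batch goes out because maxWaitTime has elapsed before batchSize is reached, and the second only appears because of the final flush. Without that flush, the trailing "world" would stay in the buffer.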

Extended usage example

import { openai } from "@ai-sdk/openai";
import { Agent } from "@mastra/core/agent";
import { BatchPartsProcessor } from "@mastra/core/processors";

export const agent = new Agent({
  name: "batched-agent",
  instructions: "You are a helpful assistant",
  model: openai("gpt-4o-mini"),
  outputProcessors: [
    new BatchPartsProcessor({
      batchSize: 5,
      maxWaitTime: 100,
      emitOnNonText: true,
    }),
  ],
});