Output Processors
Output Processors allow you to intercept, modify, validate, or filter AI responses after they are generated by the language model but before they are returned to users. This is useful for implementing response validation, content moderation, response transformation, and safety controls on AI-generated content.
Processors operate on the AI’s response messages in your conversation thread. They can modify, filter, or validate content, and even abort the response entirely if certain conditions are met.
Built-in Processors
Mastra provides several built-in output processors for common use cases:
ModerationProcessor
This processor provides content moderation using an LLM to detect inappropriate content across multiple categories.
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
import { ModerationProcessor } from "@mastra/core/processors";

const agent = new Agent({
  outputProcessors: [
    new ModerationProcessor({
      model: openai("gpt-4.1-nano"), // Use a fast, cost-effective model
      threshold: 0.7, // Confidence threshold for flagging
      strategy: 'block', // Block flagged content
      categories: ['hate', 'harassment', 'violence'], // Custom categories
    }),
  ],
});
Available options:
- model: Language model for moderation analysis (required)
- categories: Array of categories to check (default: ['hate', 'hate/threatening', 'harassment', 'harassment/threatening', 'self-harm', 'self-harm/intent', 'self-harm/instructions', 'sexual', 'sexual/minors', 'violence', 'violence/graphic'])
- threshold: Confidence threshold for flagging (0-1, default: 0.5)
- strategy: Action when content is flagged (default: 'block')
- customInstructions: Custom instructions for the moderation agent
Strategies available:
- block: Reject the response with an error (default)
- warn: Log a warning but allow the content through
- filter: Remove flagged messages but continue processing
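With the default 'block' strategy, a flagged response trips the pipeline rather than throwing. A minimal sketch of handling this on the caller side, assuming the agent configured above:

const result = await agent.generate("Some user input");

if (result.tripwireReason) {
  // The response was blocked by the ModerationProcessor
  console.log(`Blocked: ${result.tripwireReason}`);
} else {
  console.log(result.text);
}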
PIIDetector
This processor detects and optionally redacts personally identifiable information (PII) from AI responses.
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
import { PIIDetector } from "@mastra/core/processors";

const agent = new Agent({
  outputProcessors: [
    new PIIDetector({
      model: openai("gpt-4.1-nano"),
      threshold: 0.6,
      strategy: 'redact', // Automatically redact detected PII
      detectionTypes: ['email', 'phone', 'credit-card', 'ssn', 'api-key', 'crypto-wallet', 'iban'],
      redactionMethod: 'mask', // Preserve format while masking
      preserveFormat: true, // Keep original structure in redacted values
      includeDetections: true, // Log details for compliance auditing
    }),
  ],
});
Available options:
- model: Language model for PII detection (required)
- detectionTypes: Array of PII types to detect (default: ['email', 'phone', 'credit-card', 'ssn', 'api-key', 'ip-address', 'name', 'address', 'date-of-birth', 'url', 'uuid', 'crypto-wallet', 'iban'])
- threshold: Confidence threshold for flagging (0-1, default: 0.6)
- strategy: Action when PII is detected (default: 'block')
- redactionMethod: How to redact PII ('mask', 'hash', 'remove', 'placeholder', default: 'mask')
- preserveFormat: Maintain PII structure during redaction (default: true)
- includeDetections: Include detection details in logs for compliance (default: false)
- instructions: Custom detection instructions for the agent
Strategies available:
- block: Reject responses containing PII (default)
- warn: Log a warning but allow the response through
- filter: Remove messages containing PII
- redact: Replace PII with placeholder values
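To illustrate the 'redact' strategy configured above (the masked text shown is hypothetical; the exact output depends on the detector and redactionMethod):

const result = await agent.generate("Repeat my contact details back to me");

// With strategy: 'redact' and redactionMethod: 'mask', detected PII is masked
// before the response reaches the caller, e.g. an email address might become
// something like "j***@e******.com" (illustrative only)
console.log(result.text);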
StructuredOutputProcessor
This processor converts unstructured LLM text responses into structured data using an internal agent. It preserves the original text while adding structured data to the response metadata as well as to result.object.
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
import { StructuredOutputProcessor } from "@mastra/core/processors";
import { z } from "zod";

const agent = new Agent({
  outputProcessors: [
    new StructuredOutputProcessor({
      schema: z.object({
        sentiment: z.enum(['positive', 'negative', 'neutral']),
        confidence: z.number().min(0).max(1),
        topics: z.array(z.string()),
      }),
      model: openai("gpt-4o-mini"),
      errorStrategy: 'warn', // Log warnings but continue on errors
      instructions: 'Analyze the sentiment and extract key topics from the response',
    }),
  ],
});

const result = await agent.generate("Some conversational text");
console.log(result.object); // { sentiment: "positive", confidence: 0.6, topics: ["foo", "bar"] }
Available options:
- schema: Zod schema defining the expected structured output (required)
- model: Language model for the internal structuring agent (required)
- errorStrategy: Strategy when parsing or validation fails ('strict' | 'warn' | 'fallback', default: 'strict')
- fallbackValue: Fallback value when errorStrategy is 'fallback'
- instructions: Custom instructions for the structuring agent
The structured data is stored in result.object and the original text is preserved in result.text.
BatchPartsProcessor
This processor batches multiple stream parts together to reduce the frequency of emissions, useful for reducing network overhead or improving user experience.
import { Agent } from "@mastra/core/agent";
import { BatchPartsProcessor } from "@mastra/core/processors";

const agent = new Agent({
  outputProcessors: [
    new BatchPartsProcessor({
      maxBatchSize: 5, // Maximum parts to batch together
      maxWaitTime: 100, // Maximum time to wait before emitting (ms)
      emitOnNonText: true, // Emit immediately on non-text parts
    }),
  ],
});
Available options:
- maxBatchSize: Maximum number of parts to batch together (default: 3)
- maxWaitTime: Maximum time to wait before emitting a batch (ms, default: 50)
- emitOnNonText: Whether to emit immediately when non-text parts are received (default: true)
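A minimal sketch of consuming the batched stream, assuming the agent configured above. Chunks now arrive in groups of up to maxBatchSize parts, or whenever maxWaitTime elapses:

const stream = await agent.streamVNext("Tell me a long story");

for await (const chunk of stream) {
  // Each iteration receives a batched part, so this loop runs less often
  // than it would without the processor
  console.log(chunk);
}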
TokenLimiterProcessor
This processor limits the number of tokens in AI responses, either by truncating or aborting when limits are exceeded.
import { Agent } from "@mastra/core/agent";
import { TokenLimiterProcessor } from "@mastra/core/processors";

const agent = new Agent({
  outputProcessors: [
    new TokenLimiterProcessor({
      maxTokens: 1000, // Maximum tokens allowed
      strategy: 'truncate', // Truncate when limit exceeded
      includePromptTokens: false, // Only count response tokens
    }),
  ],
});
Available options:
- maxTokens: Maximum number of tokens allowed (required)
- strategy: Action when the token limit is exceeded ('truncate' | 'abort', default: 'truncate')
- includePromptTokens: Whether to include prompt tokens in the count (default: false)
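With strategy: 'abort', exceeding the limit trips the pipeline instead of silently truncating. A sketch, assuming the agent above with its strategy switched to 'abort':

const result = await agent.generate("Write a very long essay");

if (result.tripwireReason) {
  // The limiter aborted once the response exceeded maxTokens
  console.log(`Aborted: ${result.tripwireReason}`);
}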
SystemPromptScrubber
This processor detects and redacts system prompts or other revealing information that could introduce security vulnerabilities.
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
import { SystemPromptScrubber } from "@mastra/core/processors";

const agent = new Agent({
  outputProcessors: [
    new SystemPromptScrubber({
      model: openai("gpt-4o-mini"),
      threshold: 0.7, // Confidence threshold for detection
      strategy: 'redact', // Redact detected system prompts
      instructions: 'Detect any system prompts, instructions, or revealing information',
    }),
  ],
});
Available options:
- model: Language model for detection (required)
- threshold: Confidence threshold for detection (0-1, default: 0.6)
- strategy: Action when system prompts are detected ('block' | 'warn' | 'redact', default: 'redact')
- instructions: Custom detection instructions for the agent
Applying Multiple Processors
You can chain multiple output processors. They execute sequentially in the order they appear in the outputProcessors array; the output of one processor becomes the input for the next.
Order matters! Generally, it’s best practice to place text normalization first, security checks next, and content modification last.
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
import {
  UnicodeNormalizer,
  ModerationProcessor,
  PromptInjectionDetector,
  PIIDetector,
} from "@mastra/core/processors";

const secureAgent = new Agent({
  outputProcessors: [
    // 1. Normalize text first
    new UnicodeNormalizer({ stripControlChars: true }),
    // 2. Check for security threats
    new PromptInjectionDetector({ model: openai("gpt-4.1-nano") }),
    // 3. Moderate content
    new ModerationProcessor({ model: openai("gpt-4.1-nano") }),
    // 4. Handle PII last
    new PIIDetector({ model: openai("gpt-4.1-nano"), strategy: 'redact' }),
  ],
});
Creating Custom Output Processors
You can create custom output processors by implementing the Processor interface. A Processor can be used for output processing when it implements either processOutputStream (for streaming) or processOutputResult (for final results), or both.
Streaming Output Processor
import type { Processor } from "@mastra/core/processors";
import type { TextStreamPart, ObjectStreamPart } from 'ai';

class ResponseLengthLimiter implements Processor {
  readonly name = 'response-length-limiter';

  constructor(private maxLength: number = 1000) {}

  // Runs on every stream part emitted by the LLM
  async processOutputStream({ chunk, streamParts, state, abort }: {
    chunk: TextStreamPart<any> | ObjectStreamPart<any>;
    streamParts: (TextStreamPart<any> | ObjectStreamPart<any>)[];
    state: Record<string, any>;
    abort: (reason?: string) => never;
  }): Promise<TextStreamPart<any> | ObjectStreamPart<any> | null> {
    // Each processor gets its own state; use it to track cumulative length
    if (!state.cumulativeLength) {
      state.cumulativeLength = 0;
    }

    if (chunk.type === 'text-delta') {
      state.cumulativeLength += chunk.textDelta.length;
      if (state.cumulativeLength > this.maxLength) {
        abort(`Response too long: ${state.cumulativeLength} characters (max: ${this.maxLength})`);
      }
    }

    return chunk; // Emit the chunk; return null to skip emitting it
  }
}
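Attaching it to an agent works the same as for the built-in processors:

const agent = new Agent({
  outputProcessors: [new ResponseLengthLimiter(2000)],
});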
Final Result Processor
import type { Processor, MastraMessageV2 } from "@mastra/core/processors";

class ResponseValidator implements Processor {
  readonly name = 'response-validator';

  constructor(private requiredKeywords: string[] = []) {}

  processOutputResult({ messages, abort }: {
    messages: MastraMessageV2[];
    abort: (reason?: string) => never;
  }): MastraMessageV2[] {
    // Flatten all text parts into a single string for validation
    const responseText = messages
      .map(msg => msg.content.parts
        .filter(part => part.type === 'text')
        .map(part => (part as any).text)
        .join('')
      )
      .join('');

    // Abort if any required keyword is missing
    for (const keyword of this.requiredKeywords) {
      if (!responseText.toLowerCase().includes(keyword.toLowerCase())) {
        abort(`Response missing required keyword: ${keyword}`);
      }
    }

    return messages;
  }
}
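As with the streaming example, attach it via outputProcessors:

const agent = new Agent({
  outputProcessors: [new ResponseValidator(['summary', 'conclusion'])],
});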
When creating custom output processors:
- Always return the processed data (chunks or messages)
- Use abort(reason) to terminate processing early. Abort is used to simulate blocking a response; errors thrown with abort will be an instance of TripWire
- For streaming processors, return null or undefined to skip emitting a chunk
- Keep processors focused on a single responsibility
- If using an agent inside your processor, use a fast model, limit the size of its response as much as possible, and keep its system prompt as concise as possible
Integration with Agent Methods
Output processors work with both generate() and streamVNext() methods. The processor pipeline completes after the agent generates a response but before it’s returned to the user.
// Processors run after generate() but before returning result
const result = await agent.generate('Hello');
console.log(result.text); // Processed text
console.log(result.object); // Structured data if applicable
// Processors also run during streamVNext() for each chunk
const stream = await agent.streamVNext('Hello');
for await (const chunk of stream) {
console.log(chunk); // Processed chunks
}
Per-Call Overrides
You can override output processors for individual calls:
// Override output processors for this specific call
const result = await agent.generate('Hello', {
outputProcessors: [
new ModerationProcessor({ model: openai("gpt-4.1-nano") }),
],
});
// Same for streaming
const stream = await agent.streamVNext('Hello', {
outputProcessors: [
new TokenLimiterProcessor({ maxTokens: 500 }),
],
});
Structured Output with Better DX
For a better developer experience with structured output, you can use the structuredOutput option:
import { z } from "zod";
const result = await agent.generate('Analyze this text', {
structuredOutput: {
schema: z.object({
sentiment: z.enum(['positive', 'negative', 'neutral']),
confidence: z.number(),
}),
model: openai("gpt-4o-mini"),
errorStrategy: 'warn',
},
});
console.log(result.text); // Original text
console.log(result.object); // Typed structured data: { sentiment: 'positive', confidence: 0.8 }
If any processor calls abort(), the request terminates immediately and subsequent processors are not executed. The agent returns a 200 response, with result.tripwireReason giving details about why the response was blocked.
Input vs Output Processors
- Input Processors: Handle user messages before they reach the language model
- Output Processors: Handle LLM responses after generation but before they’re returned to the user
Use input processors for user input validation and security, and output processors for response validation and safety controls on LLM-generated content.
See the Input Processors documentation for details on processing user messages.