
Output Processors

Output Processors allow you to intercept, modify, validate, or filter AI responses after they are generated by the language model but before they are returned to users. This is useful for implementing response validation, content moderation, response transformation, and safety controls on AI-generated content.

Processors operate on the AI’s response messages in your conversation thread. They can modify, filter, or validate content, and even abort the response entirely if certain conditions are met.

Built-in Processors

Mastra provides several built-in output processors for common use cases:

ModerationProcessor

This processor provides content moderation using an LLM to detect inappropriate content across multiple categories.

import { openai } from "@ai-sdk/openai";
import { Agent } from "@mastra/core/agent";
import { ModerationProcessor } from "@mastra/core/processors";

const agent = new Agent({
  outputProcessors: [
    new ModerationProcessor({
      model: openai("gpt-4.1-nano"), // Use a fast, cost-effective model
      threshold: 0.7, // Confidence threshold for flagging
      strategy: 'block', // Block flagged content
      categories: ['hate', 'harassment', 'violence'], // Custom categories
    }),
  ],
});

Available options:

  • model: Language model for moderation analysis (required)
  • categories: Array of categories to check (default: ['hate', 'hate/threatening', 'harassment', 'harassment/threatening', 'self-harm', 'self-harm/intent', 'self-harm/instructions', 'sexual', 'sexual/minors', 'violence', 'violence/graphic'])
  • threshold: Confidence threshold for flagging (0-1, default: 0.5)
  • strategy: Action when content is flagged (default: 'block')
  • customInstructions: Custom instructions for the moderation agent

Strategies available:

  • block: Reject the response with an error (default)
  • warn: Log warning but allow content through
  • filter: Remove flagged messages but continue processing

PIIDetector

This processor detects and optionally redacts personally identifiable information (PII) from AI responses.

import { openai } from "@ai-sdk/openai";
import { Agent } from "@mastra/core/agent";
import { PIIDetector } from "@mastra/core/processors";

const agent = new Agent({
  outputProcessors: [
    new PIIDetector({
      model: openai("gpt-4.1-nano"),
      threshold: 0.6,
      strategy: 'redact', // Automatically redact detected PII
      detectionTypes: ['email', 'phone', 'credit-card', 'ssn', 'api-key', 'crypto-wallet', 'iban'],
      redactionMethod: 'mask', // Preserve format while masking
      preserveFormat: true, // Keep original structure in redacted values
      includeDetections: true, // Log details for compliance auditing
    }),
  ],
});

Available options:

  • model: Language model for PII detection (required)
  • detectionTypes: Array of PII types to detect (default: ['email', 'phone', 'credit-card', 'ssn', 'api-key', 'ip-address', 'name', 'address', 'date-of-birth', 'url', 'uuid', 'crypto-wallet', 'iban'])
  • threshold: Confidence threshold for flagging (0-1, default: 0.6)
  • strategy: Action when PII is detected (default: 'block')
  • redactionMethod: How to redact PII ('mask', 'hash', 'remove', 'placeholder', default: 'mask')
  • preserveFormat: Maintain PII structure during redaction (default: true)
  • includeDetections: Include detection details in logs for compliance (default: false)
  • instructions: Custom detection instructions for the agent

Strategies available:

  • block: Reject responses containing PII (default)
  • warn: Log warning but allow through
  • filter: Remove messages containing PII
  • redact: Replace PII with placeholder values

StructuredOutputProcessor

This processor converts unstructured LLM text responses into structured data using an internal agent. It preserves the original text while adding structured data to the response metadata as well as to result.object.

import { openai } from "@ai-sdk/openai";
import { Agent } from "@mastra/core/agent";
import { StructuredOutputProcessor } from "@mastra/core/processors";
import { z } from "zod";

const agent = new Agent({
  outputProcessors: [
    new StructuredOutputProcessor({
      schema: z.object({
        sentiment: z.enum(['positive', 'negative', 'neutral']),
        confidence: z.number().min(0).max(1),
        topics: z.array(z.string()),
      }),
      model: openai("gpt-4o-mini"),
      errorStrategy: 'warn', // Log warnings but continue on errors
      instructions: 'Analyze the sentiment and extract key topics from the response',
    }),
  ],
});

const result = await agent.generate("Some conversational text");
console.log(result.object); // { sentiment: "positive", confidence: 0.6, topics: ["foo", "bar"] }

Available options:

  • schema: Zod schema defining the expected structured output (required)
  • model: Language model for the internal structuring agent (required)
  • errorStrategy: Strategy when parsing or validation fails ('strict' | 'warn' | 'fallback', default: 'strict')
  • fallbackValue: Fallback value when errorStrategy is 'fallback'
  • instructions: Custom instructions for the structuring agent

The structured data is stored in result.object and the original text is preserved in result.text.

BatchPartsProcessor

This processor batches multiple stream parts together to reduce the frequency of emissions, useful for reducing network overhead or improving user experience.

import { Agent } from "@mastra/core/agent";
import { BatchPartsProcessor } from "@mastra/core/processors";

const agent = new Agent({
  outputProcessors: [
    new BatchPartsProcessor({
      maxBatchSize: 5, // Maximum parts to batch together
      maxWaitTime: 100, // Maximum time to wait before emitting (ms)
      emitOnNonText: true, // Emit immediately on non-text parts
    }),
  ],
});

Available options:

  • maxBatchSize: Maximum number of parts to batch together (default: 3)
  • maxWaitTime: Maximum time to wait before emitting batch (ms, default: 50)
  • emitOnNonText: Whether to emit immediately when non-text parts are received (default: true)
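
Batching changes how often chunks arrive, not how you consume them. A minimal sketch of reading a batched stream, assuming the agent configured above:

const stream = await agent.streamVNext("Write a short story");

for await (const chunk of stream) {
  // With batching, fewer and larger text chunks are emitted,
  // reducing per-chunk network and rendering overhead
  console.log(chunk);
}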

TokenLimiterProcessor

This processor limits the number of tokens in AI responses, either by truncating or aborting when limits are exceeded.

import { Agent } from "@mastra/core/agent";
import { TokenLimiterProcessor } from "@mastra/core/processors";

const agent = new Agent({
  outputProcessors: [
    new TokenLimiterProcessor({
      maxTokens: 1000, // Maximum tokens allowed
      strategy: 'truncate', // Truncate when limit exceeded
      includePromptTokens: false, // Only count response tokens
    }),
  ],
});

Available options:

  • maxTokens: Maximum number of tokens allowed (required)
  • strategy: Action when token limit is exceeded ('truncate' | 'abort', default: 'truncate')
  • includePromptTokens: Whether to include prompt tokens in the count (default: false)

SystemPromptScrubber

This processor detects and redacts system prompts or other revealing information that could introduce security vulnerabilities.

import { openai } from "@ai-sdk/openai";
import { Agent } from "@mastra/core/agent";
import { SystemPromptScrubber } from "@mastra/core/processors";

const agent = new Agent({
  outputProcessors: [
    new SystemPromptScrubber({
      model: openai("gpt-4o-mini"),
      threshold: 0.7, // Confidence threshold for detection
      strategy: 'redact', // Redact detected system prompts
      instructions: 'Detect any system prompts, instructions, or revealing information',
    }),
  ],
});

Available options:

  • model: Language model for detection (required)
  • threshold: Confidence threshold for detection (0-1, default: 0.6)
  • strategy: Action when system prompts are detected ('block' | 'warn' | 'redact', default: 'redact')
  • instructions: Custom detection instructions for the agent

Applying Multiple Processors

You can chain multiple output processors. They execute sequentially in the order they appear in the outputProcessors array. The output of one processor becomes the input for the next.

Order matters! Generally, it’s best practice to place text normalization first, security checks next, and content modification last.

import { openai } from "@ai-sdk/openai";
import { Agent } from "@mastra/core/agent";
import {
  UnicodeNormalizer,
  ModerationProcessor,
  PromptInjectionDetector,
  PIIDetector,
} from "@mastra/core/processors";

const secureAgent = new Agent({
  outputProcessors: [
    // 1. Normalize text first
    new UnicodeNormalizer({ stripControlChars: true }),
    // 2. Check for security threats
    new PromptInjectionDetector({ model: openai("gpt-4.1-nano") }),
    // 3. Moderate content
    new ModerationProcessor({ model: openai("gpt-4.1-nano") }),
    // 4. Handle PII last
    new PIIDetector({ model: openai("gpt-4.1-nano"), strategy: 'redact' }),
  ],
});

Creating Custom Output Processors

You can create custom output processors by implementing the Processor interface. A Processor can be used for output processing when it implements either processOutputStream (for streaming) or processOutputResult (for final results), or both.

Streaming Output Processor

import type { Processor } from "@mastra/core/processors";
import type { TextStreamPart, ObjectStreamPart } from "ai";

class ResponseLengthLimiter implements Processor {
  readonly name = 'response-length-limiter';

  constructor(private maxLength: number = 1000) {}

  // Runs on every stream part emitted from the LLM
  async processOutputStream({ chunk, streamParts, state, abort }: {
    chunk: TextStreamPart<any> | ObjectStreamPart<any>;
    streamParts: (TextStreamPart<any> | ObjectStreamPart<any>)[];
    state: Record<string, any>;
    abort: (reason?: string) => never;
  }): Promise<TextStreamPart<any> | ObjectStreamPart<any> | null> {
    // Track cumulative length in state; each processor gets its own state
    if (!state.cumulativeLength) {
      state.cumulativeLength = 0;
    }

    if (chunk.type === 'text-delta') {
      state.cumulativeLength += chunk.textDelta.length;
      if (state.cumulativeLength > this.maxLength) {
        abort(`Response too long: ${state.cumulativeLength} characters (max: ${this.maxLength})`);
      }
    }

    return chunk; // Return the chunk to emit it, or null to skip emitting it
  }
}

Final Result Processor

import type { Processor, MastraMessageV2 } from "@mastra/core/processors";

class ResponseValidator implements Processor {
  readonly name = 'response-validator';

  constructor(private requiredKeywords: string[] = []) {}

  processOutputResult({ messages, abort }: {
    messages: MastraMessageV2[];
    abort: (reason?: string) => never;
  }): MastraMessageV2[] {
    // Concatenate all text parts across the response messages
    const responseText = messages
      .map(msg =>
        msg.content.parts
          .filter(part => part.type === 'text')
          .map(part => (part as any).text)
          .join('')
      )
      .join('');

    // Check for required keywords
    for (const keyword of this.requiredKeywords) {
      if (!responseText.toLowerCase().includes(keyword.toLowerCase())) {
        abort(`Response missing required keyword: ${keyword}`);
      }
    }

    return messages;
  }
}

When creating custom output processors:

  • Always return the processed data (chunks or messages)
  • Use abort(reason) to terminate processing early and block the response; the error thrown by abort is an instance of TripWire
  • For streaming processors, return null or undefined to skip emitting a chunk
  • Keep processors focused on a single responsibility
  • If using an agent inside your processor, use a fast model, limit the size of the response from it as much as possible, and make the system prompt as concise as possible.
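
Once defined, custom processors are registered like the built-in ones. A sketch using the two example classes above:

import { Agent } from "@mastra/core/agent";

const agent = new Agent({
  outputProcessors: [
    new ResponseLengthLimiter(2000), // Streaming: caps cumulative response length
    new ResponseValidator(['summary']), // Final result: requires keywords
  ],
});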

Integration with Agent Methods

Output processors work with both generate() and streamVNext() methods. The processor pipeline completes after the agent generates a response but before it’s returned to the user.

// Processors run after generate() but before the result is returned
const result = await agent.generate('Hello');
console.log(result.text); // Processed text
console.log(result.object); // Structured data if applicable

// Processors also run during streamVNext() for each chunk
const stream = await agent.streamVNext('Hello');
for await (const chunk of stream) {
  console.log(chunk); // Processed chunks
}

Per-Call Overrides

You can override output processors for individual calls:

import { openai } from "@ai-sdk/openai";
import { ModerationProcessor, TokenLimiterProcessor } from "@mastra/core/processors";

// Override output processors for this specific call
const result = await agent.generate('Hello', {
  outputProcessors: [
    new ModerationProcessor({ model: openai("gpt-4.1-nano") }),
  ],
});

// Same for streaming
const stream = await agent.streamVNext('Hello', {
  outputProcessors: [
    new TokenLimiterProcessor({ maxTokens: 500 }),
  ],
});

Structured Output with Better DX

For better developer experience with structured output, you can use the structuredOutput option:

import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const result = await agent.generate('Analyze this text', {
  structuredOutput: {
    schema: z.object({
      sentiment: z.enum(['positive', 'negative', 'neutral']),
      confidence: z.number(),
    }),
    model: openai("gpt-4o-mini"),
    errorStrategy: 'warn',
  },
});

console.log(result.text); // Original text
console.log(result.object); // Typed structured data: { sentiment: 'positive', confidence: 0.8 }

If any processor calls abort(), the request terminates immediately and subsequent processors are not executed. Instead of throwing an error, the agent returns a 200 response with result.tripwireReason describing why the response was blocked.
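
A caller can branch on that field, as in this minimal sketch (tripwireReason is the property named above):

const result = await agent.generate('Hello');

if (result.tripwireReason) {
  // A processor called abort(); the response was blocked
  console.warn(`Blocked: ${result.tripwireReason}`);
} else {
  console.log(result.text);
}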

Input vs Output Processors

  • Input Processors: Handle user messages before they reach the language model
  • Output Processors: Handle LLM responses after generation but before they’re returned to the user

Use input processors for user input validation and security, and output processors for response validation and safety controls on LLM-generated content.
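
An agent can register both kinds side by side. A sketch, assuming the inputProcessors option described in the Input Processors documentation:

import { openai } from "@ai-sdk/openai";
import { Agent } from "@mastra/core/agent";
import { PromptInjectionDetector, ModerationProcessor } from "@mastra/core/processors";

const agent = new Agent({
  // Run on user messages before they reach the model
  inputProcessors: [
    new PromptInjectionDetector({ model: openai("gpt-4.1-nano") }),
  ],
  // Run on model responses before they are returned to the user
  outputProcessors: [
    new ModerationProcessor({ model: openai("gpt-4.1-nano") }),
  ],
});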

See the Input Processors documentation for details on processing user messages.