Observational Memory
Added in: @mastra/memory@1.1.0
Observational Memory (OM) is Mastra's memory system for long-context agentic memory. Two background agents maintain an observation log that replaces raw message history as it grows: an Observer that watches conversations and records observations, and a Reflector that restructures the log by combining related items, reflecting on overarching patterns, and condensing where possible.
Usage
```typescript
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: true,
    },
  }),
})
```
Configuration
The observationalMemory option accepts true, a configuration object, or false. Setting true enables OM with google/gemini-2.5-flash as the default model. When passing a config object, a model must be explicitly set — either at the top level, or on observation.model and/or reflection.model.
Observer input is multimodal-aware. OM keeps text placeholders like [Image #1: screenshot.png] in the transcript it builds for the Observer, and also sends the underlying image parts when possible. This applies to both single-thread observation and batched multi-thread observation. Non-image files appear as placeholders only.
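As a concrete illustration of the placeholder format described above, here is a hypothetical helper (not OM's actual transcript builder; the `[File: …]` format for non-image files is an assumption for illustration):

```typescript
// Hypothetical helper illustrating the placeholder format described above;
// not the actual OM transcript builder.
type TranscriptPart =
  | { type: 'text'; text: string }
  | { type: 'image'; filename: string }
  | { type: 'file'; filename: string }

function toTranscriptText(part: TranscriptPart, imageIndex: number): string {
  switch (part.type) {
    case 'text':
      return part.text
    case 'image':
      // Images keep a numbered text placeholder; the underlying image parts
      // may also be sent to the Observer when possible
      return `[Image #${imageIndex}: ${part.filename}]`
    case 'file':
      // Non-image files appear as placeholders only (hypothetical format)
      return `[File: ${part.filename}]`
  }
}
```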
OM performs thresholding with fast local token estimation. Text uses tokenx, and image-like inputs use provider-aware heuristics plus deterministic fallbacks when metadata is incomplete.
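A minimal sketch of what a deterministic text fallback can look like (the ~4 characters per token ratio is a common rule of thumb, not OM's exact heuristic):

```typescript
// Rough deterministic fallback (hypothetical numbers, not OM's actual heuristics):
// estimate tokens from character count when no tokenizer result is available.
function fallbackTextTokenEstimate(text: string): number {
  // ~4 characters per token is a common rule of thumb for English text
  return Math.ceil(text.length / 4)
}
```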
enabled?:
model?:
scope?:
retrieval?:
observation?:
model?:
instruction?:
threadTitle?:
messageTokens?:
maxTokensPerBatch?:
modelSettings?:
temperature?:
maxOutputTokens?:
bufferTokens?:
bufferActivation?:
blockAfter?:
previousObserverTokens?:
reflection?:
model?:
instruction?:
observationTokens?:
modelSettings?:
temperature?:
maxOutputTokens?:
bufferActivation?:
blockAfter?:
Token estimate metadata cache
OM persists token estimates for message payloads so repeated counting can reuse prior estimation work.
- Part-level cache: `part.providerMetadata.mastra`
- String-content fallback cache: message-level metadata when no parts exist
- Cache entries are ignored and recomputed if the cache version/tokenizer source doesn't match
- Per-message and per-conversation overhead are always recomputed at runtime and aren't cached
- `data-*` and `reasoning` parts are skipped and don't receive cache entries
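Conceptually, a cached estimate is only trusted when its version and tokenizer source match. The sketch below uses hypothetical field names, not Mastra's actual metadata schema:

```typescript
// Conceptual sketch (hypothetical shapes, not Mastra's actual metadata schema):
// a cached token estimate is reused only when its version and tokenizer source match.
interface TokenEstimateCacheEntry {
  version: number
  tokenizer: string // e.g. 'tokenx'
  tokens: number
}

const CACHE_VERSION = 1
const TOKENIZER = 'tokenx'

function readCachedEstimate(
  entry: TokenEstimateCacheEntry | undefined,
  recompute: () => number,
): number {
  // Mismatched version or tokenizer source: ignore the entry and recompute
  if (!entry || entry.version !== CACHE_VERSION || entry.tokenizer !== TOKENIZER) {
    return recompute()
  }
  return entry.tokens
}
```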
Examples
Resource scope with custom thresholds (experimental)
```typescript
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: {
        model: 'google/gemini-2.5-flash',
        scope: 'resource',
        observation: {
          messageTokens: 20_000,
        },
        reflection: {
          observationTokens: 60_000,
        },
      },
    },
  }),
})
```
Shared token budget
When shareTokenBudget is enabled, the total budget is observation.messageTokens + reflection.observationTokens (100k in this example). If observations only use 30k tokens, messages can expand to use up to 70k. If messages are short, observations have more room before triggering reflection.
```typescript
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: {
        shareTokenBudget: true,
        observation: {
          messageTokens: 20_000,
          bufferTokens: false, // required when using shareTokenBudget (temporary limitation)
        },
        reflection: {
          observationTokens: 80_000,
        },
      },
    },
  }),
})
```
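The budget arithmetic described above can be sketched as follows (an illustrative helper, not part of the Mastra API):

```typescript
// Illustrative sketch of the shared token budget (hypothetical helper, not Mastra API).
// With messageTokens: 20_000 and observationTokens: 80_000, the pooled budget is 100k.
function remainingMessageBudget(
  messageTokens: number,
  observationTokens: number,
  observationTokensUsed: number,
): number {
  const totalBudget = messageTokens + observationTokens
  // Messages may expand into whatever portion of the pool observations aren't using
  return totalBudget - observationTokensUsed
}
```

If observations currently hold 30k tokens, `remainingMessageBudget(20_000, 80_000, 30_000)` leaves 70k for messages, matching the prose above.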
Custom model
By passing a model in the config, you can use any model from Mastra's model router.
```typescript
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5.4',
  memory: new Memory({
    options: {
      observationalMemory: {
        model: 'openai/gpt-5-mini',
      },
    },
  }),
})
```
Different models per agent
```typescript
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5.4',
  memory: new Memory({
    options: {
      observationalMemory: {
        observation: {
          model: 'google/gemini-2.5-flash',
        },
        reflection: {
          model: 'openai/gpt-5-mini',
        },
      },
    },
  }),
})
```
Custom instructions
Customize what the Observer and Reflector focus on by providing custom instructions:
```typescript
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'health-assistant',
  instructions: 'You are a health and wellness assistant.',
  model: 'openai/gpt-5.4',
  memory: new Memory({
    options: {
      observationalMemory: {
        model: 'google/gemini-2.5-flash',
        observation: {
          // Focus observations on health-related preferences and goals
          instruction:
            'Prioritize capturing user health goals, dietary restrictions, exercise preferences, and medical considerations. Avoid capturing general chit-chat.',
        },
        reflection: {
          // Guide reflection to consolidate health patterns
          instruction:
            'When consolidating, group related health information together. Preserve specific metrics, dates, and medical details.',
        },
      },
    },
  }),
})
```
Async buffering
Async buffering is enabled by default. It pre-computes observations in the background as the conversation grows — when the messageTokens threshold is reached, buffered observations activate instantly with no blocking LLM call.
The lifecycle is: buffer → activate → remove messages → repeat. Background Observer calls run at bufferTokens intervals, each producing a chunk of observations. At threshold, chunks activate: observations move into the log, raw messages are removed from context. The blockAfter threshold forces a synchronous fallback if buffering can't keep up.
Default settings:
- `observation.bufferTokens: 0.2` buffers every 20% of `messageTokens` (e.g. every ~6k tokens with a 30k threshold).
- `observation.bufferActivation: 0.8` removes enough messages on activation to keep only 20% of the threshold remaining.
- `reflection.bufferActivation: 0.5` starts background reflection at 50% of the observation threshold.
- Buffered observations include continuation hints (`suggestedResponse`, `currentTask`) that survive activation to maintain conversational continuity.
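The default ratios translate into token counts like this (an illustrative sketch, not the actual implementation):

```typescript
// Illustrative arithmetic for the buffering defaults (not the actual implementation).
function bufferIntervalTokens(messageTokens: number, bufferTokens: number): number {
  // Fractional values are treated here as a ratio of messageTokens;
  // absolute values pass through unchanged
  return bufferTokens < 1 ? messageTokens * bufferTokens : bufferTokens
}

function tokensKeptAfterActivation(messageTokens: number, bufferActivation: number): number {
  // bufferActivation: 0.8 removes 80% of the threshold, keeping 20%
  return messageTokens * (1 - bufferActivation)
}
```

With a 30k `messageTokens` threshold, the defaults buffer roughly every 6k tokens and keep about 6k tokens of messages after activation.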
To customize:
```typescript
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: {
        model: 'google/gemini-2.5-flash',
        observation: {
          messageTokens: 30_000,
          // Buffer every 5k tokens (runs in background)
          bufferTokens: 5_000,
          // Activate to retain 30% of threshold
          bufferActivation: 0.7,
          // Force synchronous observation at 1.5x threshold
          blockAfter: 1.5,
        },
        reflection: {
          observationTokens: 60_000,
          // Start background reflection at 50% of threshold
          bufferActivation: 0.5,
          // Force synchronous reflection at 1.2x threshold
          blockAfter: 1.2,
        },
      },
    },
  }),
})
```
To disable async buffering entirely:
```typescript
observationalMemory: {
  model: "google/gemini-2.5-flash",
  observation: {
    bufferTokens: false,
  },
}
```
Setting bufferTokens: false disables both observation and reflection async buffering. Observations and reflections will run synchronously when their thresholds are reached.
Async buffering isn't supported with scope: 'resource' and is automatically disabled in resource scope.
Streaming data parts
Observational Memory emits typed data parts during agent execution that clients can use for real-time UI feedback. These are streamed alongside the agent's response.
data-om-status
Emitted once per agent loop step, before model generation. Provides a snapshot of the current memory state, including token usage for both context windows and the state of any async buffered content.
```typescript
interface DataOmStatusPart {
  type: 'data-om-status'
  data: {
    windows: {
      active: {
        /** Unobserved message tokens and the threshold that triggers observation */
        messages: { tokens: number; threshold: number }
        /** Observation tokens and the threshold that triggers reflection */
        observations: { tokens: number; threshold: number }
      }
      buffered: {
        observations: {
          /** Number of buffered chunks staged for activation */
          chunks: number
          /** Total message tokens across all buffered chunks */
          messageTokens: number
          /** Projected message tokens that would be removed if activation happened now (based on bufferActivation ratio and chunk boundaries) */
          projectedMessageRemoval: number
          /** Observation tokens that will be added on activation */
          observationTokens: number
          /** idle: no buffering in progress. running: background observer is working. complete: chunks are ready for activation. */
          status: 'idle' | 'running' | 'complete'
        }
        reflection: {
          /** Observation tokens that were fed into the reflector (pre-compression size) */
          inputObservationTokens: number
          /** Observation tokens the reflection will produce on activation (post-compression size) */
          observationTokens: number
          /** idle: no reflection buffered. running: background reflector is working. complete: reflection is ready for activation. */
          status: 'idle' | 'running' | 'complete'
        }
      }
    }
    recordId: string
    threadId: string
    stepNumber: number
    /** Increments each time the Reflector creates a new generation */
    generationCount: number
  }
}
```
buffered.reflection.inputObservationTokens is the size of the observations that were sent to the Reflector. buffered.reflection.observationTokens is the compressed result — the size of what will replace those observations when the reflection activates. A client can use these two values to show a compression ratio.
Clients can derive percentages and post-activation estimates from the raw values:
```typescript
// Message window usage %
const msgPercent = status.windows.active.messages.tokens / status.windows.active.messages.threshold

// Observation window usage %
const obsPercent =
  status.windows.active.observations.tokens / status.windows.active.observations.threshold

// Projected message tokens after buffered observations activate.
// Uses projectedMessageRemoval, which accounts for bufferActivation ratio and chunk boundaries.
const postActivation =
  status.windows.active.messages.tokens -
  status.windows.buffered.observations.projectedMessageRemoval

// Reflection compression ratio (when buffered reflection exists)
const { inputObservationTokens, observationTokens } = status.windows.buffered.reflection
if (inputObservationTokens > 0) {
  const compressionRatio = observationTokens / inputObservationTokens
}
```
data-om-observation-start
Emitted when the Observer or Reflector agent begins processing.
cycleId:
operationType:
startedAt:
tokensToObserve:
recordId:
threadId:
threadIds:
config:
data-om-observation-end
Emitted when observation or reflection completes successfully.
cycleId:
operationType:
completedAt:
durationMs:
tokensObserved:
observationTokens:
observations?:
currentTask?:
suggestedResponse?:
recordId:
threadId:
data-om-observation-failed
Emitted when observation or reflection fails. The system falls back to synchronous processing.
cycleId:
operationType:
failedAt:
durationMs:
tokensAttempted:
error:
observations?:
recordId:
threadId:
data-om-buffering-start
Emitted when async buffering begins in the background. Buffering pre-computes observations or reflections before the main threshold is reached.
cycleId:
operationType:
startedAt:
tokensToBuffer:
recordId:
threadId:
threadIds:
config:
data-om-buffering-end
Emitted when async buffering completes. The content is stored but not yet activated in the main context.
cycleId:
operationType:
completedAt:
durationMs:
tokensBuffered:
bufferedTokens:
observations?:
recordId:
threadId:
data-om-buffering-failed
Emitted when async buffering fails. The system falls back to synchronous processing when the threshold is reached.
cycleId:
operationType:
failedAt:
durationMs:
tokensAttempted:
error:
observations?:
recordId:
threadId:
data-om-activation
Emitted when buffered observations or reflections are activated (moved into the active context window). This is an instant operation — no LLM call is involved.
cycleId:
operationType:
activatedAt:
chunksActivated:
tokensActivated:
observationTokens:
messagesActivated:
generationCount:
observations?:
recordId:
threadId:
config:
Standalone usage
Most users should use the Memory class above. Using ObservationalMemory directly is mainly useful for benchmarking, experimentation, or when you need to control processor ordering with other processors (like guardrails).
```typescript
import { ObservationalMemory } from '@mastra/memory/processors'
import { Agent } from '@mastra/core/agent'
import { LibSQLStore } from '@mastra/libsql'

const storage = new LibSQLStore({
  id: 'my-storage',
  url: 'file:./memory.db',
})

const om = new ObservationalMemory({
  storage: storage.stores.memory,
  model: 'google/gemini-2.5-flash',
  scope: 'resource',
  observation: {
    messageTokens: 20_000,
  },
  reflection: {
    observationTokens: 60_000,
  },
})

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  inputProcessors: [om],
  outputProcessors: [om],
})
```
Standalone config
The standalone ObservationalMemory class accepts all the same options as the observationalMemory config object above, plus the following:
storage:
onDebugEvent?:
obscureThreadIds?:
Recall tool
When retrieval is set (any truthy value), a recall tool is registered so the agent can page through raw messages behind observation group ranges. By default (scope 'resource'), the tool supports listing threads (mode: "threads"), browsing other threads (threadId), and cross-thread search. With retrieval: { vector: true }, semantic search is available (mode: "search"). Set scope: 'thread' to restrict the tool to the current thread only. The tool is automatically added to the agent's tool list — no manual registration is needed.
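For example, a configuration that registers the recall tool with semantic search enabled might look like this (mirroring the earlier examples; `retrieval: { vector: true }` is the only OM-specific addition):

```typescript
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: {
        model: 'google/gemini-2.5-flash',
        // Any truthy retrieval value registers the recall tool;
        // vector: true additionally enables semantic search (mode: "search")
        retrieval: { vector: true },
      },
    },
  }),
})
```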
Parameters
mode?:
query?:
cursor?:
threadId?:
page?:
limit?:
detail?:
partIndex?:
before?:
after?:
Returns (messages mode)
messages:
count:
cursor:
page:
limit:
hasNextPage:
hasPrevPage:
truncated?:
tokenOffset?:
Returns (threads mode)
threads:
count:
page:
hasMore:
Returns (search mode)
results:
count:
ModelByInputTokens
ModelByInputTokens selects a model based on the input token count. It chooses the model for the smallest threshold that covers the actual input size.
Constructor
new ModelByInputTokens(config)
Where config is an object with an upTo map whose keys are token thresholds (numbers) and whose values are model targets.
Example
```typescript
import { ModelByInputTokens } from '@mastra/memory'

const selector = new ModelByInputTokens({
  upTo: {
    10_000: 'google/gemini-2.5-flash', // Fast for small inputs
    40_000: 'openai/gpt-5.4-mini', // Stronger for medium inputs
    1_000_000: 'openai/gpt-5.4', // Most capable for large inputs
  },
})
```
Behavior
- Thresholds are sorted internally, so the order in the config object doesn't matter.
- inputTokens ≤ smallest threshold: that threshold's model is used.
- inputTokens > largest threshold: resolve() throws an error. If this happens during an OM Observer or Reflector run, OM aborts via TripWire, so callers receive an empty text result or a streamed tripwire instead of a normal assistant response.
- OM computes the input token count for the Observer or Reflector call and resolves the matching model tier directly.
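The selection rule can be sketched like this (illustrative only, not the library's implementation):

```typescript
// Sketch of the selection rule: pick the model for the smallest threshold
// that covers the input size (illustrative, not the library's code).
function resolveModel(upTo: Record<number, string>, inputTokens: number): string {
  const thresholds = Object.keys(upTo)
    .map(Number)
    .sort((a, b) => a - b) // order in the config object doesn't matter
  for (const threshold of thresholds) {
    if (inputTokens <= threshold) return upTo[threshold]
  }
  // Past the largest threshold: resolve() throws (OM would abort via TripWire)
  throw new Error(`No model configured for ${inputTokens} input tokens`)
}
```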