Observational Memory

Added in: @mastra/memory@1.1.0

Observational Memory (OM) is Mastra's memory system for long-context agentic memory. Two background agents maintain an observation log that replaces raw message history as it grows: an Observer, which watches conversations and creates observations, and a Reflector, which restructures observations by combining related items, reflecting on overarching patterns, and condensing where possible.

Usage

src/mastra/agents/agent.ts
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: true,
    },
  }),
})

Configuration

The observationalMemory option accepts true, a configuration object, or false. Setting true enables OM with google/gemini-2.5-flash as the default model. When passing a config object, a model must be explicitly set — either at the top level, or on observation.model and/or reflection.model.

enabled?:

boolean
= true
Enable or disable Observational Memory. When omitted from a config object, defaults to `true`. Only `enabled: false` explicitly disables it.

model?:

string | LanguageModel | DynamicModel | ModelWithRetries[]
= 'google/gemini-2.5-flash' (when using observationalMemory: true)
Model for both the Observer and Reflector agents. Sets the model for both at once. Cannot be used together with `observation.model` or `reflection.model` — an error will be thrown if both are set. When using `observationalMemory: true`, defaults to `google/gemini-2.5-flash`. When passing a config object, this or `observation.model`/`reflection.model` must be set. Use `"default"` to explicitly use the default model (`google/gemini-2.5-flash`).
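The model rules above, together with the fallbacks described under `observation.model` and `reflection.model`, can be summarized as a small resolution function. This is a minimal sketch, not Mastra's implementation: `resolveOMModels` and `OMConfigSketch` are hypothetical names, and models are represented as plain strings only (the real option also accepts `LanguageModel`, `DynamicModel`, and `ModelWithRetries[]`).

```typescript
// Hypothetical, simplified config shape: models are plain strings here.
interface OMConfigSketch {
  enabled?: boolean
  model?: string
  observation?: { model?: string }
  reflection?: { model?: string }
}

const DEFAULT_OM_MODEL = 'google/gemini-2.5-flash'

// Returns null when OM is disabled, otherwise the models used by each agent.
function resolveOMModels(
  option: boolean | OMConfigSketch,
): { observer: string; reflector: string } | null {
  if (option === false) return null
  if (option === true) return { observer: DEFAULT_OM_MODEL, reflector: DEFAULT_OM_MODEL }
  // Only an explicit `enabled: false` disables OM in a config object.
  if (option.enabled === false) return null

  const top = option.model
  const obs = option.observation?.model
  const refl = option.reflection?.model

  // A top-level model cannot be combined with per-agent models.
  if (top !== undefined && (obs !== undefined || refl !== undefined)) {
    throw new Error('model cannot be combined with observation.model or reflection.model')
  }

  // "default" is shorthand for the default model.
  const expand = (m: string | undefined) => (m === 'default' ? DEFAULT_OM_MODEL : m)

  if (top !== undefined) {
    const m = expand(top)!
    return { observer: m, reflector: m }
  }

  // Each agent falls back to the other agent's model.
  const observer = expand(obs ?? refl)
  const reflector = expand(refl ?? obs)
  if (observer === undefined || reflector === undefined) {
    throw new Error('A model must be set when passing a config object')
  }
  return { observer, reflector }
}
```

For example, `resolveOMModels({ observation: { model: 'openai/gpt-4o-mini' } })` would use `openai/gpt-4o-mini` for both agents under these assumptions.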

scope?:

'resource' | 'thread'
= 'thread'
Memory scope for observations. `'thread'` keeps observations per-thread. `'resource'` (experimental) shares observations across all threads for a resource, enabling cross-conversation memory.

shareTokenBudget?:

boolean
= false
Share the token budget between messages and observations. When enabled, the total budget is `observation.messageTokens + reflection.observationTokens`. Messages can use more space when observations are small, and vice versa. This maximizes context usage through flexible allocation. **Note:** `shareTokenBudget` is not yet compatible with async buffering. You must set `observation: { bufferTokens: false }` when using this option (this is a temporary limitation).

observation?:

ObservationalMemoryObservationConfig
Configuration for the observation step. Controls when the Observer agent runs and how it behaves.

reflection?:

ObservationalMemoryReflectionConfig
Configuration for the reflection step. Controls when the Reflector agent runs and how it behaves.

Observation config

model?:

string | LanguageModel | DynamicModel | ModelWithRetries[]
Model for the Observer agent. Cannot be set if a top-level `model` is also provided. If neither this nor the top-level `model` is set, falls back to `reflection.model`.

instruction?:

string
Custom instruction appended to the Observer's system prompt. Use this to customize what the Observer focuses on, such as domain-specific preferences or priorities.

messageTokens?:

number
= 30000
Token count of unobserved messages that triggers observation. When unobserved message tokens exceed this threshold, the Observer agent is called.

maxTokensPerBatch?:

number
= 10000
Maximum tokens per batch when observing multiple threads in resource scope. Threads are chunked into batches of this size and processed in parallel. Lower values mean more parallelism but more API calls.
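Batching threads by token budget can be sketched as below. This is an illustration under assumptions, not the library's algorithm: greedy in-order packing is assumed, a thread larger than the limit gets its own batch, and `chunkThreads`, `observeInParallel`, and `ThreadTokens` are hypothetical names with the Observer call stubbed out.

```typescript
// Hypothetical shape: one entry per thread with its unobserved message tokens.
interface ThreadTokens {
  threadId: string
  tokens: number
}

// Greedily packs threads (in order) into batches whose token totals stay
// within maxTokensPerBatch. An oversized thread gets a batch of its own.
function chunkThreads(threads: ThreadTokens[], maxTokensPerBatch = 10_000): ThreadTokens[][] {
  const batches: ThreadTokens[][] = []
  let current: ThreadTokens[] = []
  let currentTokens = 0
  for (const t of threads) {
    if (current.length > 0 && currentTokens + t.tokens > maxTokensPerBatch) {
      batches.push(current)
      current = []
      currentTokens = 0
    }
    current.push(t)
    currentTokens += t.tokens
  }
  if (current.length > 0) batches.push(current)
  return batches
}

// Runs one (stubbed) Observer call per batch, with all batches in flight at once.
async function observeInParallel(
  batches: ThreadTokens[][],
  observeBatch: (batch: ThreadTokens[]) => Promise<string>,
): Promise<string[]> {
  return Promise.all(batches.map((batch) => observeBatch(batch)))
}
```

A lower `maxTokensPerBatch` produces more, smaller batches: more parallel calls, but more calls in total.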

modelSettings?:

ObservationalMemoryModelSettings
= { temperature: 0.3, maxOutputTokens: 100_000 }
Model settings for the Observer agent.

bufferTokens?:

number | false
= 0.2
Token interval for async background observation buffering. Can be an absolute token count (e.g. `5000`) or a fraction of `messageTokens` (e.g. `0.25` = buffer every 25% of threshold). When set, observations run in the background at this interval, storing results in a buffer. When the main `messageTokens` threshold is reached, buffered observations activate instantly without a blocking LLM call. Must resolve to less than `messageTokens`. Set to `false` to explicitly disable all async buffering (both observation and reflection).
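How a `bufferTokens` value turns into a concrete interval can be sketched as follows. This assumes values strictly between 0 and 1 are treated as fractions of `messageTokens` and anything else as an absolute count; `resolveBufferTokens` is a hypothetical helper, not part of `@mastra/memory`.

```typescript
// Resolves bufferTokens to an absolute token interval, or null when disabled.
// Assumption: 0 < value < 1 is a fraction of messageTokens; otherwise absolute.
function resolveBufferTokens(bufferTokens: number | false, messageTokens: number): number | null {
  if (bufferTokens === false) return null
  const interval =
    bufferTokens > 0 && bufferTokens < 1 ? Math.round(bufferTokens * messageTokens) : bufferTokens
  // The interval must resolve to less than the observation threshold.
  if (interval >= messageTokens) {
    throw new Error(`bufferTokens must resolve to less than messageTokens (${messageTokens})`)
  }
  return interval
}
```

With the defaults, `resolveBufferTokens(0.2, 30_000)` gives 6000, so background observation would run roughly every 6k new message tokens.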

bufferActivation?:

number
= 0.8
Controls how much of the message window to retain after activation. Accepts a ratio (0-1) or an absolute token count (≥ 1000). For example, `0.8` means: activate enough buffers to remove 80% of `messageTokens` and leave 20% as active message history. An absolute token count like `4000` targets keeping roughly 4k message tokens after activation. With a ratio, higher values remove more message history per activation; with a token count, higher values keep more message history.
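The retention target implied by `bufferActivation` can be computed as below. This is a sketch under the assumption that values in (0, 1] are ratios and values ≥ 1000 are absolute targets; `retainedAfterActivation` is a hypothetical helper. Actual removal is also bounded by chunk boundaries, so the real remainder can differ slightly.

```typescript
// Approximate message tokens left as active history after activation.
// Assumption: 0 < value <= 1 is a ratio; value >= 1000 is an absolute target.
function retainedAfterActivation(bufferActivation: number, messageTokens: number): number {
  if (bufferActivation > 0 && bufferActivation <= 1) {
    // Ratio: remove this share of messageTokens, keep the rest.
    return Math.round((1 - bufferActivation) * messageTokens)
  }
  if (bufferActivation >= 1000) {
    // Absolute: aim to keep roughly this many message tokens.
    return bufferActivation
  }
  throw new Error('bufferActivation must be a ratio (0-1) or a token count >= 1000')
}
```

With `messageTokens: 30_000`, the default `0.8` leaves about 6000 tokens, `0.7` leaves about 9000, and `4000` leaves about 4000.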

blockAfter?:

number
= 1.2 (when bufferTokens is set)
Token threshold above which synchronous (blocking) observation is forced. Between `messageTokens` and `blockAfter`, only async buffering/activation is used. Above `blockAfter`, a synchronous observation runs as a last resort, while buffered activation still preserves a minimum remaining context (min(1000, retention floor)). Accepts a multiplier (1 < value < 2), which is multiplied by `messageTokens`, or an absolute token count (any value ≥ 2), which must be greater than `messageTokens`. Only relevant when `bufferTokens` is set. Defaults to `1.2` when async buffering is enabled.
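Resolving `blockAfter` to an absolute threshold follows directly from the two accepted forms. The helper below is a hypothetical sketch of that arithmetic, not a library function; the same rule applies to `reflection.blockAfter` with `observationTokens` as the base.

```typescript
// Resolves blockAfter to an absolute token threshold.
// 1 < value < 2 is a multiplier of the base threshold; value >= 2 is absolute.
function resolveBlockAfter(blockAfter: number, threshold: number): number {
  if (blockAfter > 1 && blockAfter < 2) {
    return Math.round(blockAfter * threshold)
  }
  if (blockAfter >= 2) {
    if (blockAfter <= threshold) {
      throw new Error('An absolute blockAfter must be greater than the threshold')
    }
    return blockAfter
  }
  throw new Error('blockAfter must be a multiplier (1 < value < 2) or an absolute token count')
}
```

With `messageTokens: 30_000`, the default `1.2` forces synchronous observation above 36,000 tokens, and `1.5` does so above 45,000.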

Reflection config

model?:

string | LanguageModel | DynamicModel | ModelWithRetries[]
Model for the Reflector agent. Cannot be set if a top-level `model` is also provided. If neither this nor the top-level `model` is set, falls back to `observation.model`.

instruction?:

string
Custom instruction appended to the Reflector's system prompt. Use this to customize how the Reflector consolidates observations, such as prioritizing certain types of information.

observationTokens?:

number
= 40000
Token count of observations that triggers reflection. When observation tokens exceed this threshold, the Reflector agent is called to condense them.

modelSettings?:

ObservationalMemoryModelSettings
= { temperature: 0, maxOutputTokens: 100_000 }
Model settings for the Reflector agent.

bufferActivation?:

number
= 0.5
Ratio (0-1) controlling when async reflection buffering starts. When observation tokens reach `observationTokens * bufferActivation`, reflection runs in the background. On activation at the full threshold, the buffered reflection replaces the observations it covers, preserving any new observations appended after that range.

blockAfter?:

number
= 1.2 (when bufferActivation is set)
Token threshold above which synchronous (blocking) reflection is forced. Between `observationTokens` and `blockAfter`, only async buffering/activation is used. Above `blockAfter`, a synchronous reflection runs as a last resort. Accepts a multiplier (1 < value < 2), which is multiplied by `observationTokens`, or an absolute token count (any value ≥ 2), which must be greater than `observationTokens`. Only relevant when `bufferActivation` is set. Defaults to `1.2` when async reflection is enabled.
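Taken together, `observationTokens`, `bufferActivation`, and `blockAfter` divide the observation log size into four phases. The classifier below is a sketch of that reading of the options, assuming `bufferActivation` is a ratio and `blockAfter` a multiplier; `reflectionPhase` is a hypothetical name and the phase labels are illustrative, not values the library exposes.

```typescript
type ReflectionPhase = 'idle' | 'buffering' | 'activate' | 'block'

// Classifies the current observation log size against the reflection thresholds.
// Assumption: bufferActivation is a ratio and blockAfter a multiplier (1 < v < 2).
function reflectionPhase(
  currentObservationTokens: number,
  observationTokens = 40_000,
  bufferActivation = 0.5,
  blockAfter = 1.2,
): ReflectionPhase {
  const bufferStart = Math.round(observationTokens * bufferActivation)
  const blockAt = Math.round(observationTokens * blockAfter)
  // Last resort: a blocking reflection runs.
  if (currentObservationTokens > blockAt) return 'block'
  // Full threshold: the buffered reflection replaces the observations it covers.
  if (currentObservationTokens >= observationTokens) return 'activate'
  // Background reflection runs once the buffer start point is reached.
  if (currentObservationTokens >= bufferStart) return 'buffering'
  return 'idle'
}
```

With the defaults, background reflection would start at 20k observation tokens, activate at 40k, and block above 48k.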

Model settings

temperature?:

number
= 0.3 (Observer), 0 (Reflector)
Temperature for generation. Lower values produce more consistent output.

maxOutputTokens?:

number
= 100000
Maximum output tokens. Set high to prevent truncation of observations.

Examples

Resource scope with custom thresholds (experimental)

src/mastra/agents/agent.ts
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: {
        model: 'google/gemini-2.5-flash',
        scope: 'resource',
        observation: {
          messageTokens: 20_000,
        },
        reflection: {
          observationTokens: 60_000,
        },
      },
    },
  }),
})

Shared token budget

src/mastra/agents/agent.ts
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: {
        shareTokenBudget: true,
        observation: {
          messageTokens: 20_000,
          bufferTokens: false, // required when using shareTokenBudget (temporary limitation)
        },
        reflection: {
          observationTokens: 80_000,
        },
      },
    },
  }),
})

When shareTokenBudget is enabled, the total budget is observation.messageTokens + reflection.observationTokens (100k in this example). If observations only use 30k tokens, messages can expand to use up to 70k. If messages are short, observations have more room before triggering reflection.
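The arithmetic in the example above can be written out as a small helper. This is a sketch that assumes each side may simply use whatever the other side is not currently using; `sharedBudget` is a hypothetical name, and the library's exact allocation logic may differ.

```typescript
// Hypothetical helper: room available to each side under a shared budget.
// Assumption: each side may use whatever the other side is not using right now.
function sharedBudget(
  messageTokens: number,
  observationTokens: number,
  usage: { messages: number; observations: number },
) {
  const total = messageTokens + observationTokens
  return {
    total,
    // Messages can grow into space the observations are not using.
    messageRoom: Math.max(0, total - usage.observations),
    // Observations can grow into space the messages are not using.
    observationRoom: Math.max(0, total - usage.messages),
  }
}
```

With the configuration above and 30k tokens of observations, `sharedBudget(20_000, 80_000, { messages: 10_000, observations: 30_000 })` reports a 100k total and 70k of room for messages.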

Custom model

src/mastra/agents/agent.ts
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: {
        model: 'openai/gpt-4o-mini',
      },
    },
  }),
})

Different models per agent

src/mastra/agents/agent.ts
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: {
        observation: {
          model: 'google/gemini-2.5-flash',
        },
        reflection: {
          model: 'openai/gpt-4o-mini',
        },
      },
    },
  }),
})

Custom instructions

Customize what the Observer and Reflector focus on by providing custom instructions:

src/mastra/agents/agent.ts
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'health-assistant',
  instructions: 'You are a health and wellness assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: {
        model: 'google/gemini-2.5-flash',
        observation: {
          // Focus observations on health-related preferences and goals
          instruction:
            'Prioritize capturing user health goals, dietary restrictions, exercise preferences, and medical considerations. Avoid capturing general chit-chat.',
        },
        reflection: {
          // Guide reflection to consolidate health patterns
          instruction:
            'When consolidating, group related health information together. Preserve specific metrics, dates, and medical details.',
        },
      },
    },
  }),
})

Async buffering

Async buffering is enabled by default. It pre-computes observations in the background as the conversation grows — when the messageTokens threshold is reached, buffered observations activate instantly with no blocking LLM call.

The lifecycle is: buffer → activate → remove messages → repeat. Background Observer calls run at bufferTokens intervals, each producing a chunk of observations. At threshold, chunks activate: observations move into the log, raw messages are removed from context. The blockAfter threshold forces a synchronous fallback if buffering can't keep up.
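The buffer and activate cycle can be illustrated with a token-only toy simulation using the default ratios on a 30k threshold. This is a sketch, not how Mastra schedules work: it assumes each chunk covers the messages added since the previous chunk, that activation drops whole chunks oldest-first until at most `retain` message tokens remain, and it omits the `blockAfter` fallback. `simulateBuffering` and `SimEvent` are hypothetical names.

```typescript
interface SimEvent {
  kind: 'buffer' | 'activate'
  atTurn: number
  tokens: number
}

// Toy simulation: token counts only, no LLM calls, no blockAfter fallback.
function simulateBuffering(
  turnTokens: number[],
  messageTokens = 30_000,
  bufferTokens = 6_000, // 0.2 * messageTokens
  retain = 6_000, // (1 - 0.8) * messageTokens
): SimEvent[] {
  const events: SimEvent[] = []
  let active = 0 // unobserved message tokens currently in context
  let unbuffered = 0 // active tokens not yet covered by a buffered chunk
  const chunks: number[] = [] // message tokens covered by each staged chunk, oldest first

  turnTokens.forEach((tokens, turn) => {
    active += tokens
    unbuffered += tokens

    // Buffer: a background Observer call covers the newest unbuffered tokens.
    if (unbuffered >= bufferTokens) {
      chunks.push(unbuffered)
      events.push({ kind: 'buffer', atTurn: turn, tokens: unbuffered })
      unbuffered = 0
    }

    // Activate: at threshold, move whole chunks (oldest first) into the
    // observation log and drop their messages until ~retain tokens remain.
    if (active >= messageTokens) {
      let removed = 0
      while (chunks.length > 0 && active - removed > retain) {
        removed += chunks.shift()!
      }
      if (removed > 0) {
        active -= removed
        events.push({ kind: 'activate', atTurn: turn, tokens: removed })
      }
    }
  })
  return events
}
```

Feeding it twelve 3k-token turns buffers a chunk every second turn; on the tenth turn (index 9) the window reaches 30k, four chunks (24k tokens) activate, and 6k tokens remain as active history.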

Default settings:

  • observation.bufferTokens: 0.2 — buffer every 20% of messageTokens (e.g. every ~6k tokens with a 30k threshold)
  • observation.bufferActivation: 0.8 — on activation, remove enough messages to keep only 20% of the threshold remaining
  • Buffered observations include continuation hints (suggestedResponse, currentTask) that survive activation to maintain conversational continuity
  • reflection.bufferActivation: 0.5 — start background reflection at 50% of observation threshold

To customize:

src/mastra/agents/agent.ts
import { Memory } from '@mastra/memory'
import { Agent } from '@mastra/core/agent'

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  memory: new Memory({
    options: {
      observationalMemory: {
        model: 'google/gemini-2.5-flash',
        observation: {
          messageTokens: 30_000,
          // Buffer every 5k tokens (runs in background)
          bufferTokens: 5_000,
          // Activate to retain 30% of threshold
          bufferActivation: 0.7,
          // Force synchronous observation at 1.5x threshold
          blockAfter: 1.5,
        },
        reflection: {
          observationTokens: 60_000,
          // Start background reflection at 50% of threshold
          bufferActivation: 0.5,
          // Force synchronous reflection at 1.2x threshold
          blockAfter: 1.2,
        },
      },
    },
  }),
})

To disable async buffering entirely:

observationalMemory: {
  model: 'google/gemini-2.5-flash',
  observation: {
    bufferTokens: false,
  },
}

Setting bufferTokens: false disables both observation and reflection async buffering. Observations and reflections will run synchronously when their thresholds are reached.

Note: Async buffering is not supported with scope: 'resource' and is automatically disabled in resource scope.

Streaming data parts

Observational Memory emits typed data parts during agent execution that clients can use for real-time UI feedback. These are streamed alongside the agent's response.

data-om-status

Emitted once per agent loop step, before model generation. Provides a snapshot of the current memory state, including token usage for both context windows and the state of any async buffered content.

interface DataOmStatusPart {
  type: 'data-om-status'
  data: {
    windows: {
      active: {
        /** Unobserved message tokens and the threshold that triggers observation */
        messages: { tokens: number; threshold: number }
        /** Observation tokens and the threshold that triggers reflection */
        observations: { tokens: number; threshold: number }
      }
      buffered: {
        observations: {
          /** Number of buffered chunks staged for activation */
          chunks: number
          /** Total message tokens across all buffered chunks */
          messageTokens: number
          /** Projected message tokens that would be removed if activation happened now (based on bufferActivation ratio and chunk boundaries) */
          projectedMessageRemoval: number
          /** Observation tokens that will be added on activation */
          observationTokens: number
          /** idle: no buffering in progress. running: background observer is working. complete: chunks are ready for activation. */
          status: 'idle' | 'running' | 'complete'
        }
        reflection: {
          /** Observation tokens that were fed into the reflector (pre-compression size) */
          inputObservationTokens: number
          /** Observation tokens the reflection will produce on activation (post-compression size) */
          observationTokens: number
          /** idle: no reflection buffered. running: background reflector is working. complete: reflection is ready for activation. */
          status: 'idle' | 'running' | 'complete'
        }
      }
    }
    recordId: string
    threadId: string
    stepNumber: number
    /** Increments each time the Reflector creates a new generation */
    generationCount: number
  }
}

buffered.reflection.inputObservationTokens is the size of the observations that were sent to the Reflector. buffered.reflection.observationTokens is the compressed result — the size of what will replace those observations when the reflection activates. A client can use these two values to show a compression ratio.

Clients can derive percentages and post-activation estimates from the raw values:

// `status` is the `data` payload of a data-om-status part (DataOmStatusPart['data'])

// Message window usage as a fraction of its threshold (multiply by 100 for %)
const msgPercent = status.windows.active.messages.tokens / status.windows.active.messages.threshold

// Observation window usage as a fraction of its threshold (multiply by 100 for %)
const obsPercent =
  status.windows.active.observations.tokens / status.windows.active.observations.threshold

// Projected message tokens after buffered observations activate
// Uses projectedMessageRemoval which accounts for bufferActivation ratio and chunk boundaries
const postActivation =
  status.windows.active.messages.tokens -
  status.windows.buffered.observations.projectedMessageRemoval

// Reflection compression ratio (when buffered reflection exists)
const { inputObservationTokens, observationTokens } = status.windows.buffered.reflection
if (inputObservationTokens > 0) {
  const compressionRatio = observationTokens / inputObservationTokens
}

data-om-observation-start

Emitted when the Observer or Reflector agent begins processing.

cycleId:

string
Unique ID for this cycle — shared between start/end/failed markers.

operationType:

'observation' | 'reflection'
Whether this is an observation or reflection operation.

startedAt:

string
ISO timestamp when processing started.

tokensToObserve:

number
Message tokens (input) being processed in this batch.

recordId:

string
The OM record ID.

threadId:

string
This thread's ID.

threadIds:

string[]
All thread IDs in this batch (for resource-scoped).

config:

ObservationMarkerConfig
Snapshot of `messageTokens`, `observationTokens`, and `scope` at observation time.

data-om-observation-end

Emitted when observation or reflection completes successfully.

cycleId:

string
Matches the corresponding `start` marker.

operationType:

'observation' | 'reflection'
Type of operation that completed.

completedAt:

string
ISO timestamp when processing completed.

durationMs:

number
Duration in milliseconds.

tokensObserved:

number
Message tokens (input) that were processed.

observationTokens:

number
Resulting observation tokens (output) after the Observer or Reflector compressed them.

observations?:

string
The generated observations text.

currentTask?:

string
Current task extracted by the Observer.

suggestedResponse?:

string
Suggested response extracted by the Observer.

recordId:

string
The OM record ID.

threadId:

string
This thread's ID.

data-om-observation-failed

Emitted when observation or reflection fails. The system falls back to synchronous processing.

cycleId:

string
Matches the corresponding `start` marker.

operationType:

'observation' | 'reflection'
Type of operation that failed.

failedAt:

string
ISO timestamp when the failure occurred.

durationMs:

number
Duration until failure in milliseconds.

tokensAttempted:

number
Message tokens (input) that were attempted.

error:

string
Error message.

observations?:

string
Any partial content available for display.

recordId:

string
The OM record ID.

threadId:

string
This thread's ID.
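Because `start`, `end`, and `failed` markers share a `cycleId`, a client can fold them into per-cycle UI state. The sketch below uses hand-written subsets of the fields documented above and assumes each marker arrives as `{ type, data }` like `data-om-status`; `MarkerPart`, `CycleState`, and `trackCycles` are hypothetical names, not exports of `@mastra/memory`.

```typescript
// Hand-written subsets of the marker payloads (assumed { type, data } shape).
type MarkerPart =
  | {
      type: 'data-om-observation-start'
      data: { cycleId: string; operationType: 'observation' | 'reflection'; startedAt: string }
    }
  | { type: 'data-om-observation-end'; data: { cycleId: string; durationMs: number; observationTokens: number } }
  | { type: 'data-om-observation-failed'; data: { cycleId: string; durationMs: number; error: string } }

interface CycleState {
  operationType: 'observation' | 'reflection'
  status: 'running' | 'done' | 'failed'
  durationMs?: number
  error?: string
}

// Folds marker parts into per-cycle state keyed by cycleId.
function trackCycles(parts: MarkerPart[]): Map<string, CycleState> {
  const cycles = new Map<string, CycleState>()
  for (const part of parts) {
    switch (part.type) {
      case 'data-om-observation-start':
        cycles.set(part.data.cycleId, { operationType: part.data.operationType, status: 'running' })
        break
      case 'data-om-observation-end': {
        const cycle = cycles.get(part.data.cycleId)
        if (cycle) {
          cycle.status = 'done'
          cycle.durationMs = part.data.durationMs
        }
        break
      }
      case 'data-om-observation-failed': {
        const cycle = cycles.get(part.data.cycleId)
        if (cycle) {
          cycle.status = 'failed'
          cycle.durationMs = part.data.durationMs
          cycle.error = part.data.error
        }
        break
      }
    }
  }
  return cycles
}
```

End or failed markers whose `start` was never seen are ignored here; a real client might instead create the entry on demand.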

data-om-buffering-start

Emitted when async buffering begins in the background. Buffering pre-computes observations or reflections before the main threshold is reached.

cycleId:

string
Unique ID for this buffering cycle.

operationType:

'observation' | 'reflection'
Type of operation being buffered.

startedAt:

string
ISO timestamp when buffering started.

tokensToBuffer:

number
Message tokens (input) being buffered in this cycle.

recordId:

string
The OM record ID.

threadId:

string
This thread's ID.

threadIds:

string[]
All thread IDs being buffered (for resource-scoped).

config:

ObservationMarkerConfig
Snapshot of config at buffering time.

data-om-buffering-end

Emitted when async buffering completes. The content is stored but not yet activated in the main context.

cycleId:

string
Matches the corresponding `buffering-start` marker.

operationType:

'observation' | 'reflection'
Type of operation that was buffered.

completedAt:

string
ISO timestamp when buffering completed.

durationMs:

number
Duration in milliseconds.

tokensBuffered:

number
Message tokens (input) that were buffered.

bufferedTokens:

number
Observation tokens (output) after the Observer or Reflector compressed them.

observations?:

string
The buffered content.

recordId:

string
The OM record ID.

threadId:

string
This thread's ID.

data-om-buffering-failed

Emitted when async buffering fails. The system falls back to synchronous processing when the threshold is reached.

cycleId:

string
Matches the corresponding `buffering-start` marker.

operationType:

'observation' | 'reflection'
Type of operation that failed.

failedAt:

string
ISO timestamp when the failure occurred.

durationMs:

number
Duration until failure in milliseconds.

tokensAttempted:

number
Message tokens (input) that were attempted to buffer.

error:

string
Error message.

observations?:

string
Any partial content.

recordId:

string
The OM record ID.

threadId:

string
This thread's ID.

data-om-activation

Emitted when buffered observations or reflections are activated (moved into the active context window). This is an instant operation — no LLM call is involved.

cycleId:

string
Unique ID for this activation event.

operationType:

'observation' | 'reflection'
Type of content activated.

activatedAt:

string
ISO timestamp when activation occurred.

chunksActivated:

number
Number of buffered chunks activated.

tokensActivated:

number
Message tokens (input) from activated chunks. For observation activation, these tokens are removed from the message window. For reflection activation, this is the number of observation tokens that were compressed.

observationTokens:

number
Resulting observation tokens after activation.

messagesActivated:

number
Number of messages that were observed via activation.

generationCount:

number
Current reflection generation count.

observations?:

string
The activated observations text.

recordId:

string
The OM record ID.

threadId:

string
This thread's ID.

config:

ObservationMarkerConfig
Snapshot of config at activation time.

Standalone usage

Most users should use the Memory class above. Using ObservationalMemory directly is mainly useful for benchmarking, experimentation, or when you need to control processor ordering with other processors (like guardrails).

src/mastra/agents/agent.ts
import { ObservationalMemory } from '@mastra/memory/processors'
import { Agent } from '@mastra/core/agent'
import { LibSQLStore } from '@mastra/libsql'

const storage = new LibSQLStore({
  id: 'my-storage',
  url: 'file:./memory.db',
})

const om = new ObservationalMemory({
  storage: storage.stores.memory,
  model: 'google/gemini-2.5-flash',
  scope: 'resource',
  observation: {
    messageTokens: 20_000,
  },
  reflection: {
    observationTokens: 60_000,
  },
})

export const agent = new Agent({
  name: 'my-agent',
  instructions: 'You are a helpful assistant.',
  model: 'openai/gpt-5-mini',
  inputProcessors: [om],
  outputProcessors: [om],
})

Standalone config

The standalone ObservationalMemory class accepts all the same options as the observationalMemory config object above, plus the following:

storage:

MemoryStorage
Storage adapter for persisting observations. Must be a MemoryStorage instance (from `MastraStorage.stores.memory`).

onDebugEvent?:

(event: ObservationDebugEvent) => void
Debug callback for observation events. Called whenever observation-related events occur. Useful for debugging and understanding the observation flow.

obscureThreadIds?:

boolean
= false
When enabled, thread IDs are hashed before being included in observation context. This prevents the LLM from recognizing patterns in thread identifiers. Automatically enabled when using resource scope through the Memory class.
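The idea behind `obscureThreadIds` is a stable, opaque mapping from thread ID to token: the same thread always maps to the same value, but the value reveals nothing readable about the original identifier. The sketch below uses Node's built-in `crypto.createHash`; the choice of SHA-256 truncated to 12 hex characters is an assumption for illustration, and the library's actual hashing scheme may differ. `obscureThreadId` is a hypothetical name.

```typescript
import { createHash } from 'node:crypto'

// Maps a thread ID to a short, stable, opaque token.
// Assumption: SHA-256 truncated to 12 hex chars; not necessarily Mastra's scheme.
function obscureThreadId(threadId: string): string {
  return createHash('sha256').update(threadId).digest('hex').slice(0, 12)
}
```

Because the mapping is deterministic, observations from the same thread still group together even though the model never sees the raw identifier.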