# CachingPubSub `CachingPubSub` wraps any [`PubSub`](https://mastra.ai/reference/pubsub/base) implementation and adds event caching and replay. It records every published event per topic in a cache, so a subscriber that connects late or reconnects after a disconnect can replay the events it missed before continuing with live events. Use it to build resumable streams on top of a transport that does not keep history, such as [`EventEmitterPubSub`](https://mastra.ai/reference/pubsub/event-emitter). Transports that already persist events, such as [`RedisStreamsPubSub`](https://mastra.ai/reference/pubsub/redis-streams), do not need this wrapper. ## Usage example Wrap an inner pub/sub and provide a server cache to store events. ```typescript import { Mastra } from '@mastra/core' import { CachingPubSub, EventEmitterPubSub } from '@mastra/core/events' import { InMemoryServerCache } from '@mastra/core/cache' const cache = new InMemoryServerCache() const pubsub = new CachingPubSub(new EventEmitterPubSub(), cache) export const mastra = new Mastra({ pubsub, }) ``` The `withCaching` helper returns the same instance and reads better when wrapping inline: ```typescript import { withCaching, EventEmitterPubSub } from '@mastra/core/events' import { InMemoryServerCache } from '@mastra/core/cache' const pubsub = withCaching(new EventEmitterPubSub(), new InMemoryServerCache()) ``` ## Constructor parameters **inner** (`PubSub`): The pub/sub implementation to wrap. All publishes and live subscriptions pass through to this instance. **cache** (`MastraServerCache`): Cache used to store events per topic for replay. **options** (`CachingPubSubOptions`): Optional configuration. ## Methods `CachingPubSub` implements the [`PubSub`](https://mastra.ai/reference/pubsub/base) contract. It overrides the replay methods to read cached events. The methods below describe the caching behavior. ### `publish(topic, event)` Caches the event with a sequential index, then publishes it to the inner pub/sub. ```typescript await pubsub.publish('my-topic', { type: 'example', data: { value: 1 }, runId: 'run-123', }) ``` ### `subscribeWithReplay(topic, cb)` Replays all cached events for the topic, then subscribes to live events. ```typescript await pubsub.subscribeWithReplay('my-topic', event => { console.log(event) }) ``` ### `subscribeFromOffset(topic, offset, cb)` Replays cached events starting at `offset`, then subscribes to live events. Use this when the client knows its last position, to avoid replaying the whole history. ```typescript await pubsub.subscribeFromOffset('my-topic', 42, event => { console.log(event) }) ``` ### `getHistory(topic, offset?)` Returns cached events for the topic, starting at `offset`. ```typescript const events = await pubsub.getHistory('my-topic', 0) ``` Returns: `Promise` ## Functions ### `withCaching(pubsub, cache, options?)` Convenience wrapper that constructs a `CachingPubSub`. Accepts the same arguments as the constructor and returns the new instance. ```typescript import { withCaching, EventEmitterPubSub } from '@mastra/core/events' import { InMemoryServerCache } from '@mastra/core/cache' const pubsub = withCaching(new EventEmitterPubSub(), new InMemoryServerCache(), { keyPrefix: 'events:', }) ``` Returns: `CachingPubSub`