# PubSub `PubSub` is the abstract base class for Mastra's event system. It defines the contract that every pub/sub backend implements, so the rest of Mastra can publish and subscribe to events without knowing which transport is in use. Mastra uses pub/sub internally for workflow event processing, streaming, and cross-component communication. Most applications use the default [`EventEmitterPubSub`](https://mastra.ai/reference/pubsub/event-emitter) and never construct a `PubSub` directly. Implement this class only when you need a custom transport. For built-in implementations, see [`EventEmitterPubSub`](https://mastra.ai/reference/pubsub/event-emitter), [`UnixSocketPubSub`](https://mastra.ai/reference/pubsub/unix-socket-pubsub), [`CachingPubSub`](https://mastra.ai/reference/pubsub/caching-pubsub), [`RedisStreamsPubSub`](https://mastra.ai/reference/pubsub/redis-streams), and [`GoogleCloudPubSub`](https://mastra.ai/reference/pubsub/google-cloud-pubsub). ## Usage example Extend `PubSub` and implement the four abstract methods to add a custom backend. ```typescript import { PubSub } from '@mastra/core/events' import type { Event, EventCallback, SubscribeOptions } from '@mastra/core/events' export class CustomPubSub extends PubSub { async publish(topic: string, event: Omit): Promise { // Deliver the event to subscribers of `topic`. } async subscribe(topic: string, cb: EventCallback, options?: SubscribeOptions): Promise { // Register `cb` to receive events published to `topic`. } async unsubscribe(topic: string, cb: EventCallback): Promise { // Remove a previously registered callback. } async flush(): Promise { // Wait for any in-flight deliveries to settle. } } ``` Pass the instance to the [Mastra](https://mastra.ai/reference/core/mastra-class) constructor: ```typescript import { Mastra } from '@mastra/core' import { CustomPubSub } from './pubsub' export const mastra = new Mastra({ pubsub: new CustomPubSub(), }) ``` ## Delivery modes A `PubSub` declares which delivery modes it supports through the `supportedModes` property. Mastra reads this to decide whether to run a long-lived worker that pulls events. | Mode | Description | | ------ | ----------------------------------------------------------------------------------------------------------------------------- | | `pull` | Consumers actively read from the broker, for example Redis Streams `XREADGROUP`. Mastra runs an orchestration worker to read. | | `push` | Events arrive without the consumer asking, either in-process or through an HTTP endpoint. No read loop is required. | The default is `['pull']` so that custom implementations keep today's behavior unless they opt in to push delivery. ## Methods ### Core methods #### `publish(topic, event)` Publishes an event to a topic. The `id` and `createdAt` fields are assigned by the implementation. ```typescript await pubsub.publish('my-topic', { type: 'example', data: { value: 1 }, runId: 'run-123', }) ``` #### `subscribe(topic, cb, options?)` Registers a callback to receive events published to a topic. When `options.group` is set, subscribers in the same group compete for messages and each event is delivered to one member. Without a group, every subscriber receives every event. ```typescript await pubsub.subscribe('my-topic', (event, ack, nack) => { console.log(event) }) ``` #### `unsubscribe(topic, cb)` Removes a previously registered callback from a topic. ```typescript await pubsub.unsubscribe('my-topic', callback) ``` #### `flush()` Waits for any in-flight deliveries to settle. Call this before shutdown to avoid dropping events. ```typescript await pubsub.flush() ``` ### Replay methods These methods support resuming a stream after a disconnect. The default implementations fall back to a regular `subscribe`, so backends without history support behave as live-only. [`CachingPubSub`](https://mastra.ai/reference/pubsub/caching-pubsub) overrides them to replay cached events. #### `getHistory(topic, offset?)` Returns cached events for a topic, starting at `offset`. Returns an empty array when the backend has no history. ```typescript const events = await pubsub.getHistory('my-topic', 0) ``` Returns: `Promise` #### `subscribeWithReplay(topic, cb)` Replays cached events, then subscribes to live events. ```typescript await pubsub.subscribeWithReplay('my-topic', event => { console.log(event) }) ``` #### `subscribeFromOffset(topic, offset, cb)` Replays cached events starting at a known position, then subscribes to live events. This is more efficient than a full replay when the client knows its last position. ```typescript await pubsub.subscribeFromOffset('my-topic', 42, event => { console.log(event) }) ``` ## Properties **supportedModes** (`ReadonlyArray<"pull" | "push">`): Delivery modes the implementation supports. Defaults to \`\["pull"]\`. ## Types ### `Event` **type** (`string`): Event type identifier. **id** (`string`): Unique event ID, assigned by the implementation on publish. **data** (`any`): Event payload. **runId** (`string`): Run the event belongs to. **createdAt** (`Date`): Timestamp assigned by the implementation on publish. **index** (`number`): Sequential position used to resume from a specific offset. **deliveryAttempt** (`number`): Number of times the event has been delivered. Starts at 1. Defaults to 1 when the backend does not track redelivery. ### `SubscribeOptions` **group** (`string`): When set, subscribers with the same group compete for messages and each event is delivered to one member. When omitted, every subscriber receives every event. ### `EventCallback` The callback signature for subscribers: `(event: Event, ack?: () => Promise, nack?: () => Promise) => void`. **event** (`Event`): The delivered event. **ack** (`() => Promise`): Acknowledge successful processing. The event is removed from the queue. **nack** (`() => Promise`): Negative acknowledge. The event is requeued for redelivery after a delay.