Skip to main content

PubSub

PubSub is the abstract base class for Mastra's event system. It defines the contract that every pub/sub backend implements, so the rest of Mastra can publish and subscribe to events without knowing which transport is in use.

Mastra uses pub/sub internally for workflow event processing, streaming, and cross-component communication. Most applications use the default EventEmitterPubSub and never construct a PubSub directly. Implement this class only when you need a custom transport.

For built-in implementations, see EventEmitterPubSub, UnixSocketPubSub, CachingPubSub, RedisStreamsPubSub, and GoogleCloudPubSub.

Usage example
Direct link to Usage example

Extend PubSub and implement the four abstract methods to add a custom backend.

src/mastra/pubsub.ts
import { PubSub } from '@mastra/core/events'
import type { Event, EventCallback, SubscribeOptions } from '@mastra/core/events'

export class CustomPubSub extends PubSub {
async publish(topic: string, event: Omit<Event, 'id' | 'createdAt'>): Promise<void> {
// Deliver the event to subscribers of `topic`.
}

async subscribe(topic: string, cb: EventCallback, options?: SubscribeOptions): Promise<void> {
// Register `cb` to receive events published to `topic`.
}

async unsubscribe(topic: string, cb: EventCallback): Promise<void> {
// Remove a previously registered callback.
}

async flush(): Promise<void> {
// Wait for any in-flight deliveries to settle.
}
}

Pass the instance to the Mastra constructor:

src/mastra/index.ts
import { Mastra } from '@mastra/core'
import { CustomPubSub } from './pubsub'

export const mastra = new Mastra({
pubsub: new CustomPubSub(),
})

Delivery modes
Direct link to Delivery modes

A PubSub declares which delivery modes it supports through the supportedModes property. Mastra reads this to decide whether to run a long-lived worker that pulls events.

ModeDescription
pullConsumers actively read from the broker, for example Redis Streams XREADGROUP. Mastra runs an orchestration worker to read.
pushEvents arrive without the consumer asking, either in-process or through an HTTP endpoint. No read loop is required.

The default is ['pull'] so that custom implementations keep today's behavior unless they opt in to push delivery.

Methods
Direct link to Methods

Core methods
Direct link to Core methods

publish(topic, event)
Direct link to publishtopic-event

Publishes an event to a topic. The id and createdAt fields are assigned by the implementation.

await pubsub.publish('my-topic', {
type: 'example',
data: { value: 1 },
runId: 'run-123',
})

subscribe(topic, cb, options?)
Direct link to subscribetopic-cb-options

Registers a callback to receive events published to a topic. When options.group is set, subscribers in the same group compete for messages and each event is delivered to one member. Without a group, every subscriber receives every event.

Pass options.batch to opt in to batched delivery. The callback signature is unchanged: a batch of N events is delivered as N consecutive cb(event, ack, nack) calls in publish order. Batching is honored only when the backend's supportsNativeBatching is true; other backends ignore the option and deliver events one at a time.

await pubsub.subscribe('my-topic', (event, ack, nack) => {
console.log(event)
})

unsubscribe(topic, cb)
Direct link to unsubscribetopic-cb

Removes a previously registered callback from a topic.

await pubsub.unsubscribe('my-topic', callback)

flush()
Direct link to flush

Waits for any in-flight deliveries to settle. Call this before shutdown to avoid dropping events.

await pubsub.flush()

Replay methods
Direct link to Replay methods

These methods support resuming a stream after a disconnect. The default implementations fall back to a regular subscribe, so backends without history support behave as live-only. CachingPubSub overrides them to replay cached events.

getHistory(topic, offset?)
Direct link to gethistorytopic-offset

Returns cached events for a topic, starting at offset. Returns an empty array when the backend has no history.

const events = await pubsub.getHistory('my-topic', 0)

Returns: Promise<Event[]>

subscribeWithReplay(topic, cb)
Direct link to subscribewithreplaytopic-cb

Replays cached events, then subscribes to live events.

await pubsub.subscribeWithReplay('my-topic', event => {
console.log(event)
})

subscribeFromOffset(topic, offset, cb)
Direct link to subscribefromoffsettopic-offset-cb

Replays cached events starting at a known position, then subscribes to live events. This is more efficient than a full replay when the client knows its last position.

await pubsub.subscribeFromOffset('my-topic', 42, event => {
console.log(event)
})

Properties
Direct link to Properties

supportedModes:

ReadonlyArray<"pull" | "push">
Delivery modes the implementation supports. Defaults to `["pull"]`.

supportsNativeBatching:

boolean
Whether the implementation honors `options.batch` on `subscribe()`. Defaults to `false`. Backends that integrate batching internally override this and return `true`.

Types
Direct link to Types

Event
Direct link to event

type:

string
Event type identifier.

id:

string
Unique event ID, assigned by the implementation on publish.

data:

any
Event payload.

runId:

string
Run the event belongs to.

createdAt:

Date
Timestamp assigned by the implementation on publish.

index?:

number
Sequential position used to resume from a specific offset.

deliveryAttempt?:

number
Number of times the event has been delivered. Starts at 1. Defaults to 1 when the backend does not track redelivery.

SubscribeOptions
Direct link to subscribeoptions

group?:

string
When set, subscribers with the same group compete for messages and each event is delivered to one member. When omitted, every subscriber receives every event.

batch?:

SubscribeBatchOptions
Opt in to batched delivery for this subscription. When omitted, events are delivered one at a time. Honored only by backends where `supportsNativeBatching` is `true`.

SubscribeBatchOptions
Direct link to subscribebatchoptions

Per-subscription batching policy. The callback signature doesn't change; a batch of N events becomes N consecutive callback invocations in publish order.

maxSize?:

number
Maximum events held before forcing a flush.

maxWaitMs?:

number
Maximum time in milliseconds the oldest event may sit in the buffer. The timer starts when the buffer transitions from empty to non-empty.

minIntervalMs?:

number
Minimum time in milliseconds between consecutive batch deliveries. Even when `maxSize` or `maxWaitMs` would fire, the buffer holds until this interval has elapsed since the last delivery.

isImmediate?:

(event: Event) => boolean
When it returns `true` for an event, the buffer flushes immediately on publish, subject to `minIntervalMs`. A per-event escape hatch.

coalesce?:

(events: Event[]) => Event[]
Applied to the queued batch before delivery to drop superseded events. Must return a subset of its input by reference identity; returning freshly constructed `Event` objects is a contract violation and discards the whole batch. Ordering of kept events is preserved.

maxBufferSize?:

number
= 256
Maximum events the buffer may hold before overflow handling kicks in. Events flagged immediate are never dropped on overflow.

overflow?:

"drop-oldest" | "drop-newest" | "coalesce-or-drop-oldest"
= coalesce-or-drop-oldest
Overflow strategy when the buffer exceeds `maxBufferSize`. `coalesce-or-drop-oldest` runs `coalesce` first, then drops oldest if still over budget.

EventCallback
Direct link to eventcallback

The callback signature for subscribers: (event: Event, ack?: () => Promise<void>, nack?: () => Promise<void>) => void.

event:

Event
The delivered event.

ack?:

() => Promise<void>
Acknowledge successful processing. The event is removed from the queue.

nack?:

() => Promise<void>
Negative acknowledge. The event is requeued for redelivery after a delay.