Skip to main content

EventEmitterPubSub

EventEmitterPubSub is the default PubSub implementation. It delivers events in-process using a Node.js EventEmitter, so it works without any external service.

Use it for single-process applications. For delivery across processes on one host, see UnixSocketPubSub. For distributed delivery, see RedisStreamsPubSub or GoogleCloudPubSub.

Because it's in-process, events aren't persisted and aren't shared with other processes. Wrap it in CachingPubSub when you need replay for resumable streams.

Usage example
Direct link to Usage example

EventEmitterPubSub is used automatically when you don't configure a pubsub option, so most applications never construct it directly. Create one explicitly only when you want to configure or share it.

src/mastra/index.ts
import { Mastra } from '@mastra/core'
import { EventEmitterPubSub } from '@mastra/core/events'

export const mastra = new Mastra({
pubsub: new EventEmitterPubSub(),
})

To share an emitter with other parts of your application, pass an existing EventEmitter:

import EventEmitter from 'node:events'
import { EventEmitterPubSub } from '@mastra/core/events'

const emitter = new EventEmitter()
const pubsub = new EventEmitterPubSub(emitter)

To surface batched-delivery errors, pass a logger:

import { EventEmitterPubSub } from '@mastra/core/events'

const pubsub = new EventEmitterPubSub(undefined, { logger })

Constructor parameters
Direct link to Constructor parameters

existingEmitter?:

EventEmitter
An existing Node.js EventEmitter to use for delivery. When omitted, a new EventEmitter is created.

options?:

EventEmitterPubSubOptions
Optional configuration.
IMastraLogger

Properties
Direct link to Properties

supportedModes:

ReadonlyArray<"pull" | "push">
Returns `["pull", "push"]`. The emitter can serve a pull-style worker or push events directly to listeners.

supportsNativeBatching:

boolean
Returns `true`. Subscribers can opt in to batched delivery with `options.batch`.

Methods
Direct link to Methods

EventEmitterPubSub implements the PubSub contract. The methods below have behavior specific to this implementation.

subscribe(topic, cb, options?)
Direct link to subscribetopic-cb-options

Registers a callback for a topic. Without options.group, every subscriber receives every event. With a group, events are distributed round-robin across members of that group.

Pass options.batch to opt in to batched delivery. See Batching below.

await pubsub.subscribe('workflow.events', (event, ack, nack) => {
console.log(event)
})

flush()
Direct link to flush

Waits for any pending redeliveries from nack to fire before resolving.

await pubsub.flush()

close()
Direct link to close

Removes all listeners and cancels pending redeliveries. Call this during graceful shutdown.

await pubsub.close()

Redelivery
Direct link to Redelivery

When a grouped subscriber calls nack, the event is redelivered to the group after a short delay, and its deliveryAttempt count increases. Calling ack clears the tracking for that event. Fan-out subscribers receive no-op ack and nack functions, since each event reaches every subscriber once.

Batching
Direct link to Batching

EventEmitterPubSub honors options.batch natively. When a subscriber opts in, events are held in a per-subscriber in-memory buffer and delivered as consecutive callback invocations once a flush condition is met. Both fan-out and group subscribers can batch. See SubscribeBatchOptions for the full policy.

await pubsub.subscribe(
'workflow.events',
event => {
console.log(event)
},
{
batch: {
maxSize: 10, // flush once 10 events have queued
maxWaitMs: 500, // ...or after 500ms, whichever comes first
},
},
)

The buffer is in-memory and per-process, so batched state isn't persisted and doesn't survive a restart. A flush triggered by maxWaitMs is best-effort: if a step such as a throwing coalesce fails, the error is surfaced through the configured logger rather than thrown. flush() drains every batched subscriber buffer before resolving.