EventEmitterPubSub

EventEmitterPubSub is the default PubSub implementation. It delivers events in-process using a Node.js EventEmitter, so it works without any external service.

Use it for single-process applications. For delivery across processes on one host, see UnixSocketPubSub. For distributed delivery, see RedisStreamsPubSub or GoogleCloudPubSub.

Because it is in-process, events are not persisted and are not shared with other processes. Wrap it in CachingPubSub when you need replay for resumable streams.

Usage example

EventEmitterPubSub is used automatically when you do not configure a pubsub option, so most applications never construct it directly. Create one explicitly only when you want to configure or share it.

src/mastra/index.ts
import { Mastra } from '@mastra/core'
import { EventEmitterPubSub } from '@mastra/core/events'

export const mastra = new Mastra({
  pubsub: new EventEmitterPubSub(),
})

To share an emitter with other parts of your application, pass an existing EventEmitter:

import EventEmitter from 'node:events'
import { EventEmitterPubSub } from '@mastra/core/events'

const emitter = new EventEmitter()
const pubsub = new EventEmitterPubSub(emitter)

Constructor parameters

existingEmitter?: EventEmitter
An existing Node.js EventEmitter to use for delivery. When omitted, a new EventEmitter is created.

Properties

supportedModes: ReadonlyArray<"pull" | "push">
Returns `["pull", "push"]`. The emitter can serve a pull-style worker or push events directly to listeners.

Methods

EventEmitterPubSub implements the PubSub contract. The methods below have behavior specific to this implementation.

subscribe(topic, cb, options?)

Registers a callback for a topic. Without options.group, every subscriber receives every event. With a group, events are distributed round-robin across members of that group.

await pubsub.subscribe('workflow.events', (event, ack, nack) => {
  console.log(event)
})
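
The difference between fan-out and grouped delivery can be illustrated with a small self-contained model. This is a sketch, not the @mastra/core implementation: `TinyDispatcher` and its methods are hypothetical names, topics are left out, and the only behavior it mirrors is the one described above (every ungrouped subscriber gets every event, while a group hands each event to one member in rotation).

```typescript
// Illustrative model only; TinyDispatcher is a hypothetical name, not a Mastra API.
type Handler = (event: string) => void

class TinyDispatcher {
  private fanout: Handler[] = []
  private groups = new Map<string, { members: Handler[]; next: number }>()

  subscribe(cb: Handler, group?: string): void {
    if (group === undefined) {
      this.fanout.push(cb)
      return
    }
    const g = this.groups.get(group) ?? { members: [], next: 0 }
    g.members.push(cb)
    this.groups.set(group, g)
  }

  publish(event: string): void {
    // Fan-out: every ungrouped subscriber sees every event.
    for (const cb of this.fanout) cb(event)
    // Grouped: exactly one member per group, chosen in rotation.
    for (const g of this.groups.values()) {
      const cb = g.members[g.next % g.members.length]
      g.next += 1
      cb(event)
    }
  }
}

const a: string[] = []
const b: string[] = []
const w1: string[] = []
const w2: string[] = []

const dispatcher = new TinyDispatcher()
dispatcher.subscribe(e => a.push(e))             // fan-out subscriber
dispatcher.subscribe(e => b.push(e))             // fan-out subscriber
dispatcher.subscribe(e => w1.push(e), 'workers') // group member 1
dispatcher.subscribe(e => w2.push(e), 'workers') // group member 2

for (const e of ['e1', 'e2', 'e3', 'e4']) dispatcher.publish(e)
```

In this model, `a` and `b` each collect all four events, while `w1` and `w2` split them alternately. Groups suit work that should be processed once, and fan-out suits observers that each need the full stream.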

flush()

Returns a promise that resolves once every redelivery scheduled by nack has fired.

await pubsub.flush()

close()

Removes all listeners and cancels pending redeliveries. Call this during graceful shutdown.

await pubsub.close()
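
One possible way to sequence shutdown is to call flush() first, so that scheduled redeliveries get a chance to fire, and then close(). The sketch below assumes only the two methods documented on this page. The `Drainable` interface and the `shutdownPubSub` helper are hypothetical names introduced for illustration, and the flush-then-close ordering is a design choice rather than a documented requirement.

```typescript
// Minimal shape assumed from the flush() and close() methods documented above.
interface Drainable {
  flush(): Promise<void>
  close(): Promise<void>
}

// Hypothetical helper: let pending redeliveries fire, then remove listeners.
async function shutdownPubSub(pubsub: Drainable): Promise<void> {
  await pubsub.flush()
  await pubsub.close()
}

// Example wiring (assumes `pubsub` is the instance passed to Mastra):
// process.once('SIGTERM', () => {
//   shutdownPubSub(pubsub).finally(() => process.exit(0))
// })
```

If a fast exit matters more than in-flight redeliveries, calling close() alone is the simpler option, since it cancels anything still pending.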

Redelivery

When a grouped subscriber calls nack, the event is redelivered to the group after a short delay, and its deliveryAttempt count increases. Calling ack clears the tracking for that event. Fan-out subscribers receive no-op ack and nack functions, since each event reaches every subscriber once.
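
The ack and nack bookkeeping can be pictured with a small standalone model. It is a sketch under stated assumptions, not the library's code: `RedeliveryModel`, the `Delivery` shape, the `id` field, and the 10 ms delay are all hypothetical, and a single handler stands in for the whole group.

```typescript
// Illustrative model only; names and the delay value are assumptions.
interface Delivery {
  id: string
  payload: string
  deliveryAttempt: number
}

type GroupHandler = (d: Delivery, ack: () => void, nack: () => void) => void

class RedeliveryModel {
  private readonly attempts = new Map<string, number>()
  private readonly handler: GroupHandler
  private readonly delayMs: number

  constructor(handler: GroupHandler, delayMs = 10) {
    this.handler = handler
    this.delayMs = delayMs
  }

  deliver(id: string, payload: string): void {
    // Each delivery of the same event bumps its attempt counter.
    const attempt = (this.attempts.get(id) ?? 0) + 1
    this.attempts.set(id, attempt)

    // ack: stop tracking the event.
    const ack = () => {
      this.attempts.delete(id)
    }
    // nack: schedule another delivery after a short delay.
    const nack = () => {
      setTimeout(() => this.deliver(id, payload), this.delayMs)
    }

    this.handler({ id, payload, deliveryAttempt: attempt }, ack, nack)
  }

  // Number of events still being tracked (delivered but not acked).
  pending(): number {
    return this.attempts.size
  }
}

const attemptsSeen: number[] = []
const model = new RedeliveryModel((d, ack, nack) => {
  attemptsSeen.push(d.deliveryAttempt)
  // Reject the first two attempts, accept the third.
  if (d.deliveryAttempt < 3) {
    nack()
  } else {
    ack()
  }
})

model.deliver('evt-1', 'hello')
```

A handler can use deliveryAttempt in the same way to stop retrying after a few failures, for example by acking and logging the event instead of calling nack again.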