GoogleCloudPubSub

GoogleCloudPubSub is a PubSub implementation backed by Google Cloud Pub/Sub. It delivers events across processes and hosts using Google Cloud topics and subscriptions, with ordered delivery and message acknowledgment.

Use it for distributed deployments on Google Cloud. For single-process delivery, use EventEmitterPubSub. For Redis, use RedisStreamsPubSub.

Each topic maps to a Google Cloud topic. Subscriptions with a group share a named subscription, so members compete for events. Subscriptions without a group create a per-instance subscription, so every instance receives every event.
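
The difference between grouped and ungrouped subscriptions can be illustrated with a small in-memory model. This is a sketch of the delivery semantics only, not the package's implementation: `ToyTopic`, `join`, and the subscription names are hypothetical, and the round-robin choice among group members is a simplification that Google Cloud Pub/Sub does not guarantee.

```typescript
type Handler = (event: string) => void

// Hypothetical in-memory stand-in for one topic and its subscriptions.
class ToyTopic {
  // subscription name -> handlers attached to that subscription
  private subscriptions = new Map<string, Handler[]>()
  // subscription name -> number of events delivered so far
  private cursor = new Map<string, number>()

  // Members that pass the same subscription name share that subscription.
  join(subscriptionName: string, handler: Handler): void {
    const members = this.subscriptions.get(subscriptionName) ?? []
    members.push(handler)
    this.subscriptions.set(subscriptionName, members)
  }

  // Every subscription gets the event once; one member of it handles it.
  publish(event: string): void {
    this.subscriptions.forEach((members, name) => {
      const next = this.cursor.get(name) ?? 0
      members[next % members.length](event)
      this.cursor.set(name, next + 1)
    })
  }
}

const received = { a: [] as string[], b: [] as string[], c: [] as string[] }
const topic = new ToyTopic()

// Grouped: a and b share the "workers" subscription and compete for events.
topic.join('workers', e => received.a.push(e))
topic.join('workers', e => received.b.push(e))
// Ungrouped: c has its own per-instance subscription and sees every event.
topic.join('instance-c', e => received.c.push(e))

topic.publish('e1')
topic.publish('e2')
```

In this model, `a` and `b` split the two events between them, while `c` receives both.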

Installation

npm install @mastra/google-cloud-pubsub

Usage example

Pass a Google Cloud client configuration, such as a project ID.

src/mastra/index.ts
import { Mastra } from '@mastra/core'
import { GoogleCloudPubSub } from '@mastra/google-cloud-pubsub'

export const mastra = new Mastra({
  pubsub: new GoogleCloudPubSub({
    projectId: 'my-project',
  }),
})

Constructor parameters

config: ClientConfig
Configuration for the Google Cloud Pub/Sub client, including credentials and project ID. See the `@google-cloud/pubsub` client documentation for all fields.
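
As an illustration, a configuration object might be assembled from environment variables. This is a sketch under assumptions: `configFromEnv` is a hypothetical helper, the local `MinimalClientConfig` type only mirrors the `projectId` and `keyFilename` fields accepted by the Google client, and the environment variable names are common conventions rather than requirements of this package.

```typescript
// Local stand-in for the two ClientConfig fields used here (assumption:
// the real type from @google-cloud/pubsub has many more optional fields).
interface MinimalClientConfig {
  projectId?: string
  keyFilename?: string
}

// Hypothetical helper: build a config from an environment-like record.
function configFromEnv(env: Record<string, string | undefined>): MinimalClientConfig {
  const config: MinimalClientConfig = {}
  const project = env['GOOGLE_CLOUD_PROJECT']
  const keyFile = env['GOOGLE_APPLICATION_CREDENTIALS']
  if (project) config.projectId = project
  if (keyFile) config.keyFilename = keyFile
  return config
}

const exampleConfig = configFromEnv({
  GOOGLE_CLOUD_PROJECT: 'my-project',
  GOOGLE_APPLICATION_CREDENTIALS: '/path/to/service-account.json',
})
```

The resulting object could then be passed to the constructor, as in `new GoogleCloudPubSub(exampleConfig)`.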

Methods

GoogleCloudPubSub implements the PubSub contract. The methods below are specific to this implementation.

init(topicName, group?)

Creates the topic and a subscription if they do not already exist, and returns the subscription. subscribe calls this internally, so you rarely call it directly.

await pubsub.init('workflow.events')

subscribe(topic, cb, options?)

Subscribes to a topic. With options.group, members of the group share a subscription and compete for events. Without a group, the instance receives every event through its own subscription.

await pubsub.subscribe('workflow.events', (event, ack, nack) => {
  console.log(event)
})

flush()

Waits for pending acknowledgments to complete.

await pubsub.flush()
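
A natural place to call `flush` is during shutdown, so the process does not exit while acknowledgments are still in flight. The sketch below models that idea in memory; `AckTracker`, `slowAck`, and `demoFlush` are hypothetical names used only for illustration and say nothing about how the package tracks acknowledgments internally.

```typescript
// Hypothetical tracker that remembers in-flight acknowledgment promises.
class AckTracker {
  private pending = new Set<Promise<void>>()

  // Register an acknowledgment and forget it once it settles.
  track(ack: Promise<void>): void {
    this.pending.add(ack)
    const forget = () => { this.pending.delete(ack) }
    ack.then(forget, forget)
  }

  // Number of acknowledgments that have not settled yet.
  get size(): number {
    return this.pending.size
  }

  // Resolve once every acknowledgment registered so far has settled.
  async flush(): Promise<void> {
    const inFlight = Array.from(this.pending)
    await Promise.all(inFlight.map(p => p.catch(() => undefined)))
  }
}

// Simulated acknowledgment that completes after a short delay.
const slowAck = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms))

async function demoFlush(): Promise<number> {
  const tracker = new AckTracker()
  tracker.track(slowAck(10))
  tracker.track(slowAck(20))
  await tracker.flush()
  // Nothing is left in flight once flush has resolved.
  return tracker.size
}
```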

destroy(topicName)

Removes the subscription and topic for a given topic name. Use this to clean up Google Cloud resources.

await pubsub.destroy('workflow.events')

Acknowledgment

Each delivered event includes ack and nack functions. Call ack after successful processing to remove the event from the subscription. When neither is called, Google Cloud redelivers the event after its acknowledgment deadline expires.
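
One way to apply this consistently is to wrap handlers so that `ack` is called on success and `nack` on failure. The following is a minimal sketch: `withAck` is a hypothetical helper, and the callback types are assumptions modeled on the `(event, ack, nack)` callback shape rather than the package's exported types.

```typescript
// Assumed shapes; the package's real types may differ.
type AckFn = () => void | Promise<void>
type EventCallback<E> = (event: E, ack?: AckFn, nack?: AckFn) => Promise<void>

// Hypothetical helper: ack after the handler succeeds, nack if it throws.
function withAck<E>(handler: (event: E) => void | Promise<void>): EventCallback<E> {
  return async (event, ack, nack) => {
    try {
      await handler(event)
    } catch {
      // Signal failure explicitly rather than waiting for the deadline.
      await nack?.()
      return
    }
    await ack?.()
  }
}

// Demo with fake ack/nack functions that record which one was called.
async function demoAck(): Promise<string[]> {
  const calls: string[] = []
  const succeeds = withAck<string>(() => {})
  const fails = withAck<string>(() => { throw new Error('boom') })

  await succeeds('e1', () => { calls.push('ack:e1') }, () => { calls.push('nack:e1') })
  await fails('e2', () => { calls.push('ack:e2') }, () => { calls.push('nack:e2') })
  return calls
}
```

A wrapped handler could then be passed wherever a subscribe callback is expected, assuming the real callback signature matches the shape sketched here.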