SignalProvider

Abstract base class for building signal providers. A signal provider monitors external sources (APIs, webhooks, event streams) and pushes notification signals into agent threads through a built-in subscription registry.

Signal providers are not processors by default. Providers that need to intercept agent execution return processors from getInputProcessors() or getOutputProcessors(). Providers that expose agent-callable tools return them from getTools().

For a ready-to-use webhook-based provider, see WebhookSignalProvider.

Usage example

Polling provider that checks an API every 30 seconds:

src/signals/slack-signals.ts
import { SignalProvider } from '@mastra/core/signals'
import type { SignalSubscription } from '@mastra/core/signals'

class SlackSignals extends SignalProvider<'slack-signals'> {
  readonly id = 'slack-signals'
  readonly pollInterval = 30_000

  async poll(subscriptions: SignalSubscription[]) {
    for (const sub of subscriptions) {
      // fetchSlackMessages is an application-defined helper, not shown here
      const messages = await fetchSlackMessages(sub.externalResourceId)
      if (messages.length > 0) {
        await this.notify(
          {
            source: 'slack',
            kind: 'new-messages',
            summary: `${messages.length} new messages in ${sub.externalResourceId}`,
          },
          { threadId: sub.threadId, resourceId: sub.resourceId },
        )
      }
    }
  }
}

Register with an agent:

src/mastra/index.ts
import { Agent } from '@mastra/core/agent'

const agent = new Agent({
  // ...other agent options
  signals: [new SlackSignals()],
})

The agent calls connect(this), registers any processors or tools the provider returns, and starts polling.

Constructor parameters

SignalProvider is abstract. Subclasses call super() with no arguments.

Properties

id:

TId extends string
Unique identifier for this provider. Subclasses must implement this as a readonly property.

name?:

string
Human-readable display name for the provider.

pollInterval?:

number
Poll interval in milliseconds. When set, the framework calls `poll()` on this interval. Leave `undefined` or `0` for webhook-only providers.

isConnected:

boolean
Whether this provider is connected to an agent. Returns `true` after `connect()` is called. Used internally to skip re-wiring during `Agent.__fork()`.

Methods

Connection

connect(agent)

Called by the Agent constructor. Establishes the bidirectional link so the provider can send signals back to the agent. Override to run additional setup after the link is established. Always call super.connect(agent).

class MySignals extends SignalProvider<'my-signals'> {
  readonly id = 'my-signals'

  override connect(agent) {
    super.connect(agent)
    // additional setup after agent link is established
  }
}

__registerMastra(mastra)

Called when the provider's agent is registered with a Mastra instance. Override to access storage or other Mastra services. Always call super.__registerMastra(mastra).

override __registerMastra(mastra) {
  super.__registerMastra(mastra)
  // this.mastra is now available
}

Processor and tool integration

getInputProcessors()

Return the input processors this provider needs registered with the agent. Override when your provider intercepts agent input steps (for example, to inject context hints or detect tool calls).

getInputProcessors() {
  // Returning `this` assumes the provider itself implements the input processor interface
  return [this]
}

Returns: InputProcessorOrWorkflow[]

getOutputProcessors()

Return the output processors this provider needs registered with the agent. Override when your provider intercepts agent output steps.

getOutputProcessors() {
  // Returning `this` assumes the provider itself implements the output processor interface
  return [this]
}

Returns: OutputProcessorOrWorkflow[]

getTools()

Return tools this provider exposes to the agent. Override when your provider adds agent-callable tools such as subscribe or unsubscribe commands.

getTools() {
  return {
    subscribe_pr: createTool({ /* ... */ }),
    unsubscribe_pr: createTool({ /* ... */ }),
  }
}

Returns: Record<string, unknown>

Subscription tracking

subscribe(target, externalResourceId, metadata?)

Subscribe a thread to an external resource. This is a protected method — call it from within your provider implementation.

const sub = this.subscribe(
  { threadId: 'thread-1', resourceId: 'user-1' },
  'github:mastra-ai/mastra#123',
  { pr: 123 },
)

Returns: SignalSubscription — the created subscription, or the existing one with merged metadata.

target:

SignalProviderTarget
The thread to subscribe. Must include `threadId` and `resourceId`.

externalResourceId:

string
Provider-specific identifier for the external resource (for example, `"github:owner/repo#123"`).

metadata?:

Record<string, unknown>
Additional data to store with the subscription. Merged into existing metadata on duplicate subscribes.
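
The duplicate-subscribe behaviour described above can be pictured with a small stand-in registry. This is a minimal sketch, not the library's implementation: `SubscriptionRegistry`, `Target`, `Subscription`, and the key format are hypothetical names, and the shallow merge where newer metadata keys win is an assumption.

```typescript
// Illustrative stand-in only; none of these names come from @mastra/core.
interface Target {
  threadId: string
  resourceId: string
}

interface Subscription extends Target {
  externalResourceId: string
  subscribedAt: Date
  metadata: Record<string, unknown>
}

class SubscriptionRegistry {
  private subs = new Map<string, Subscription>()

  // One entry per (thread, resource, external resource) triple.
  private key(target: Target, externalResourceId: string): string {
    return JSON.stringify([target.threadId, target.resourceId, externalResourceId])
  }

  subscribe(
    target: Target,
    externalResourceId: string,
    metadata: Record<string, unknown> = {},
  ): Subscription {
    const k = this.key(target, externalResourceId)
    const existing = this.subs.get(k)
    if (existing) {
      // Duplicate subscribe: keep the existing entry and merge metadata into it
      // (assumed here to be a shallow merge where newer keys win).
      existing.metadata = { ...existing.metadata, ...metadata }
      return existing
    }
    const created: Subscription = {
      threadId: target.threadId,
      resourceId: target.resourceId,
      externalResourceId,
      subscribedAt: new Date(),
      metadata: { ...metadata },
    }
    this.subs.set(k, created)
    return created
  }

  get count(): number {
    return this.subs.size
  }
}
```

Subscribing the same thread to the same external resource twice leaves one entry in this sketch, with the metadata from both calls combined.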

unsubscribe(target, externalResourceId)

Remove a subscription.

const removed = this.unsubscribe(
  { threadId: 'thread-1', resourceId: 'user-1' },
  'github:mastra-ai/mastra#123',
)

Returns: boolean (true if removed, false if no matching subscription existed).

getSubscriptions()

Return all active subscriptions for this provider.

const allSubs = this.getSubscriptions()

Returns: SignalSubscription[]

getSubscriptionsForResource(externalResourceId)

Return all subscriptions for a specific external resource.

const subs = this.getSubscriptionsForResource('github:mastra-ai/mastra#123')
for (const sub of subs) {
  await this.notify(
    { source: 'my-provider', kind: 'update', summary: 'Resource updated' },
    { threadId: sub.threadId, resourceId: sub.resourceId },
  )
}

Returns: SignalSubscription[]
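
When several threads subscribe to the same external resource, a provider may prefer to fetch each resource once and fan the result out to every subscriber. The helper below is a hedged sketch of that grouping step. `groupByExternalResource` and `SubscriptionLike` are hypothetical names introduced for illustration, not part of the library.

```typescript
// Hypothetical helper: the minimal shape it needs from a subscription.
interface SubscriptionLike {
  threadId: string
  resourceId: string
  externalResourceId: string
}

// Group subscriptions so each external resource appears once,
// with every subscriber to that resource listed under it.
function groupByExternalResource<T extends SubscriptionLike>(subs: T[]): Map<string, T[]> {
  const groups = new Map<string, T[]>()
  for (const sub of subs) {
    const group = groups.get(sub.externalResourceId)
    if (group) {
      group.push(sub)
    } else {
      groups.set(sub.externalResourceId, [sub])
    }
  }
  return groups
}
```

A provider could iterate the resulting map, call its external source once per key, and then notify each subscription in that key's list.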

getSubscriptionsForThread(target)

Return all subscriptions for a specific thread.

const subs = this.getSubscriptionsForThread({
  threadId: 'thread-1',
  resourceId: 'user-1',
})

Returns: SignalSubscription[]

hasSubscription(target, externalResourceId)

Check whether a subscription exists.

if (this.hasSubscription(target, 'github:mastra-ai/mastra#123')) {
  // already subscribed
}

Returns: boolean

unsubscribeAll(target)

Remove all subscriptions for a thread.

const removed = this.unsubscribeAll({
  threadId: 'thread-1',
  resourceId: 'user-1',
})

Returns: number — count of removed subscriptions.

subscriptionCount

Total number of active subscriptions for this provider.

if (this.subscriptionCount === 0) {
  // nothing to poll
}

Returns: number

Polling

poll(subscriptions)

Called on each poll cycle with all active subscriptions. Override to check external sources and emit notifications. The framework prevents overlapping poll cycles — if a poll() call takes longer than pollInterval, the next cycle is skipped.

async poll(subscriptions: SignalSubscription[]) {
  for (const sub of subscriptions) {
    const events = await checkExternalSource(sub.externalResourceId)
    for (const event of events) {
      await this.notify(
        { source: 'my-provider', kind: event.type, summary: event.message },
        { threadId: sub.threadId, resourceId: sub.resourceId },
      )
    }
  }
}
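
The overlap rule can be illustrated with a small in-flight guard. This is a sketch of the general technique under stated assumptions, not the framework's actual scheduler: `PollGuard`, `tick`, and `skipped` are hypothetical names.

```typescript
// Illustrative only: skip a cycle while the previous one is still running.
class PollGuard {
  private inFlight = false
  private readonly poll: () => Promise<void>
  skipped = 0

  constructor(poll: () => Promise<void>) {
    this.poll = poll
  }

  // Returns true if a cycle was started, false if it was skipped.
  tick(): boolean {
    if (this.inFlight) {
      this.skipped += 1
      return false
    }
    this.inFlight = true
    this.poll()
      .catch(err => console.error('poll failed', err))
      .finally(() => {
        // Release the guard whether the cycle succeeded or failed.
        this.inFlight = false
      })
    return true
  }
}
```

Driving `tick()` from a timer means a slow `poll()` delays nothing and queues nothing: ticks that arrive while a cycle is in flight are simply dropped.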

startPolling()

Start the polling timer. Called by the Agent after connect(). Idempotent — calling multiple times has no effect.

provider.startPolling()

stopPolling()

Stop the polling timer.

provider.stopPolling()
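
Idempotent start and stop can be sketched with a single timer handle. The class below is a simplified illustration, not the library's code: `PollTimer` and its members are hypothetical, and treating a zero or negative interval as "do not poll" is an assumption based on the `pollInterval` description.

```typescript
// Illustrative only: a timer wrapper whose start() is safe to call repeatedly.
class PollTimer {
  private timer: ReturnType<typeof setInterval> | undefined
  private readonly intervalMs: number
  private readonly onTick: () => void

  constructor(intervalMs: number, onTick: () => void) {
    this.intervalMs = intervalMs
    this.onTick = onTick
  }

  // Returns true only when a new timer was actually created.
  start(): boolean {
    if (this.timer !== undefined || this.intervalMs <= 0) return false
    this.timer = setInterval(this.onTick, this.intervalMs)
    return true
  }

  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer)
      this.timer = undefined
    }
  }

  get running(): boolean {
    return this.timer !== undefined
  }
}
```

Keeping the handle in one field is what makes repeated `start()` calls harmless: the second call sees an existing timer and returns without creating another.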

Webhooks

handleWebhook(request)

Handle an incoming webhook request. Override to parse the payload, match it to subscriptions, and emit notification signals. See WebhookSignalProvider for a ready-to-use implementation.

async handleWebhook(request) {
  const payload = request.body as { repo: string, event: string }
  const subs = this.getSubscriptionsForResource(payload.repo)

  for (const sub of subs) {
    await this.notify(
      { source: 'github', kind: payload.event, summary: `Event on ${payload.repo}` },
      { threadId: sub.threadId, resourceId: sub.resourceId },
    )
  }

  return { status: 200, body: { matched: subs.length } }
}

Returns: Promise<{ status?: number; body?: unknown }>
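
Webhook bodies arrive from outside the process, so a handler may want to validate the payload before reading fields from it instead of relying on a type cast. The sketch below shows one way to do that. `RepoEvent`, `isRepoEvent`, and `toWebhookResult` are hypothetical helpers, the 400 response is an assumption, and only the `{ status, body }` result shape comes from the reference above.

```typescript
// Hypothetical payload shape, matching the fields used in the handler example.
interface RepoEvent {
  repo: string
  event: string
}

// Runtime check that an unknown body has the expected string fields.
function isRepoEvent(body: unknown): body is RepoEvent {
  if (typeof body !== 'object' || body === null) return false
  const candidate = body as Record<string, unknown>
  return typeof candidate.repo === 'string' && typeof candidate.event === 'string'
}

// Build a handler result; countMatches stands in for a subscription lookup.
function toWebhookResult(
  body: unknown,
  countMatches: (repo: string) => number,
): { status: number; body: unknown } {
  if (!isRepoEvent(body)) {
    return { status: 400, body: { error: 'invalid payload' } }
  }
  return { status: 200, body: { matched: countMatches(body.repo) } }
}
```

Inside a provider, `countMatches` could be replaced by a lookup against the provider's subscriptions for the resource, with notifications sent before the result is returned.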

Lifecycle

start()

Called after connect() to run async initialization. Override when setup requires the agent or Mastra instance to be available.

async start() {
  // loadInitialState stands for a method your provider defines
  await this.loadInitialState()
}

stop()

Called on shutdown. The default implementation stops polling and clears all subscriptions.

provider.stop()

Notifications

notify(notification, target)

Send a notification signal to the connected agent. This is a protected convenience wrapper around agent.sendNotificationSignal().

await this.notify(
  {
    source: 'my-provider',
    kind: 'pr-updated',
    summary: 'PR #123 was updated',
    priority: 'high',
    payload: { prNumber: 123 },
  },
  { threadId: 'thread-1', resourceId: 'user-1' },
)

notification:

object
The notification payload. Fields, as used in the example above:
source: string
kind: string
summary: string
priority?: "high" | "medium" | "low"
payload?: unknown

target:

SignalProviderTarget
The thread to notify. Must include `threadId` and `resourceId`.

Types

SignalSubscription

The subscription object returned by subscribe().

id:

string
Unique identifier for the subscription.

providerId:

string
The provider that owns this subscription.

threadId:

string
The thread receiving signals.

resourceId:

string
The resource owning the thread.

externalResourceId:

string
Provider-specific identifier for the external resource (for example, `"github:owner/repo#123"`).

subscribedAt:

Date
When the subscription was created.

metadata:

Record<string, unknown>
Provider-specific metadata stored with the subscription.

SignalProviderTarget

Identifies a specific agent thread.

threadId:

string
The thread to target.

resourceId:

string
The resource owning the thread.

agentId?:

string
Agent identifier.

Type guard

isSignalProvider(obj)

Runtime check for SignalProvider instances.

import { isSignalProvider } from '@mastra/core/signals'

if (isSignalProvider(obj)) {
  obj.connect(agent)
}

Returns: boolean