SignalProvider
Abstract base class for building signal providers. A signal provider monitors external sources (APIs, webhooks, event streams) and pushes notification signals into agent threads through a built-in subscription registry.
Signal providers are not processors by default. Providers that need to intercept agent execution return processors from getInputProcessors() or getOutputProcessors(). Providers that expose agent-callable tools return them from getTools().
For a ready-to-use webhook-based provider, see WebhookSignalProvider.
Usage exampleDirect link to Usage example
Polling provider that checks an API every 30 seconds:
import { SignalProvider } from '@mastra/core/signals'
import type { SignalSubscription } from '@mastra/core/signals'
class SlackSignals extends SignalProvider<'slack-signals'> {
readonly id = 'slack-signals'
readonly pollInterval = 30_000
async poll(subscriptions: SignalSubscription[]) {
for (const sub of subscriptions) {
const messages = await fetchSlackMessages(sub.externalResourceId)
if (messages.length > 0) {
await this.notify(
{
source: 'slack',
kind: 'new-messages',
summary: `${messages.length} new messages in ${sub.externalResourceId}`,
},
{ threadId: sub.threadId, resourceId: sub.resourceId },
)
}
}
}
}
Register with an agent:
import { Agent } from '@mastra/core/agent'
const agent = new Agent({
signals: [new SlackSignals()],
})
The agent calls connect(this), registers any processors or tools the provider returns, and starts polling.
Constructor parametersDirect link to Constructor parameters
SignalProvider is abstract. Subclasses call super() with no arguments.
PropertiesDirect link to Properties
id:
name?:
pollInterval?:
isConnected:
MethodsDirect link to Methods
ConnectionDirect link to Connection
connect(agent)Direct link to connectagent
Called by the Agent constructor. Establishes the bidirectional link so the provider can send signals back to the agent. Override to run additional setup after the link is established. Always call super.connect(agent).
class MySignals extends SignalProvider<'my-signals'> {
readonly id = 'my-signals'
override connect(agent) {
super.connect(agent)
// additional setup after agent link is established
}
}
__registerMastra(mastra)Direct link to __registermastramastra
Called when the provider's agent is registered with a Mastra instance. Override to access storage or other Mastra services. Always call super.__registerMastra(mastra).
override __registerMastra(mastra) {
super.__registerMastra(mastra)
// this.mastra is now available
}
Processor and tool integrationDirect link to Processor and tool integration
getInputProcessors()Direct link to getinputprocessors
Return input processors this provider needs to be registered with the agent. Override when your provider intercepts agent input steps (for example, injecting context hints or detecting tool calls).
getInputProcessors() {
return [this]
}
Returns: InputProcessorOrWorkflow[]
getOutputProcessors()Direct link to getoutputprocessors
Return output processors this provider needs to be registered with the agent. Override when your provider intercepts agent output steps.
getOutputProcessors() {
return [this]
}
Returns: OutputProcessorOrWorkflow[]
getTools()Direct link to gettools
Return tools this provider exposes to the agent. Override when your provider adds agent-callable tools such as subscribe or unsubscribe commands.
getTools() {
return {
subscribe_pr: createTool({ /* ... */ }),
unsubscribe_pr: createTool({ /* ... */ }),
}
}
Returns: Record<string, unknown>
Subscription trackingDirect link to Subscription tracking
subscribe(target, externalResourceId, metadata?)Direct link to subscribetarget-externalresourceid-metadata
Subscribe a thread to an external resource. This is a protected method — call it from within your provider implementation.
const sub = this.subscribe(
{ threadId: 'thread-1', resourceId: 'user-1' },
'github:mastra-ai/mastra#123',
{ pr: 123 },
)
Returns: SignalSubscription — the created subscription, or the existing one with merged metadata.
target:
externalResourceId:
metadata?:
unsubscribe(target, externalResourceId)Direct link to unsubscribetarget-externalresourceid
Remove a subscription.
const removed = this.unsubscribe(
{ threadId: 'thread-1', resourceId: 'user-1' },
'github:mastra-ai/mastra#123',
)
Returns: boolean — true if removed, false if no matching subscription existed.
getSubscriptions()Direct link to getsubscriptions
Return all active subscriptions for this provider.
const allSubs = this.getSubscriptions()
Returns: SignalSubscription[]
getSubscriptionsForResource(externalResourceId)Direct link to getsubscriptionsforresourceexternalresourceid
Return all subscriptions for a specific external resource.
const subs = this.getSubscriptionsForResource('github:mastra-ai/mastra#123')
for (const sub of subs) {
await this.notify(
{ source: 'my-provider', kind: 'update', summary: 'Resource updated' },
{ threadId: sub.threadId, resourceId: sub.resourceId },
)
}
Returns: SignalSubscription[]
getSubscriptionsForThread(target)Direct link to getsubscriptionsforthreadtarget
Return all subscriptions for a specific thread.
const subs = this.getSubscriptionsForThread({
threadId: 'thread-1',
resourceId: 'user-1',
})
Returns: SignalSubscription[]
hasSubscription(target, externalResourceId)Direct link to hassubscriptiontarget-externalresourceid
Check whether a subscription exists.
if (this.hasSubscription(target, 'github:mastra-ai/mastra#123')) {
// already subscribed
}
Returns: boolean
unsubscribeAll(target)Direct link to unsubscribealltarget
Remove all subscriptions for a thread.
const removed = this.unsubscribeAll({
threadId: 'thread-1',
resourceId: 'user-1',
})
Returns: number — count of removed subscriptions.
subscriptionCountDirect link to subscriptioncount
Total number of active subscriptions for this provider.
if (this.subscriptionCount === 0) {
// nothing to poll
}
Returns: number
PollingDirect link to Polling
poll(subscriptions)Direct link to pollsubscriptions
Called on each poll cycle with all active subscriptions. Override to check external sources and emit notifications. The framework prevents overlapping poll cycles — if a poll() call takes longer than pollInterval, the next cycle is skipped.
async poll(subscriptions: SignalSubscription[]) {
for (const sub of subscriptions) {
const events = await checkExternalSource(sub.externalResourceId)
for (const event of events) {
await this.notify(
{ source: 'my-provider', kind: event.type, summary: event.message },
{ threadId: sub.threadId, resourceId: sub.resourceId },
)
}
}
}
startPolling()Direct link to startpolling
Start the polling timer. Called by the Agent after connect(). Idempotent — calling multiple times has no effect.
provider.startPolling()
stopPolling()Direct link to stoppolling
Stop the polling timer.
provider.stopPolling()
WebhooksDirect link to Webhooks
handleWebhook(request)Direct link to handlewebhookrequest
Handle an incoming webhook request. Override to parse the payload, match it to subscriptions, and emit notification signals. See WebhookSignalProvider for a ready-to-use implementation.
async handleWebhook(request) {
const payload = request.body as { repo: string, event: string }
const subs = this.getSubscriptionsForResource(payload.repo)
for (const sub of subs) {
await this.notify(
{ source: 'github', kind: payload.event, summary: `Event on ${payload.repo}` },
{ threadId: sub.threadId, resourceId: sub.resourceId },
)
}
return { status: 200, body: { matched: subs.length } }
}
Returns: Promise<{ status?: number; body?: unknown }>
LifecycleDirect link to Lifecycle
start()Direct link to start
Called after connect() to run async initialization. Override when setup requires the agent or Mastra instance to be available.
async start() {
await this.loadInitialState()
}
stop()Direct link to stop
Called on shutdown. The default implementation stops polling and clears all subscriptions.
provider.stop()
NotificationsDirect link to Notifications
notify(notification, target)Direct link to notifynotification-target
Send a notification signal to the connected agent. This is a protected convenience wrapper around agent.sendNotificationSignal().
await this.notify(
{
source: 'my-provider',
kind: 'pr-updated',
summary: 'PR #123 was updated',
priority: 'high',
payload: { prNumber: 123 },
},
{ threadId: 'thread-1', resourceId: 'user-1' },
)
notification:
target:
TypesDirect link to Types
SignalSubscriptionDirect link to signalsubscription
The subscription object returned by subscribe().
id:
providerId:
threadId:
resourceId:
externalResourceId:
subscribedAt:
metadata:
SignalProviderTargetDirect link to signalprovidertarget
Identifies a specific agent thread.
threadId:
resourceId:
agentId?:
Type guardDirect link to Type guard
isSignalProvider(obj)Direct link to issignalproviderobj
Runtime check for SignalProvider instances.
import { isSignalProvider } from '@mastra/core/signals'
if (isSignalProvider(obj)) {
obj.connect(agent)
}
Returns: boolean