# SignalProvider Abstract base class for building signal providers. A signal provider monitors external sources (APIs, webhooks, event streams) and pushes notification signals into agent threads through a built-in subscription registry. Signal providers are not processors by default. Providers that need to intercept agent execution return processors from `getInputProcessors()` or `getOutputProcessors()`. Providers that expose agent-callable tools return them from `getTools()`. For a ready-to-use webhook-based provider, see [`WebhookSignalProvider`](https://mastra.ai/reference/signals/webhook-signal-provider). ## Usage example Polling provider that checks an API every 30 seconds: ```typescript import { SignalProvider } from '@mastra/core/signals' import type { SignalSubscription } from '@mastra/core/signals' class SlackSignals extends SignalProvider<'slack-signals'> { readonly id = 'slack-signals' readonly pollInterval = 30_000 async poll(subscriptions: SignalSubscription[]) { for (const sub of subscriptions) { const messages = await fetchSlackMessages(sub.externalResourceId) if (messages.length > 0) { await this.notify( { source: 'slack', kind: 'new-messages', summary: `${messages.length} new messages in ${sub.externalResourceId}`, }, { threadId: sub.threadId, resourceId: sub.resourceId }, ) } } } } ``` Register with an agent: ```typescript import { Agent } from '@mastra/core/agent' const agent = new Agent({ signals: [new SlackSignals()], }) ``` The agent calls `connect(this)`, registers any processors or tools the provider returns, and starts polling. ## Constructor parameters `SignalProvider` is abstract. Subclasses call `super()` with no arguments. ## Properties **id** (`TId extends string`): Unique identifier for this provider. Subclasses must implement this as a readonly property. **name** (`string`): Human-readable display name for the provider. **pollInterval** (`number`): Poll interval in milliseconds. When set, the framework calls \`poll()\` on this interval. Leave \`undefined\` or \`0\` for webhook-only providers. **isConnected** (`boolean`): Whether this provider is connected to an agent. Returns \`true\` after \`connect()\` is called. Used internally to skip re-wiring during \`Agent.\_\_fork()\`. ## Methods ### Connection #### `connect(agent)` Called by the Agent constructor. Establishes the bidirectional link so the provider can send signals back to the agent. Override to run additional setup after the link is established. Always call `super.connect(agent)`. ```typescript class MySignals extends SignalProvider<'my-signals'> { readonly id = 'my-signals' override connect(agent) { super.connect(agent) // additional setup after agent link is established } } ``` #### `__registerMastra(mastra)` Called when the provider's agent is registered with a Mastra instance. Override to access storage or other Mastra services. Always call `super.__registerMastra(mastra)`. ```typescript override __registerMastra(mastra) { super.__registerMastra(mastra) // this.mastra is now available } ``` ### Processor and tool integration #### `getInputProcessors()` Return input processors this provider needs to be registered with the agent. Override when your provider intercepts agent input steps (for example, injecting context hints or detecting tool calls). ```typescript getInputProcessors() { return [this] } ``` Returns: `InputProcessorOrWorkflow[]` #### `getOutputProcessors()` Return output processors this provider needs to be registered with the agent. Override when your provider intercepts agent output steps. ```typescript getOutputProcessors() { return [this] } ``` Returns: `OutputProcessorOrWorkflow[]` #### `getTools()` Return tools this provider exposes to the agent. Override when your provider adds agent-callable tools such as subscribe or unsubscribe commands. ```typescript getTools() { return { subscribe_pr: createTool({ /* ... */ }), unsubscribe_pr: createTool({ /* ... */ }), } } ``` Returns: `Record` ### Subscription tracking #### `subscribe(target, externalResourceId, metadata?)` Subscribe a thread to an external resource. This is a protected method — call it from within your provider implementation. ```typescript const sub = this.subscribe( { threadId: 'thread-1', resourceId: 'user-1' }, 'github:mastra-ai/mastra#123', { pr: 123 }, ) ``` Returns: `SignalSubscription` — the created subscription, or the existing one with merged metadata. **target** (`SignalProviderTarget`): The thread to subscribe. Must include \`threadId\` and \`resourceId\`. **externalResourceId** (`string`): Provider-specific identifier for the external resource (for example, \`"github:owner/repo#123"\`). **metadata** (`Record`): Additional data to store with the subscription. Merged into existing metadata on duplicate subscribes. #### `unsubscribe(target, externalResourceId)` Remove a subscription. ```typescript const removed = this.unsubscribe( { threadId: 'thread-1', resourceId: 'user-1' }, 'github:mastra-ai/mastra#123', ) ``` Returns: `boolean` — `true` if removed, `false` if no matching subscription existed. #### `getSubscriptions()` Return all active subscriptions for this provider. ```typescript const allSubs = this.getSubscriptions() ``` Returns: `SignalSubscription[]` #### `getSubscriptionsForResource(externalResourceId)` Return all subscriptions for a specific external resource. ```typescript const subs = this.getSubscriptionsForResource('github:mastra-ai/mastra#123') for (const sub of subs) { await this.notify( { source: 'my-provider', kind: 'update', summary: 'Resource updated' }, { threadId: sub.threadId, resourceId: sub.resourceId }, ) } ``` Returns: `SignalSubscription[]` #### `getSubscriptionsForThread(target)` Return all subscriptions for a specific thread. ```typescript const subs = this.getSubscriptionsForThread({ threadId: 'thread-1', resourceId: 'user-1', }) ``` Returns: `SignalSubscription[]` #### `hasSubscription(target, externalResourceId)` Check whether a subscription exists. ```typescript if (this.hasSubscription(target, 'github:mastra-ai/mastra#123')) { // already subscribed } ``` Returns: `boolean` #### `unsubscribeAll(target)` Remove all subscriptions for a thread. ```typescript const removed = this.unsubscribeAll({ threadId: 'thread-1', resourceId: 'user-1', }) ``` Returns: `number` — count of removed subscriptions. #### `subscriptionCount` Total number of active subscriptions for this provider. ```typescript if (this.subscriptionCount === 0) { // nothing to poll } ``` Returns: `number` ### Polling #### `poll(subscriptions)` Called on each poll cycle with all active subscriptions. Override to check external sources and emit notifications. The framework prevents overlapping poll cycles — if a `poll()` call takes longer than `pollInterval`, the next cycle is skipped. ```typescript async poll(subscriptions: SignalSubscription[]) { for (const sub of subscriptions) { const events = await checkExternalSource(sub.externalResourceId) for (const event of events) { await this.notify( { source: 'my-provider', kind: event.type, summary: event.message }, { threadId: sub.threadId, resourceId: sub.resourceId }, ) } } } ``` #### `startPolling()` Start the polling timer. Called by the Agent after `connect()`. Idempotent — calling multiple times has no effect. ```typescript provider.startPolling() ``` #### `stopPolling()` Stop the polling timer. ```typescript provider.stopPolling() ``` ### Webhooks #### `handleWebhook(request)` Handle an incoming webhook request. Override to parse the payload, match it to subscriptions, and emit notification signals. See [`WebhookSignalProvider`](https://mastra.ai/reference/signals/webhook-signal-provider) for a ready-to-use implementation. ```typescript async handleWebhook(request) { const payload = request.body as { repo: string, event: string } const subs = this.getSubscriptionsForResource(payload.repo) for (const sub of subs) { await this.notify( { source: 'github', kind: payload.event, summary: `Event on ${payload.repo}` }, { threadId: sub.threadId, resourceId: sub.resourceId }, ) } return { status: 200, body: { matched: subs.length } } } ``` Returns: `Promise<{ status?: number; body?: unknown }>` ### Lifecycle #### `start()` Called after `connect()` to run async initialization. Override when setup requires the agent or Mastra instance to be available. ```typescript async start() { await this.loadInitialState() } ``` #### `stop()` Called on shutdown. The default implementation stops polling and clears all subscriptions. ```typescript provider.stop() ``` ### Notifications #### `notify(notification, target)` Send a notification signal to the connected agent. This is a protected convenience wrapper around `agent.sendNotificationSignal()`. ```typescript await this.notify( { source: 'my-provider', kind: 'pr-updated', summary: 'PR #123 was updated', priority: 'high', payload: { prNumber: 123 }, }, { threadId: 'thread-1', resourceId: 'user-1' }, ) ``` **notification** (`object`): The notification payload. **target** (`SignalProviderTarget`): The thread to notify. Must include \`threadId\` and \`resourceId\`. ## Types ### `SignalSubscription` The subscription object returned by `subscribe()`. **id** (`string`): Unique identifier for the subscription. **providerId** (`string`): The provider that owns this subscription. **threadId** (`string`): The thread receiving signals. **resourceId** (`string`): The resource owning the thread. **externalResourceId** (`string`): Provider-specific identifier for the external resource (for example, \`"github:owner/repo#123"\`). **subscribedAt** (`Date`): When the subscription was created. **metadata** (`Record`): Provider-specific metadata stored with the subscription. ### `SignalProviderTarget` Identifies a specific agent thread. **threadId** (`string`): The thread to target. **resourceId** (`string`): The resource owning the thread. **agentId** (`string`): Agent identifier. ## Type guard ### `isSignalProvider(obj)` Runtime check for `SignalProvider` instances. ```typescript import { isSignalProvider } from '@mastra/core/signals' if (isSignalProvider(obj)) { obj.connect(agent) } ``` Returns: `boolean`