Signal providers
Added in: @mastra/core@1.39.0
This feature is in alpha. Breaking changes may occur without a major version bump until the API is stable.
A signal provider monitors an external source, such as GitHub, Slack, continuous integration (CI), or your own API, and pushes notification signals into subscribed agent threads.
When to use a signal providerDirect link to When to use a signal provider
Use a signal provider when an external system produces events that an agent should react to, and you want Mastra to manage the subscription bookkeeping for you.
- The source emits events tied to a resource a thread cares about, such as a pull request, a channel, or a build.
- You want one place that tracks which threads watch which external resources.
- You want to receive events by polling, by webhook, or both.
If you only need to push a one-off event into a thread, call agent.sendNotificationSignal() directly instead.
How signal providers workDirect link to How signal providers work
A signal provider is the producing side of the signals system. It brings external events into a thread, while the signal APIs control how the thread consumes them.
A signal provider combines three capabilities:
- Subscription tracking: The
SignalProviderbase class keeps an in-memory registry that maps each agent thread to the external resources it watches. - Ingestion: You override
poll()for pull-based sources orhandleWebhook()for push-based sources. - Delivery: When an event matches a subscription, call the protected
notify()helper to forward a notification signal to the connected agent's thread.
Register a provider by passing it to an agent. The agent connects the provider, starts polling if a pollInterval is set, and merges any processors or tools the provider exposes.
import { Agent } from '@mastra/core/agent'
import { CiSignals } from '../signals/ci-signals'
export const supportAgent = new Agent({
id: 'support-agent',
name: 'Support Agent',
instructions: 'Help the user triage updates.',
model: 'openai/gpt-5.5',
signals: [new CiSignals()],
})
Notification delivery requires a storage adapter with notification support, such as libSQL, PostgreSQL, or MongoDB. Configure storage on the Mastra instance so notify() can store notification records.
QuickstartDirect link to Quickstart
The following example demonstrates a polling provider that watches CI pipelines and emits a notification when a subscribed pipeline fails.
import { SignalProvider } from '@mastra/core/signals'
import type { SignalProviderTarget, SignalSubscription } from '@mastra/core/signals'
type BuildStatus = {
id: string
status: 'passed' | 'failed'
}
const builds = new Map<string, BuildStatus>([
['acme-app-main', { id: 'build_123', status: 'failed' }],
])
async function fetchBuildStatus(pipeline: string): Promise<BuildStatus> {
return builds.get(pipeline) ?? { id: 'build_unknown', status: 'passed' }
}
export class CiSignals extends SignalProvider<'ci-signals'> {
readonly id = 'ci-signals' as const
readonly pollInterval = 30_000
watch(target: SignalProviderTarget, pipeline: string) {
return this.subscribe(target, pipeline)
}
unwatch(target: SignalProviderTarget, pipeline: string) {
return this.unsubscribe(target, pipeline)
}
async poll(subscriptions: SignalSubscription[]) {
for (const sub of subscriptions) {
const build = await fetchBuildStatus(sub.externalResourceId)
if (build.status !== 'failed') continue
await this.notify(
{
source: this.id,
kind: 'ci-status',
priority: 'high',
summary: `Build failed for ${sub.externalResourceId}`,
payload: build,
dedupeKey: `${this.id}:${sub.externalResourceId}:${build.id}`,
},
{ resourceId: sub.resourceId, threadId: sub.threadId },
)
}
}
}
Register the provider with an agent and subscribe a thread to the pipeline you want to watch.
import { Agent } from '@mastra/core/agent'
import { CiSignals } from '../signals/ci-signals'
export const ciSignals = new CiSignals()
export const supportAgent = new Agent({
id: 'support-agent',
name: 'Support Agent',
instructions: 'Help the user triage CI updates.',
model: 'openai/gpt-5.5',
signals: [ciSignals],
})
ciSignals.watch({ resourceId: 'user_123', threadId: 'thread_456' }, 'acme-app-main')
Mastra calls poll() on the pollInterval with all active subscriptions. It skips a cycle when there are no subscriptions and doesn't overlap cycles, so a slow poll() doesn't run concurrently with itself.
For a complete polling-provider build with notification storage, agent registration, thread subscription, and testing, follow Building a signal provider.
Polling and webhook providersDirect link to Polling and webhook providers
Use polling when the external source doesn't push events to your app. Set pollInterval and override poll(subscriptions). Each subscription includes the thread target and the external resource id to inspect.
Use webhooks when the external source can call your app. Override handleWebhook(request), parse the payload, find matching subscriptions, and call notify() for each match.
import { SignalProvider } from '@mastra/core/signals'
import type { SignalProviderWebhookRequest } from '@mastra/core/signals'
export class CiSignals extends SignalProvider<'ci-signals'> {
readonly id = 'ci-signals' as const
async handleWebhook(request: SignalProviderWebhookRequest) {
const payload = request.body as { pipeline: string; status: string }
const subscriptions = this.getSubscriptionsForResource(payload.pipeline)
for (const sub of subscriptions) {
await this.notify(
{
source: this.id,
kind: 'ci-status',
priority: 'high',
summary: `Build ${payload.status} for ${payload.pipeline}`,
payload,
},
{ resourceId: sub.resourceId, threadId: sub.threadId },
)
}
return { status: 200, body: { matched: subscriptions.length } }
}
}
handleWebhook() is a provider method, not an auto-mounted HTTP route. Invoke it from your own endpoint, passing the request body, headers, and any route params.
Visit SignalProvider reference for subscription, polling, lifecycle, and notify() details. For the complete notification payload shape, including deduplication and coalescing fields, visit Agent.sendNotificationSignal() reference.
Built-in webhook providerDirect link to Built-in webhook provider
For generic webhook sources, use WebhookSignalProvider instead of writing a subclass. Configure it with a function that extracts a resource id from the payload, and an optional function that builds the notification.
import { Agent } from '@mastra/core/agent'
import { WebhookSignalProvider } from '@mastra/core/signals'
const webhooks = new WebhookSignalProvider({
extractResourceId: payload => (payload as { repository: string }).repository,
buildNotification: (payload, sub) => ({
source: 'ci',
kind: 'build-status',
priority: 'medium',
summary: `Build ${(payload as { status: string }).status} for ${sub.externalResourceId}`,
}),
})
export const supportAgent = new Agent({
id: 'support-agent',
name: 'Support Agent',
instructions: 'Help the user triage updates.',
model: 'openai/gpt-5.5',
signals: [webhooks],
})
webhooks.subscribeThread({ resourceId: 'user_123', threadId: 'thread_456' }, 'acme/app')
When a webhook arrives, call webhooks.handleWebhook({ body, headers }) from your route. The provider matches the extracted resource id against its subscriptions and notifies each matching thread.
Advanced provider capabilitiesDirect link to Advanced provider capabilities
A provider can support more than event ingestion. Add only the capabilities your source needs.
- Durable subscriptions: The base registry is in-memory and per-process. Persist subscriptions yourself when they must survive a restart, then rehydrate them in
start(). - Lifecycle hooks: Override
start()for async setup andstop()for cleanup. Callsuper.stop()when overridingstop()so the base provider can stop polling and clear its registry. - Processors and tools: Return processors from
getInputProcessors()orgetOutputProcessors(), and return agent-callable tools fromgetTools().
The @mastra/github-signals package is a production signal provider that watches GitHub pull requests and notifies threads about comments, review state, continuous integration status, and merges. Use it as a reference for polling, durable subscriptions, tools, processors, and lifecycle hooks.
import { Agent } from '@mastra/core/agent'
import { GithubSignals } from '@mastra/github-signals'
export const devAgent = new Agent({
id: 'dev-agent',
name: 'Dev Agent',
instructions: 'Help triage pull request activity.',
model: 'openai/gpt-5.5',
signals: [new GithubSignals()],
})