Building a signal provider
In this guide, you'll build a signal provider that polls an external service on an interval and pushes a notification into an agent thread whenever a watched resource changes. You'll learn how to extend the SignalProvider base class, track subscriptions, emit notifications from a poll loop, and register the provider on an agent.
The example watches build pipelines in a fake CI service, but the pattern applies to any pull-based source: an issue tracker, a status API, a queue, or your own backend.
Signal providers are in alpha. Breaking changes may occur without a major version bump until the API is stable.
Prerequisites
- Node.js v22.13.0 or later installed
- An API key from a supported Model Provider
- An existing Mastra project. Follow the installation guide if needed.
This guide also assumes you understand signals at a high level. For the full API surface, see the SignalProvider reference.
Add notification storage
A signal provider pushes notification signals into threads, and notifications require a storage adapter that supports the notifications domain. Configure storage on your Mastra instance.
import { Mastra } from '@mastra/core'
import { LibSQLStore } from '@mastra/libsql'

export const mastra = new Mastra({
  storage: new LibSQLStore({
    url: 'file:./mastra.db',
  }),
})
LibSQL, PostgreSQL, and MongoDB all support notification records. Without notification storage, the provider's notify() calls throw at runtime.
Create the external service client
Real providers call an external API. To keep this guide self-contained, create a small fake CI client that returns a build status for a pipeline. Swap this for your real API client later.
export type BuildStatus = {
  id: string
  pipeline: string
  status: 'passed' | 'failed' | 'running'
}

// Returns a random status so you can see notifications fire while testing.
export async function fetchBuildStatus(pipeline: string): Promise<BuildStatus> {
  const states: BuildStatus['status'][] = ['passed', 'failed', 'running']
  const status = states[Math.floor(Math.random() * states.length)]!
  return { id: `build_${Date.now()}`, pipeline, status }
}
This client returns a random status each call. When you wire in a real API, only this file changes.
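As a rough sketch of what that swap might look like, the factory below returns a function with the same signature as fetchBuildStatus() but reads the status over HTTP. The endpoint layout, the response fields, and the names createCiClient and FetchLike are assumptions made for illustration; adapt them to whatever your CI service actually exposes.

```typescript
// Same shape as the BuildStatus type in the fake client.
type BuildStatus = {
  id: string
  pipeline: string
  status: 'passed' | 'failed' | 'running'
}

// Minimal structural type for the parts of fetch this sketch uses.
type FetchLike = (url: string) => Promise<{
  ok: boolean
  status: number
  json(): Promise<unknown>
}>

const VALID_STATUSES = ['passed', 'failed', 'running'] as const

// Hypothetical endpoint layout: GET {baseUrl}/pipelines/{pipeline}/latest
// returning JSON such as { "id": "...", "status": "failed" }.
function createCiClient(baseUrl: string, fetchImpl: FetchLike) {
  return async function fetchBuildStatus(pipeline: string): Promise<BuildStatus> {
    const url = `${baseUrl}/pipelines/${encodeURIComponent(pipeline)}/latest`
    const res = await fetchImpl(url)
    if (!res.ok) {
      throw new Error(`CI request failed with HTTP ${res.status}`)
    }
    // Validate the response instead of trusting its shape.
    const body = (await res.json()) as { id?: unknown; status?: unknown }
    const status = VALID_STATUSES.find(s => s === body.status)
    if (typeof body.id !== 'string' || status === undefined) {
      throw new Error('Unexpected CI response shape')
    }
    return { id: body.id, pipeline, status }
  }
}
```

Passing the fetch implementation in as a parameter keeps the client easy to stub in tests; in an application you would typically pass the global fetch that ships with Node.js.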
Build the signal provider
Extend SignalProvider, implement the abstract id field, set a pollInterval, and override poll(). The base class calls poll() on the interval with every active subscription. Emit a notification only for the builds you care about.
import { SignalProvider } from '@mastra/core/signals'
import type { SignalProviderTarget, SignalSubscription } from '@mastra/core/signals'
import { fetchBuildStatus } from './ci-client'

export class CiSignals extends SignalProvider<'ci-signals'> {
  readonly id = 'ci-signals' as const
  readonly pollInterval = 10_000 // poll every 10 seconds

  // Public API so callers can subscribe a thread to a pipeline.
  watch(target: SignalProviderTarget, pipeline: string): SignalSubscription {
    return this.subscribe(target, pipeline)
  }

  unwatch(target: SignalProviderTarget, pipeline: string): boolean {
    return this.unsubscribe(target, pipeline)
  }

  async poll(subscriptions: SignalSubscription[]): Promise<void> {
    for (const sub of subscriptions) {
      const build = await fetchBuildStatus(sub.externalResourceId)
      // Only failed builds produce a notification.
      if (build.status !== 'failed') continue

      await this.notify(
        {
          source: this.id,
          kind: 'ci-status',
          priority: 'high',
          summary: `Build failed for ${sub.externalResourceId}`,
          payload: build,
          dedupeKey: `${this.id}:${sub.externalResourceId}:${build.id}`,
        },
        { resourceId: sub.resourceId, threadId: sub.threadId },
      )
    }
  }
}
A few things to note:
- subscribe() and unsubscribe() are protected on the base class. Wrap them in your own public methods (watch/unwatch) so callers can manage subscriptions.
- externalResourceId is any provider-specific string. Here it's the pipeline name; a GitHub provider might use "github:owner/repo#123".
- notify() forwards a notification signal to the connected agent's thread. It throws if the provider was never registered on an agent.
- dedupeKey prevents storing the same failure twice.
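To make the dedupeKey note concrete, here is a minimal in-memory sketch of key-based deduplication. It is an illustration only and does not reflect how Mastra stores notifications; DedupingSink and buildKey are hypothetical names, and the key format simply mirrors the one used in poll().

```typescript
type Notification = { summary: string; dedupeKey: string }

// Hypothetical stand-in for notification storage: keeps the first
// notification seen for each dedupeKey and drops later repeats.
class DedupingSink {
  private readonly seen = new Set<string>()
  readonly stored: Notification[] = []

  // Returns true if the notification was stored, false if it was a duplicate.
  push(n: Notification): boolean {
    if (this.seen.has(n.dedupeKey)) return false
    this.seen.add(n.dedupeKey)
    this.stored.push(n)
    return true
  }
}

// Builds the key the same way the provider does: source, pipeline, build id.
function buildKey(source: string, pipeline: string, buildId: string): string {
  return `${source}:${pipeline}:${buildId}`
}

const sink = new DedupingSink()
const pipeline = 'acme/app:main'

// Two polls observe the same failed build: only the first is stored.
sink.push({ summary: 'Build failed', dedupeKey: buildKey('ci-signals', pipeline, 'build_1') })
sink.push({ summary: 'Build failed', dedupeKey: buildKey('ci-signals', pipeline, 'build_1') })

// A later build fails: the key differs, so it is stored as a new notification.
sink.push({ summary: 'Build failed', dedupeKey: buildKey('ci-signals', pipeline, 'build_2') })

console.log(sink.stored.length) // 2
```

Note that the fake client derives build.id from Date.now(), so successive polls produce distinct keys. Deduplication becomes useful once a real CI API returns a stable ID for each build, so that repeated polls of the same failed build map to the same key.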
Register the provider on an agent
Pass the provider to the agent through signals. The agent connects it and starts the poll loop automatically.
import { Agent } from '@mastra/core/agent'
import { CiSignals } from '../signals/ci-signals'

export const ciSignals = new CiSignals()

export const devAgent = new Agent({
  id: 'dev-agent',
  name: 'Dev Agent',
  instructions: 'Help the user triage CI build failures.',
  model: 'openai/gpt-5.5',
  signals: [ciSignals],
})
Register the agent with Mastra and add the storage from the first step.
import { Mastra } from '@mastra/core'
import { LibSQLStore } from '@mastra/libsql'
import { devAgent } from './agents/dev-agent'

export const mastra = new Mastra({
  agents: { devAgent },
  storage: new LibSQLStore({
    url: 'file:./mastra.db',
  }),
})
Subscribe a thread
A provider only polls resources that a thread is watching. Subscribe a thread to a pipeline so poll() has something to check.
import { ciSignals } from './agents/dev-agent'
ciSignals.watch({ resourceId: 'user_123', threadId: 'thread_456' }, 'acme/app:main')
Run this once after the agent is registered, for example from a setup script or an API route. The subscription lives in the provider's in-memory registry, so re-subscribe after a restart.
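One way to handle re-subscription after a restart is to keep a small record of subscriptions on disk and replay it at startup. The sketch below uses a JSON file and plain Node.js file APIs; the file format and the helper names (saveSubscription, restoreSubscriptions, clearSubscriptions) are assumptions for illustration, not part of the SignalProvider API.

```typescript
import { existsSync, readFileSync, rmSync, writeFileSync } from 'node:fs'

type Target = { resourceId: string; threadId: string }
type SavedSubscription = { target: Target; pipeline: string }

// Loads saved subscriptions, or an empty list if the file does not exist yet.
function loadSubscriptions(path: string): SavedSubscription[] {
  if (!existsSync(path)) return []
  return JSON.parse(readFileSync(path, 'utf8')) as SavedSubscription[]
}

// Adds a subscription to the file unless an identical one is already saved.
function saveSubscription(path: string, sub: SavedSubscription): void {
  const all = loadSubscriptions(path)
  const exists = all.some(
    s =>
      s.pipeline === sub.pipeline &&
      s.target.resourceId === sub.target.resourceId &&
      s.target.threadId === sub.target.threadId,
  )
  if (exists) return
  all.push(sub)
  writeFileSync(path, JSON.stringify(all, null, 2))
}

// Replays every saved subscription through a watch callback and
// returns how many subscriptions were restored.
function restoreSubscriptions(
  path: string,
  watch: (target: Target, pipeline: string) => void,
): number {
  const all = loadSubscriptions(path)
  for (const s of all) watch(s.target, s.pipeline)
  return all.length
}

// Removes the subscription file, for example when resetting local state.
function clearSubscriptions(path: string): void {
  rmSync(path, { force: true })
}
```

At startup you could call restoreSubscriptions() with a callback such as (target, pipeline) => ciSignals.watch(target, pipeline), and call saveSubscription() wherever you subscribe a new thread. A JSON file is only suitable for a single local process; a database table would be the more usual choice in production.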
Test the signal provider
Start the dev server:
- npm: npm run dev
- pnpm: pnpm run dev
- Yarn: yarn dev
- Bun: bun run dev
Make sure a thread is subscribed, then watch the logs. The fake client returns a random status each poll, so within a few cycles you'll see a failed build trigger a notification for the subscribed thread.
To see the agent react to the notification, subscribe to the thread and stream it:
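import { devAgent } from './agents/dev-agent'

const subscription = await devAgent.subscribeToThread({
  resourceId: 'user_123',
  threadId: 'thread_456',
})

// Log each chunk as the agent streams its reaction to the notification.
for await (const chunk of subscription.stream) {
  console.log(chunk)
}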
When a build fails, the model receives the notification as context:
<notification source="ci-signals" type="ci-status" priority="high" status="delivered">Build failed for acme/app:main</notification>
Output is non-deterministic because the fake client randomizes status and the model phrases its reply freely, so the exact wording will vary.
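If you want the provider side of a test run to be repeatable, one option is a scripted client that replays a fixed sequence of statuses instead of random ones. This is a sketch under that assumption; createScriptedClient is a hypothetical helper, not part of the guide's client.

```typescript
type BuildStatus = {
  id: string
  pipeline: string
  status: 'passed' | 'failed' | 'running'
}

// Returns a fetchBuildStatus-compatible function that replays a fixed
// sequence of statuses, repeating the last one once the script runs out.
function createScriptedClient(script: BuildStatus['status'][]) {
  if (script.length === 0) {
    throw new Error('script must contain at least one status')
  }
  let call = 0
  return async function fetchBuildStatus(pipeline: string): Promise<BuildStatus> {
    const index = Math.min(call, script.length - 1)
    call += 1
    // The id is tied to the script position, so a repeated final entry
    // keeps the same build id.
    return { id: `build_${index}`, pipeline, status: script[index]! }
  }
}
```

For example, createScriptedClient(['running', 'passed', 'failed']) yields a failed build on the third poll and on every poll after it. To use it, the provider would need to call this function in place of the imported fetchBuildStatus(), for instance by swapping the import during testing. The model's reply still varies between runs.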
Next steps
You can extend this signal provider to:
- Replace fetchBuildStatus() with a real API client.
- Persist subscriptions so they survive a restart, then rehydrate them in start().
- Add a webhook entry point with handleWebhook() for push-based sources.
- Expose subscribe and unsubscribe tools with getTools() so the agent can manage its own subscriptions.
- Use dedupeKey and coalesceKey when notifications need deduplication or batching.
Learn more: