Building a signal provider

In this guide, you'll build a signal provider that polls an external service on an interval and pushes a notification into an agent thread whenever a watched resource changes. You'll learn how to extend the SignalProvider base class, track subscriptions, emit notifications from a poll loop, and register the provider on an agent.

The example watches build pipelines in a fake CI service, but the pattern applies to any pull-based source: an issue tracker, a status API, a queue, or your own backend.

Signal providers are in alpha. Breaking changes may occur without a major version bump until the API is stable.

Prerequisites

This guide assumes you understand signals at a high level. For the full API surface, see the SignalProvider reference.

Add notification storage

A signal provider pushes notification signals into threads, and notifications require a storage adapter that supports the notifications domain. Configure storage on your Mastra instance.

src/mastra/index.ts
import { Mastra } from '@mastra/core'
import { LibSQLStore } from '@mastra/libsql'

export const mastra = new Mastra({
  storage: new LibSQLStore({
    url: 'file:./mastra.db',
  }),
})

LibSQL, PostgreSQL, and MongoDB all support notification records. Without notification storage, the provider's notify() calls throw at runtime.

Create the external service client

Real providers call an external API. To keep this guide self-contained, create a small fake CI client that returns a build status for a pipeline. Swap this for your real API client later.

src/mastra/signals/ci-client.ts
export type BuildStatus = {
  id: string
  pipeline: string
  status: 'passed' | 'failed' | 'running'
}

// Returns a random status so you can see notifications fire while testing.
export async function fetchBuildStatus(pipeline: string): Promise<BuildStatus> {
  const states: BuildStatus['status'][] = ['passed', 'failed', 'running']
  const status = states[Math.floor(Math.random() * states.length)]!
  return { id: `build_${Date.now()}`, pipeline, status }
}

This client returns a random status each call. When you wire in a real API, only this file changes.
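
Because the fake client is random, a test run can take several polls to hit a failure. One alternative, sketched here under assumptions, is a scripted client that replays a fixed list of statuses. createScriptedClient is a hypothetical helper, not part of Mastra; it keeps the same (pipeline) => Promise<BuildStatus> shape as fetchBuildStatus(), so the provider code would not need to change.

```typescript
type BuildStatus = {
  id: string
  pipeline: string
  status: 'passed' | 'failed' | 'running'
}

// Hypothetical helper (not part of Mastra): returns a fetcher that replays
// `script` in order and repeats the last entry once the script is exhausted.
function createScriptedClient(script: BuildStatus['status'][]) {
  if (script.length === 0) {
    throw new Error('script must contain at least one status')
  }
  let call = 0
  return async function fetchBuildStatus(pipeline: string): Promise<BuildStatus> {
    const index = Math.min(call, script.length - 1)
    call += 1
    // Build ids count up from 1 so each call is easy to tell apart in logs.
    return { id: `build_${call}`, pipeline, status: script[index]! }
  }
}
```

For example, createScriptedClient(['passed', 'failed']) yields a passed build on the first call and a failed build on every call after that, which makes the first notification predictable.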

Build the signal provider

Extend SignalProvider, implement the abstract id field, set a pollInterval, and override poll(). The base class calls poll() on the interval with every active subscription. Emit a notification only for the builds you care about.

src/mastra/signals/ci-signals.ts
import { SignalProvider } from '@mastra/core/signals'
import type { SignalProviderTarget, SignalSubscription } from '@mastra/core/signals'
import { fetchBuildStatus } from './ci-client'

export class CiSignals extends SignalProvider<'ci-signals'> {
  readonly id = 'ci-signals' as const
  readonly pollInterval = 10_000 // poll every 10 seconds

  // Public API so callers can subscribe a thread to a pipeline.
  watch(target: SignalProviderTarget, pipeline: string): SignalSubscription {
    return this.subscribe(target, pipeline)
  }

  unwatch(target: SignalProviderTarget, pipeline: string): boolean {
    return this.unsubscribe(target, pipeline)
  }

  async poll(subscriptions: SignalSubscription[]): Promise<void> {
    for (const sub of subscriptions) {
      const build = await fetchBuildStatus(sub.externalResourceId)
      if (build.status !== 'failed') continue

      await this.notify(
        {
          source: this.id,
          kind: 'ci-status',
          priority: 'high',
          summary: `Build failed for ${sub.externalResourceId}`,
          payload: build,
          dedupeKey: `${this.id}:${sub.externalResourceId}:${build.id}`,
        },
        { resourceId: sub.resourceId, threadId: sub.threadId },
      )
    }
  }
}

A few things to note:

  • subscribe() and unsubscribe() are protected on the base class. Wrap them in your own public methods (watch / unwatch) so callers can manage subscriptions.
  • externalResourceId is any provider-specific string. Here it's the pipeline name; a GitHub provider might use "github:owner/repo#123".
  • notify() forwards a notification signal to the connected agent's thread. It throws if the provider was never registered on an agent.
  • dedupeKey prevents storing the same failure twice.
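
To see why the dedupeKey in the example includes build.id, the sketch below models deduplication with a plain in-memory set. It illustrates the idea only: buildDedupeKey and DedupeFilter are hypothetical names, and Mastra's notification storage decides how keys are actually matched.

```typescript
// Hypothetical helper mirroring the key format used in poll().
function buildDedupeKey(providerId: string, pipeline: string, buildId: string): string {
  return `${providerId}:${pipeline}:${buildId}`
}

// Hypothetical stand-in for a store that rejects keys it has already seen.
class DedupeFilter {
  private readonly seen = new Set<string>()

  // Returns true the first time a key is offered and false on every repeat.
  accept(key: string): boolean {
    if (this.seen.has(key)) return false
    this.seen.add(key)
    return true
  }
}

const filter = new DedupeFilter()

// A real API reports the same failed build on consecutive polls: same id, same key.
const firstPoll = filter.accept(buildDedupeKey('ci-signals', 'acme/app:main', 'build_42'))
const secondPoll = filter.accept(buildDedupeKey('ci-signals', 'acme/app:main', 'build_42'))

// A newer failed build carries a new id, so its key has not been seen yet.
const nextBuild = filter.accept(buildDedupeKey('ci-signals', 'acme/app:main', 'build_43'))

console.log(firstPoll, secondPoll, nextBuild) // true false true
```

Note that the fake client derives its id from Date.now(), so every poll produces a fresh key. Deduplication only has a visible effect once a real API returns stable build ids.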

Register the provider on an agent

Pass the provider to the agent through signals. The agent connects it and starts the poll loop automatically.

src/mastra/agents/dev-agent.ts
import { Agent } from '@mastra/core/agent'
import { CiSignals } from '../signals/ci-signals'

export const ciSignals = new CiSignals()

export const devAgent = new Agent({
  id: 'dev-agent',
  name: 'Dev Agent',
  instructions: 'Help the user triage CI build failures.',
  model: 'openai/gpt-5.5',
  signals: [ciSignals],
})

Register the agent with Mastra, keeping the storage configuration from the first step.

src/mastra/index.ts
import { Mastra } from '@mastra/core'
import { LibSQLStore } from '@mastra/libsql'
import { devAgent } from './agents/dev-agent'

export const mastra = new Mastra({
  agents: { devAgent },
  storage: new LibSQLStore({
    url: 'file:./mastra.db',
  }),
})

Subscribe a thread

A provider only polls resources that a thread is watching. Subscribe a thread to a pipeline so poll() has something to check.

src/mastra/subscribe.ts
import { ciSignals } from './agents/dev-agent'

ciSignals.watch({ resourceId: 'user_123', threadId: 'thread_456' }, 'acme/app:main')

Run this once after the agent is registered, for example from a setup script or an API route. The subscription lives in the provider's in-memory registry, so re-subscribe after a restart.
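
Since subscriptions are held in memory, one way to re-subscribe after a restart is to store them yourself and replay them at startup. The following is a minimal sketch under assumptions, not a Mastra API: StoredSubscription, serializeSubscriptions, parseSubscriptions, and rehydrate are hypothetical names, and where the serialized string is kept (a file, a database row) is left to you.

```typescript
// Hypothetical record shape: the thread target plus the pipeline it watches.
type StoredSubscription = {
  resourceId: string
  threadId: string
  pipeline: string
}

// Any watch-style callback, for example a wrapper around the provider's watch().
type WatchFn = (target: { resourceId: string; threadId: string }, pipeline: string) => unknown

// Serializes subscriptions to a string you can write to a file or database row.
function serializeSubscriptions(subs: StoredSubscription[]): string {
  return JSON.stringify(subs)
}

// Parses stored subscriptions and drops entries that are missing a field.
function parseSubscriptions(json: string): StoredSubscription[] {
  const parsed: unknown = JSON.parse(json)
  if (!Array.isArray(parsed)) return []
  return parsed.filter(
    (item): item is StoredSubscription =>
      typeof item === 'object' &&
      item !== null &&
      typeof item.resourceId === 'string' &&
      typeof item.threadId === 'string' &&
      typeof item.pipeline === 'string',
  )
}

// Replays each stored subscription through `watch` and returns how many were restored.
function rehydrate(json: string, watch: WatchFn): number {
  const subs = parseSubscriptions(json)
  for (const { resourceId, threadId, pipeline } of subs) {
    watch({ resourceId, threadId }, pipeline)
  }
  return subs.length
}
```

With the provider from this guide, the callback could be (target, pipeline) => ciSignals.watch(target, pipeline), called once at startup after the agent is registered.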

Test the signal provider

Start the dev server:

npm run dev

Make sure a thread is subscribed, then watch the logs. The fake client picks one of three statuses at random on each poll, so every 10-second cycle has roughly a one-in-three chance of reporting a failed build. Expect a notification for the subscribed thread within the first minute or so.

To see the agent react to the notification, subscribe to the thread and stream it:

src/mastra/watch-thread.ts
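import { devAgent } from './agents/dev-agent'

// Open a live subscription to the thread the provider notifies.
const subscription = await devAgent.subscribeToThread({
  resourceId: 'user_123',
  threadId: 'thread_456',
})

// Log each chunk as the agent reacts to incoming notifications.
for await (const chunk of subscription.stream) {
  console.log(chunk)
}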

When a build fails, the model receives the notification as context:

<notification source="ci-signals" type="ci-status" priority="high" status="delivered">Build failed for acme/app:main</notification>

Output is non-deterministic because the fake client randomizes status and the model phrases its reply freely, so the exact wording will vary.

Next steps

You can extend this signal provider to:

  • Replace fetchBuildStatus() with a real API client.
  • Persist subscriptions so they survive a restart, then rehydrate them in start().
  • Add a webhook entry point with handleWebhook() for push-based sources.
  • Expose subscribe and unsubscribe tools with getTools() so the agent can manage its own subscriptions.
  • Use dedupeKey and coalesceKey when notifications need deduplication or batching.

Learn more: