LeaseProvider
LeaseProvider is the distributed leasing contract, separate from event delivery (PubSub). Mastra's signals layer uses it to elect a single owner across multiple processes (for example, serverless invocations) for a given resource, most commonly a thread key. The owner is the process that wakes and runs the agent stream, so other processes route follow-up work to it instead of starting a competing run.
A backend implements LeaseProvider only when it can genuinely coordinate a lock, for example Redis through an atomic SET or a Lua script, or an in-memory map within a single process. Backends that cannot lease omit it; the signals runtime feature-detects the capability and falls back to a no-op provider, which preserves single-process behavior.
The built-in RedisStreamsPubSub implements LeaseProvider, which is what enables signals to coordinate across instances in distributed and serverless deployments.
Usage example
You don't construct a LeaseProvider directly. Configure a pub/sub backend that implements it (such as RedisStreamsPubSub) on the Mastra constructor, and the signals runtime uses it automatically for cross-process coordination.
import { Mastra } from '@mastra/core'
import { RedisStreamsPubSub } from '@mastra/redis-streams'

export const mastra = new Mastra({
  // RedisStreamsPubSub implements both PubSub and LeaseProvider
  pubsub: new RedisStreamsPubSub({
    url: process.env.REDIS_URL,
  }),
})
To implement leasing in a custom backend, implement the methods below. The signals runtime detects the capability structurally (so it works across package boundaries) and only uses it when all methods are present.
import { PubSub } from '@mastra/core/events'
import type { LeaseProvider } from '@mastra/core/events'

export class CustomPubSub extends PubSub implements LeaseProvider {
  async acquireLease(key: string, owner: string, ttlMs: number) {
    // Atomically claim the lease, or report the current holder.
    return { acquired: true, owner }
  }
  // ...getLeaseOwner, releaseLease, renewLease, transferLease
}
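As a rough illustration of what the five methods have to do, here is a minimal single-process sketch backed by a Map. It is not Mastra's own implementation. The `LeaseProviderLike` interface is a hypothetical local mirror of the signatures documented on this page (a real backend would import the `LeaseProvider` type instead), and the names `InMemoryLeases` and `LeaseEntry` are illustrative assumptions.

```typescript
// Hypothetical local mirror of the five documented methods. In a real backend,
// import the LeaseProvider type from '@mastra/core/events' instead.
interface LeaseProviderLike {
  acquireLease(key: string, owner: string, ttlMs: number): Promise<{ acquired: boolean; owner?: string }>
  getLeaseOwner(key: string): Promise<string | undefined>
  releaseLease(key: string, owner: string): Promise<void>
  renewLease(key: string, owner: string, ttlMs: number): Promise<boolean>
  transferLease(key: string, fromOwner: string, toOwner: string, ttlMs: number): Promise<boolean>
}

type LeaseEntry = { owner: string; expiresAt: number }

// Illustrative single-process provider. Each method reads and writes the map
// without awaiting in between, so it is atomic within one JavaScript process.
export class InMemoryLeases implements LeaseProviderLike {
  private leases = new Map<string, LeaseEntry>()

  // Returns the live entry for a key, dropping it first if its TTL has passed.
  private live(key: string): LeaseEntry | undefined {
    const entry = this.leases.get(key)
    if (entry && entry.expiresAt <= Date.now()) {
      this.leases.delete(key)
      return undefined
    }
    return entry
  }

  async acquireLease(key: string, owner: string, ttlMs: number) {
    const current = this.live(key)
    if (current && current.owner !== owner) {
      // Someone else holds the lease: report the holder so work can be routed.
      return { acquired: false, owner: current.owner }
    }
    // Free key, or the same owner re-claiming: set (or refresh) the lease.
    this.leases.set(key, { owner, expiresAt: Date.now() + ttlMs })
    return { acquired: true, owner }
  }

  async getLeaseOwner(key: string) {
    return this.live(key)?.owner
  }

  async releaseLease(key: string, owner: string) {
    // No-op unless the caller is the current owner.
    if (this.live(key)?.owner === owner) this.leases.delete(key)
  }

  async renewLease(key: string, owner: string, ttlMs: number) {
    const current = this.live(key)
    if (!current || current.owner !== owner) return false
    current.expiresAt = Date.now() + ttlMs
    return true
  }

  async transferLease(key: string, fromOwner: string, toOwner: string, ttlMs: number) {
    const current = this.live(key)
    if (!current || current.owner !== fromOwner) return false
    // Swap the owner in one step so the key is never empty in between.
    this.leases.set(key, { owner: toOwner, expiresAt: Date.now() + ttlMs })
    return true
  }
}
```

A distributed backend would need the same check-then-write steps to run atomically on the shared store, which this sketch does not attempt.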
Methods
Leasing
acquireLease(key, owner, ttlMs)
Atomically tries to acquire a lease on a key. Returns { acquired: true, owner } if the caller claimed the lease, or { acquired: false, owner } where owner is the current holder, so the caller can route follow-up work to them. The same owner can call acquireLease idempotently to renew or re-claim.
const result = await pubsub.acquireLease('thread:abc', runId, 15000)
if (result.acquired) {
  // This process owns the thread, so wake and run the agent.
} else {
  // result.owner holds the lease, so route the signal to them.
}
Returns: Promise<{ acquired: boolean; owner?: string }>
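key: string. The resource to lease, most commonly a thread key such as 'thread:abc'.
owner: string. Identifier of the caller claiming the lease, such as a run ID.
ttlMs: number. Lease time-to-live in milliseconds.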
getLeaseOwner(key)
Reads the current owner of a lease, or undefined if no lease is held.
const owner = await pubsub.getLeaseOwner('thread:abc')
Returns: Promise<string | undefined>
releaseLease(key, owner)
Releases a lease. This is a no-op if the caller is not the current owner: implementations check ownership atomically before releasing, so a concurrent renewal by another owner is never clobbered.
await pubsub.releaseLease('thread:abc', runId)
Returns: Promise<void>
renewLease(key, owner, ttlMs)
Renews an existing lease owned by owner, extending its TTL. Returns true if the renewal succeeded and the caller still owns the lease, or false if the lease was lost (TTL expired or another owner took it).
const stillOwned = await pubsub.renewLease('thread:abc', runId, 15000)
if (!stillOwned) {
  // Lost the lease, so stop renewing and let the new owner take over.
}
Returns: Promise<boolean>
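A lease holder typically renews on a timer while its work is in progress. The sketch below shows one way to do that, assuming a hypothetical helper named `startLeaseHeartbeat`, a hypothetical `onLost` callback, and a renewal interval of one third of the TTL; none of these come from Mastra, and the signals runtime may renew differently.

```typescript
// Only the one documented method this sketch needs.
interface Renewable {
  renewLease(key: string, owner: string, ttlMs: number): Promise<boolean>
}

// Hypothetical helper: renews every ttlMs / 3 (an assumed ratio) and stops
// as soon as renewLease reports that the lease was lost.
export function startLeaseHeartbeat(
  leases: Renewable,
  key: string,
  owner: string,
  ttlMs: number,
  onLost: () => void,
): () => void {
  let stopped = false

  const timer = setInterval(async () => {
    if (stopped) return
    let stillOwned: boolean
    try {
      stillOwned = await leases.renewLease(key, owner, ttlMs)
    } catch {
      // Assumption: treat a failed call as transient and retry on the next tick.
      return
    }
    if (!stillOwned && !stopped) {
      stop()
      onLost()
    }
  }, Math.max(1, Math.floor(ttlMs / 3)))

  // Stops the heartbeat; call this when the work finishes normally.
  function stop(): void {
    stopped = true
    clearInterval(timer)
  }

  return stop
}
```

The returned function stops the timer, so a caller would invoke it before releasing or transferring the lease.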
transferLease(key, fromOwner, toOwner, ttlMs)
Atomically hands a held lease from fromOwner to toOwner, refreshing its TTL, without releasing the key in between. This is the gap-free primitive used when one owner finishes but a follow-up owner must take over the same key immediately, for example when a thread run completes and a queued follow-up run drains on the same thread. A naive release-then-acquire would briefly leave the key empty, letting a racing process win the freed lease and start a competing run.
Returns true if fromOwner still held the lease and ownership moved to toOwner, or false if the lease was already lost, in which case the caller should fall back to a fresh acquireLease.
const transferred = await pubsub.transferLease('thread:abc', currentRunId, nextRunId, 15000)
if (!transferred) {
  // Lease was lost, so acquire fresh instead.
  await pubsub.acquireLease('thread:abc', nextRunId, 15000)
}
Returns: Promise<boolean>
Backends that cannot perform the transfer atomically must still implement it as a best-effort releaseLease(fromOwner) followed by acquireLease(toOwner), and document that the swap is non-atomic, since a racing process can win the key in the gap. Keeping the method required means callers have a single code path and atomicity is an explicit per-backend decision.
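The best-effort fallback could look roughly like the following. The function name `bestEffortTransferLease` and the `LeaseBasics` interface are hypothetical, and the up-front `getLeaseOwner` check is an added assumption so that the function returns false when fromOwner no longer holds the lease; the page itself only prescribes release followed by acquire.

```typescript
// Hypothetical subset of the documented methods used by this fallback.
interface LeaseBasics {
  acquireLease(key: string, owner: string, ttlMs: number): Promise<{ acquired: boolean; owner?: string }>
  getLeaseOwner(key: string): Promise<string | undefined>
  releaseLease(key: string, owner: string): Promise<void>
}

// NON-ATOMIC: another process can claim the key between release and acquire.
export async function bestEffortTransferLease(
  leases: LeaseBasics,
  key: string,
  fromOwner: string,
  toOwner: string,
  ttlMs: number,
): Promise<boolean> {
  // Assumed pre-check: report failure if fromOwner has already lost the lease.
  if ((await leases.getLeaseOwner(key)) !== fromOwner) return false

  await leases.releaseLease(key, fromOwner)

  // A racing process may win here; in that case acquired is false.
  const result = await leases.acquireLease(key, toOwner, ttlMs)
  return result.acquired
}
```

A false return covers both a stale fromOwner and a lost race, so the caller's handling is the same as for an atomic backend.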
Capability detection
The signals runtime detects LeaseProvider structurally rather than with instanceof, so detection works even when a separately published backend resolves a different copy of @mastra/core. A value is treated as a LeaseProvider when it exposes all five methods (acquireLease, getLeaseOwner, releaseLease, renewLease, transferLease).
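A structural check of this kind can be written as a type guard that looks only at method names. The guard below is a sketch under that assumption; `looksLikeLeaseProvider` is a hypothetical name, and the runtime's actual check may differ in detail.

```typescript
// The five method names listed above.
const LEASE_METHODS = [
  'acquireLease',
  'getLeaseOwner',
  'releaseLease',
  'renewLease',
  'transferLease',
] as const

type LeaseMethodName = (typeof LEASE_METHODS)[number]

// Hypothetical guard: true only when every method name resolves to a function.
// It never uses instanceof, so it does not depend on which copy of a package
// the value's class came from.
export function looksLikeLeaseProvider(
  value: unknown,
): value is Record<LeaseMethodName, (...args: never[]) => unknown> {
  if (typeof value !== 'object' || value === null) return false
  const candidate = value as Record<string, unknown>
  // Property lookup walks the prototype chain, so class instances qualify too.
  return LEASE_METHODS.every(name => typeof candidate[name] === 'function')
}
```

Because the check requires all five names, a backend that implements only some of them is treated as not providing leasing at all.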
When the configured pub/sub backend does not implement LeaseProvider, the runtime falls back to an always-win no-op provider. Every caller wins its own lease race, and release, renew, and transfer are inert, which preserves the expected single-process behavior.
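An always-win fallback can be pictured as an object whose acquireLease always succeeds and whose other methods do nothing. The sketch below is illustrative only: the name `alwaysWinLeases` is hypothetical, and the specific return values for getLeaseOwner (undefined), renewLease (true), and transferLease (true) are assumptions, since the text above only says those operations are inert.

```typescript
// Illustrative no-op provider: no state is stored and nothing is coordinated.
export const alwaysWinLeases = {
  // Every caller wins its own lease race.
  async acquireLease(_key: string, owner: string, _ttlMs: number) {
    return { acquired: true, owner }
  },

  // Assumption: with no stored state, no owner is ever reported.
  async getLeaseOwner(_key: string): Promise<string | undefined> {
    return undefined
  },

  // Inert: there is nothing to release.
  async releaseLease(_key: string, _owner: string): Promise<void> {},

  // Assumption: report success so a single-process caller keeps running.
  async renewLease(_key: string, _owner: string, _ttlMs: number): Promise<boolean> {
    return true
  },

  // Assumption: report success so a queued follow-up run proceeds.
  async transferLease(
    _key: string,
    _fromOwner: string,
    _toOwner: string,
    _ttlMs: number,
  ): Promise<boolean> {
    return true
  },
}
```

With a provider like this, two processes would both believe they own the same key, which is why it is only safe in single-process deployments.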
Related
- PubSub: The event delivery contract, separate from leasing
- RedisStreamsPubSub: The built-in backend that implements LeaseProvider
- Signals: The runtime that uses leasing to coordinate thread runs across processes
- Channels: Uses leasing to coordinate agent runs in serverless and multi-instance deployments