PostgreSQL Storage
The PostgreSQL storage implementation provides a production-ready storage solution using PostgreSQL databases.
Installation
npm install @mastra/pg@latest
pnpm add @mastra/pg@latest
yarn add @mastra/pg@latest
bun add @mastra/pg@latest
Usage
import { PostgresStore } from "@mastra/pg";
const storage = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
});
Parameters
- id: Identifier for this storage instance.
- connectionString?: PostgreSQL connection string (e.g. postgresql://user:password@localhost:5432/mydb).
- host?: Database host, when using individual connection parameters.
- port?: Database port (typically 5432).
- database?: Database name.
- user?: Database user.
- password?: Database password.
- pool?: An existing pg.Pool to reuse. When provided, you manage its lifecycle and Mastra will not close it.
- schemaName?: Schema in which Mastra's tables and indexes are created.
- ssl?: SSL configuration for the connection (e.g. { rejectUnauthorized: false }).
- max?: Maximum number of pool connections.
- idleTimeoutMillis?: Time before an idle connection is closed.
- disableInit?: Disables automatic table creation during initialization.
- skipDefaultIndexes?: Skips creation of the default composite indexes.
- indexes?: Custom index definitions created during initialization (see Index Management).
Constructor Examples
You can instantiate PostgresStore in the following ways:
import { PostgresStore } from "@mastra/pg";
import { Pool } from "pg";
// Using a connection string
const store1 = new PostgresStore({
id: 'pg-storage-1',
connectionString: "postgresql://user:password@localhost:5432/mydb",
});
// Using a connection string with pool options
const store2 = new PostgresStore({
id: 'pg-storage-2',
connectionString: "postgresql://user:password@localhost:5432/mydb",
schemaName: "custom_schema",
max: 30, // Max pool connections
idleTimeoutMillis: 60000, // Idle timeout
ssl: { rejectUnauthorized: false },
});
// Using individual connection parameters
const store3 = new PostgresStore({
id: 'pg-storage-3',
host: "localhost",
port: 5432,
database: "mydb",
user: "user",
password: "password",
});
// Using a pre-configured pg.Pool (recommended for pool reuse)
const existingPool = new Pool({
connectionString: "postgresql://user:password@localhost:5432/mydb",
max: 20,
// ... your custom pool configuration
});
const store4 = new PostgresStore({
id: 'pg-storage-4',
pool: existingPool,
schemaName: "custom_schema", // optional
});
Additional Notes
Schema Management
The storage implementation handles schema creation and updates automatically. It creates the following tables:
- mastra_workflow_snapshot: Stores workflow state and execution data
- mastra_evals: Stores evaluation results and metadata
- mastra_threads: Stores conversation threads
- mastra_messages: Stores individual messages
- mastra_traces: Stores telemetry and tracing data
- mastra_scorers: Stores scoring and evaluation data
- mastra_resources: Stores resource working memory data
Observability
PostgreSQL supports observability and can handle low trace volumes. Throughput capacity depends on deployment factors such as hardware, schema design, indexing, and retention policies, and should be validated for your specific environment. For high-volume production environments, consider:
- Using the insert-only tracing strategy to reduce database write operations
- Setting up table partitioning for efficient data retention
- Migrating observability to ClickHouse via composite storage if you need to scale further
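As a rough illustration of the partitioning point, range partitions can be created and dropped with plain SQL. The snippet below is hypothetical: Mastra's default tables are not created as partitioned tables, so PARTITION OF requires a parent table you have declared with PARTITION BY RANGE yourself, and the table and column names here are illustrative only.

```typescript
// Hypothetical DDL for monthly range partitioning of a traces table.
// Assumes a parent table created with PARTITION BY RANGE ("createdAt");
// Mastra's default tables are NOT partitioned, so adapt this to your schema.
const createTracesPartitionSql = `
  CREATE TABLE IF NOT EXISTS traces_2025_01
  PARTITION OF traces
  FOR VALUES FROM ('2025-01-01') TO ('2025-02-01')
`;

// Dropping an old partition is a cheap way to implement retention:
const dropOldPartitionSql = `DROP TABLE IF EXISTS traces_2024_01`;
```

Statements like these could be executed through the db accessor described under Direct Database and Pool Access.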
Initialization
When you pass storage to the Mastra class, init() is called automatically before any storage operation:
import { Mastra } from "@mastra/core";
import { PostgresStore } from "@mastra/pg";
const storage = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
});
const mastra = new Mastra({
storage, // init() is called automatically
});
If you're using storage directly without Mastra, you must call init() explicitly to create the tables:
import { PostgresStore } from "@mastra/pg";
const storage = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
});
// Required when using storage directly
await storage.init();
// Access domain-specific stores via getStore()
const memoryStore = await storage.getStore('memory');
const thread = await memoryStore?.getThreadById({ threadId: "..." });
If init() is not called, the tables won't be created and storage operations will fail.
Using an Existing Pool
If you already have a pg.Pool in your application (e.g., shared with an ORM or for Row Level Security), you can pass it directly to PostgresStore:
import { Pool } from "pg";
import { PostgresStore } from "@mastra/pg";
// Your existing pool (shared across your application)
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 20,
});
const storage = new PostgresStore({
id: "shared-storage",
pool: pool,
});
Pool lifecycle behavior:
- When you provide a pool: Mastra uses your pool but does not close it when store.close() is called. You manage the pool lifecycle.
- When Mastra creates a pool: Mastra owns the pool and will close it when store.close() is called.
Direct Database and Pool Access
PostgresStore exposes the underlying database client and pool for advanced use cases:
store.db; // DbClient - query interface with helpers (any, one, tx, etc.)
store.pool; // pg.Pool - the underlying connection pool
Using store.db for queries:
// Execute queries with helper methods
const users = await store.db.any("SELECT * FROM users WHERE active = $1", [true]);
const user = await store.db.one("SELECT * FROM users WHERE id = $1", [userId]);
const maybeUser = await store.db.oneOrNone("SELECT * FROM users WHERE email = $1", [email]);
// Use transactions
const result = await store.db.tx(async (t) => {
await t.none("INSERT INTO logs (message) VALUES ($1)", ["Started"]);
const data = await t.any("SELECT * FROM items");
return data;
});
Using store.pool directly:
// Get a client for manual connection management
const client = await store.pool.connect();
try {
await client.query("SET LOCAL app.user_id = $1", [userId]);
const result = await client.query("SELECT * FROM protected_table");
return result.rows;
} finally {
client.release();
}
When using these fields:
- You are responsible for proper connection and transaction handling.
- Closing the store (store.close()) will destroy the pool only if Mastra created it.
- Direct access bypasses any additional logic or validation provided by PostgresStore methods.
This approach is intended for advanced scenarios where low-level access is required.
Using with Next.js
When using PostgresStore in Next.js applications, Hot Module Replacement (HMR) during development can cause multiple storage instances to be created, resulting in this warning:
WARNING: Creating a duplicate database object for the same connection.
To prevent this, store the PostgresStore instance on the global object so it persists across HMR reloads:
import { PostgresStore } from "@mastra/pg";
import { Memory } from "@mastra/memory";
// Extend the global type to include our instances
declare global {
var pgStore: PostgresStore | undefined;
var memory: Memory | undefined;
}
// Get or create the PostgresStore instance
function getPgStore(): PostgresStore {
if (!global.pgStore) {
if (!process.env.DATABASE_URL) {
throw new Error("DATABASE_URL is not defined in environment variables");
}
global.pgStore = new PostgresStore({
id: "pg-storage",
connectionString: process.env.DATABASE_URL,
ssl:
process.env.DATABASE_SSL === "true"
? { rejectUnauthorized: false }
: false,
});
}
return global.pgStore;
}
// Get or create the Memory instance
function getMemory(): Memory {
if (!global.memory) {
global.memory = new Memory({
storage: getPgStore(),
});
}
return global.memory;
}
export const storage = getPgStore();
export const memory = getMemory();
Then use the exported instances in your Mastra configuration:
import { Mastra } from "@mastra/core/mastra";
import { storage } from "./storage";
export const mastra = new Mastra({
storage,
// ...other config
});
This pattern ensures only one PostgresStore instance is created regardless of how many times the module is reloaded during development. The same pattern can be applied to other storage providers like LibSQLStore.
This singleton pattern is only necessary during local development with HMR. In production builds, modules are only loaded once.
Usage Example
Adding memory to an agent
To add PostgreSQL memory to an agent, use the Memory class with storage set to a PostgresStore instance. The connectionString can point to either a remote or a local database.
import { Memory } from "@mastra/memory";
import { Agent } from "@mastra/core/agent";
import { PostgresStore } from "@mastra/pg";
export const pgAgent = new Agent({
id: "pg-agent",
name: "PG Agent",
instructions:
"You are an AI agent with the ability to automatically recall memories from previous interactions.",
model: "openai/gpt-5.1",
memory: new Memory({
storage: new PostgresStore({
id: 'pg-agent-storage',
connectionString: process.env.DATABASE_URL!,
}),
options: {
generateTitle: true, // Explicitly enable automatic title generation
},
}),
});
Using the agent
Use memoryOptions to scope recall for this request. Set lastMessages: 5 to limit recency-based recall, and use semanticRecall to fetch the topK: 3 most relevant messages, including messageRange: 2 neighboring messages for context around each match.
import "dotenv/config";
import { mastra } from "./mastra";
const threadId = "123";
const resourceId = "user-456";
const agent = mastra.getAgent("pg-agent");
const message = await agent.stream("My name is Mastra", {
memory: {
thread: threadId,
resource: resourceId,
},
});
await message.textStream.pipeTo(new WritableStream());
const stream = await agent.stream("What's my name?", {
memory: {
thread: threadId,
resource: resourceId,
},
memoryOptions: {
lastMessages: 5,
semanticRecall: {
topK: 3,
messageRange: 2,
},
},
});
for await (const chunk of stream.textStream) {
process.stdout.write(chunk);
}
Index Management
PostgreSQL storage provides index management to optimize query performance.
Default Indexes
PostgreSQL storage creates composite indexes during initialization for common query patterns:
- mastra_threads_resourceid_createdat_idx: (resourceId, createdAt DESC)
- mastra_messages_thread_id_createdat_idx: (thread_id, createdAt DESC)
- mastra_ai_spans_traceid_startedat_idx: (traceId, startedAt DESC)
- mastra_ai_spans_parentspanid_startedat_idx: (parentSpanId, startedAt DESC)
- mastra_ai_spans_name_startedat_idx: (name, startedAt DESC)
- mastra_ai_spans_scope_startedat_idx: (scope, startedAt DESC)
- mastra_scores_trace_id_span_id_created_at_idx: (traceId, spanId, createdAt DESC)
These indexes improve performance for filtered queries with sorting, including dateRange filters on message queries.
Configuring Indexes
You can control index creation via constructor options:
import { PostgresStore } from "@mastra/pg";
// Skip default indexes (manage indexes separately)
const store = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
skipDefaultIndexes: true,
});
// Add custom indexes during initialization
const storeWithCustomIndexes = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
indexes: [
{
name: "idx_threads_metadata_type",
table: "mastra_threads",
columns: ["metadata->>'type'"],
},
{
name: "idx_messages_status",
table: "mastra_messages",
columns: ["metadata->>'status'"],
},
],
});
For advanced index types, you can specify additional options:
- unique: true for unique constraints
- where: 'condition' for partial indexes
- method: 'brin' for time-series data
- storage: { fillfactor: 90 } for update-heavy tables
- concurrent: true for non-blocking creation (default)
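As a sketch, these advanced options can be combined in a single indexes array passed to the constructor. The index names, tables, conditions, and the "createdAt" column on mastra_traces below are hypothetical; adjust them to your own schema and query patterns.

```typescript
// Shape of an index definition, mirroring the options listed in this section.
type IndexDef = {
  name: string;
  table: string;
  columns: string[];
  unique?: boolean;
  concurrent?: boolean;
  where?: string;
  method?: string;
  storage?: { fillfactor?: number };
};

// Hypothetical index definitions combining the advanced options above.
const advancedIndexes: IndexDef[] = [
  {
    // Partial index: only rows matching the condition are indexed.
    name: "idx_messages_unread",
    table: "mastra_messages",
    columns: ["thread_id"],
    where: "metadata->>'status' = 'unread'",
  },
  {
    // BRIN index with a custom fillfactor for a time-ordered table.
    name: "idx_traces_created_brin",
    table: "mastra_traces",
    columns: ["createdAt"], // column name assumed for illustration
    method: "brin",
    storage: { fillfactor: 90 },
    concurrent: true, // default; shown here for clarity
  },
];
```

Such an array would be passed as the indexes option when constructing PostgresStore.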
Index Options
- name: Name of the index.
- table: Table the index is created on.
- columns: Column names or expressions to index.
- unique?: Creates a unique constraint.
- concurrent?: Creates the index without blocking writes (default: true).
- where?: Condition for a partial index.
- method?: Index method (btree, hash, gin, gist, spgist, or brin; default: btree).
- opclass?: Operator class for the indexed columns.
- storage?: Storage parameters such as fillfactor.
- tablespace?: Tablespace in which to create the index.
Schema-Specific Indexes
When using custom schemas, index names are prefixed with the schema name:
const storage = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
schemaName: "custom_schema",
indexes: [
{
name: "idx_threads_status",
table: "mastra_threads",
columns: ["status"],
},
],
});
// Creates index as: custom_schema_idx_threads_status
Managing Indexes via SQL
For advanced index management (listing, dropping, analyzing), use direct SQL queries via the db accessor:
// List indexes for a table
const indexes = await storage.db.any(`
SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'mastra_messages'
`);
// Drop an index
await storage.db.none('DROP INDEX IF EXISTS idx_my_custom_index');
// Analyze index usage
const stats = await storage.db.one(`
SELECT idx_scan, idx_tup_read
FROM pg_stat_user_indexes
WHERE indexrelname = 'mastra_messages_thread_id_createdat_idx'
`);
Index Types and Use Cases
PostgreSQL offers different index types optimized for specific scenarios:
| Index Type | Best For | Storage | Speed |
|---|---|---|---|
| btree (default) | Range queries, sorting, general purpose | Moderate | Fast |
| hash | Equality comparisons only | Small | Very fast for = |
| gin | JSONB, arrays, full-text search | Large | Fast for contains |
| gist | Geometric data, full-text search | Moderate | Fast for nearest-neighbor |
| spgist | Non-balanced data, text patterns | Small | Fast for specific patterns |
| brin | Large tables with natural ordering | Very small | Fast for ranges |
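The table above can be mapped onto the indexes option, for example a gin index for JSONB metadata filters and a brin index for a large, time-ordered table. The index names and column choices below are hypothetical sketches, not defaults created by Mastra.

```typescript
// Hypothetical method choices based on the table above.
const typedIndexes = [
  {
    // gin: suited to JSONB containment queries on a metadata column.
    name: "idx_messages_metadata_gin",
    table: "mastra_messages",
    columns: ["metadata"],
    method: "gin",
  },
  {
    // brin: very small index for large tables with natural time ordering.
    name: "idx_spans_startedat_brin",
    table: "mastra_ai_spans",
    columns: ["startedAt"],
    method: "brin",
  },
];
```

These definitions would be passed as the indexes option to the PostgresStore constructor, as shown under Configuring Indexes.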