# PostgreSQL Storage The PostgreSQL storage implementation provides a production-ready storage solution using PostgreSQL databases. ## Installation **npm**: ```bash npm install @mastra/pg@latest ``` **pnpm**: ```bash pnpm add @mastra/pg@latest ``` **Yarn**: ```bash yarn add @mastra/pg@latest ``` **Bun**: ```bash bun add @mastra/pg@latest ``` ## Usage ```typescript import { PostgresStore } from "@mastra/pg"; const storage = new PostgresStore({ id: 'pg-storage', connectionString: process.env.DATABASE_URL, }); ``` ## Parameters **id:** (`string`): Unique identifier for this storage instance. **connectionString?:** (`string`): PostgreSQL connection string (e.g., postgresql://user:pass\@host:5432/dbname). Required unless using \`pool\` or individual host-based parameters (\`host\`, \`port\`, \`database\`, \`user\`, \`password\`). **host?:** (`string`): Database server hostname or IP address. Used with other host-based parameters as an alternative to connectionString. **port?:** (`number`): Database server port number. Defaults to 5432 if not specified. **database?:** (`string`): Name of the database to connect to. **user?:** (`string`): Database user for authentication. **password?:** (`string`): Password for the database user. **pool?:** (`pg.Pool`): Pre-configured pg.Pool instance. Use this to reuse an existing connection pool. When provided, Mastra will not create its own pool and will not close it when \`store.close()\` is called. **schemaName?:** (`string`): The name of the schema you want the storage to use. Defaults to 'public'. **ssl?:** (`boolean | ConnectionOptions`): SSL configuration for the connection; set to true to use default SSL or provide a ConnectionOptions object for custom SSL settings. **max?:** (`number`): Maximum number of connections in the pool. Defaults to 20. **idleTimeoutMillis?:** (`number`): How long a connection can sit idle before being closed. Defaults to 30000 (30 seconds). **disableInit?:** (`boolean`): When true, automatic table creation/migrations are disabled. Useful for CI/CD pipelines where migrations are run separately. **skipDefaultIndexes?:** (`boolean`): When true, default indexes will not be created during initialization. **indexes?:** (`CreateIndexOptions[]`): Custom indexes to create during initialization. ## Constructor Examples You can instantiate `PostgresStore` in the following ways: ```ts import { PostgresStore } from "@mastra/pg"; import { Pool } from "pg"; // Using a connection string const store1 = new PostgresStore({ id: 'pg-storage-1', connectionString: "postgresql://user:password@localhost:5432/mydb", }); // Using a connection string with pool options const store2 = new PostgresStore({ id: 'pg-storage-2', connectionString: "postgresql://user:password@localhost:5432/mydb", schemaName: "custom_schema", max: 30, // Max pool connections idleTimeoutMillis: 60000, // Idle timeout ssl: { rejectUnauthorized: false }, }); // Using individual connection parameters const store3 = new PostgresStore({ id: 'pg-storage-3', host: "localhost", port: 5432, database: "mydb", user: "user", password: "password", }); // Using a pre-configured pg.Pool (recommended for pool reuse) const existingPool = new Pool({ connectionString: "postgresql://user:password@localhost:5432/mydb", max: 20, // ... your custom pool configuration }); const store4 = new PostgresStore({ id: 'pg-storage-4', pool: existingPool, schemaName: "custom_schema", // optional }); ``` ## Additional Notes ### Schema Management The storage implementation handles schema creation and updates automatically. It creates the following tables: - `mastra_workflow_snapshot`: Stores workflow state and execution data - `mastra_evals`: Stores evaluation results and metadata - `mastra_threads`: Stores conversation threads - `mastra_messages`: Stores individual messages - `mastra_traces`: Stores telemetry and tracing data - `mastra_scorers`: Stores scoring and evaluation data - `mastra_resources`: Stores resource working memory data ### Observability PostgreSQL supports observability and can handle low trace volumes. Throughput capacity depends on deployment factors such as hardware, schema design, indexing, and retention policies, and should be validated for your specific environment. For high-volume production environments, consider: - Using the `insert-only` [tracing strategy](https://mastra.ai/docs/observability/tracing/exporters/default) to reduce database write operations - Setting up table partitioning for efficient data retention - Migrating observability to [ClickHouse via composite storage](https://mastra.ai/reference/storage/composite) if you need to scale further ### Initialization When you pass storage to the Mastra class, `init()` is called automatically before any storage operation: ```typescript import { Mastra } from "@mastra/core"; import { PostgresStore } from "@mastra/pg"; const storage = new PostgresStore({ id: 'pg-storage', connectionString: process.env.DATABASE_URL, }); const mastra = new Mastra({ storage, // init() is called automatically }); ``` If you're using storage directly without Mastra, you must call `init()` explicitly to create the tables: ```typescript import { PostgresStore } from "@mastra/pg"; const storage = new PostgresStore({ id: 'pg-storage', connectionString: process.env.DATABASE_URL, }); // Required when using storage directly await storage.init(); // Access domain-specific stores via getStore() const memoryStore = await storage.getStore('memory'); const thread = await memoryStore?.getThreadById({ threadId: "..." }); ``` > **Warning:** If `init()` is not called, tables won't be created and storage operations will fail silently or throw errors. ### Using an Existing Pool If you already have a `pg.Pool` in your application (e.g., shared with an ORM or for Row Level Security), you can pass it directly to `PostgresStore`: ```typescript import { Pool } from "pg"; import { PostgresStore } from "@mastra/pg"; // Your existing pool (shared across your application) const pool = new Pool({ connectionString: process.env.DATABASE_URL, max: 20, }); const storage = new PostgresStore({ id: "shared-storage", pool: pool, }); ``` **Pool lifecycle behavior:** - When you **provide a pool**: Mastra uses your pool but does **not** close it when `store.close()` is called. You manage the pool lifecycle. - When Mastra **creates a pool**: Mastra owns the pool and will close it when `store.close()` is called. ### Direct Database and Pool Access `PostgresStore` exposes the underlying database client and pool for advanced use cases: ```typescript store.db; // DbClient - query interface with helpers (any, one, tx, etc.) store.pool; // pg.Pool - the underlying connection pool ``` **Using `store.db` for queries:** ```typescript // Execute queries with helper methods const users = await store.db.any("SELECT * FROM users WHERE active = $1", [true]); const user = await store.db.one("SELECT * FROM users WHERE id = $1", [userId]); const maybeUser = await store.db.oneOrNone("SELECT * FROM users WHERE email = $1", [email]); // Use transactions const result = await store.db.tx(async (t) => { await t.none("INSERT INTO logs (message) VALUES ($1)", ["Started"]); const data = await t.any("SELECT * FROM items"); return data; }); ``` **Using `store.pool` directly:** ```typescript // Get a client for manual connection management const client = await store.pool.connect(); try { await client.query("SET LOCAL app.user_id = $1", [userId]); const result = await client.query("SELECT * FROM protected_table"); return result.rows; } finally { client.release(); } ``` When using these fields: - You are responsible for proper connection and transaction handling. - Closing the store (`store.close()`) will destroy the pool only if Mastra created it. - Direct access bypasses any additional logic or validation provided by PostgresStore methods. This approach is intended for advanced scenarios where low-level access is required. ### Using with Next.js When using `PostgresStore` in Next.js applications, [Hot Module Replacement (HMR)](https://nextjs.org/docs/architecture/fast-refresh) during development can cause multiple storage instances to be created, resulting in this warning: ```text WARNING: Creating a duplicate database object for the same connection. ``` To prevent this, store the `PostgresStore` instance on the global object so it persists across HMR reloads: ```typescript import { PostgresStore } from "@mastra/pg"; import { Memory } from "@mastra/memory"; // Extend the global type to include our instances declare global { var pgStore: PostgresStore | undefined; var memory: Memory | undefined; } // Get or create the PostgresStore instance function getPgStore(): PostgresStore { if (!global.pgStore) { if (!process.env.DATABASE_URL) { throw new Error("DATABASE_URL is not defined in environment variables"); } global.pgStore = new PostgresStore({ id: "pg-storage", connectionString: process.env.DATABASE_URL, ssl: process.env.DATABASE_SSL === "true" ? { rejectUnauthorized: false } : false, }); } return global.pgStore; } // Get or create the Memory instance function getMemory(): Memory { if (!global.memory) { global.memory = new Memory({ storage: getPgStore(), }); } return global.memory; } export const storage = getPgStore(); export const memory = getMemory(); ``` Then use the exported instances in your Mastra configuration: ```typescript import { Mastra } from "@mastra/core/mastra"; import { storage } from "./storage"; export const mastra = new Mastra({ storage, // ...other config }); ``` This pattern ensures only one `PostgresStore` instance is created regardless of how many times the module is reloaded during development. The same pattern can be applied to other storage providers like `LibSQLStore`. > **Tip:** This singleton pattern is only necessary during local development with HMR. In production builds, modules are only loaded once. ## Usage Example ### Adding memory to an agent To add PostgreSQL memory to an agent use the `Memory` class and create a new `storage` key using `PostgresStore`. The `connectionString` can either be a remote location, or a local database connection. ```typescript import { Memory } from "@mastra/memory"; import { Agent } from "@mastra/core/agent"; import { PostgresStore } from "@mastra/pg"; export const pgAgent = new Agent({ id: "pg-agent", name: "PG Agent", instructions: "You are an AI agent with the ability to automatically recall memories from previous interactions.", model: "openai/gpt-5.1", memory: new Memory({ storage: new PostgresStore({ id: 'pg-agent-storage', connectionString: process.env.DATABASE_URL!, }), options: { generateTitle: true, // Explicitly enable automatic title generation }, }), }); ``` ### Using the agent Use `memoryOptions` to scope recall for this request. Set `lastMessages: 5` to limit recency-based recall, and use `semanticRecall` to fetch the `topK: 3` most relevant messages, including `messageRange: 2` neighboring messages for context around each match. ```typescript import "dotenv/config"; import { mastra } from "./mastra"; const threadId = "123"; const resourceId = "user-456"; const agent = mastra.getAgent("pg-agent"); const message = await agent.stream("My name is Mastra", { memory: { thread: threadId, resource: resourceId, }, }); await message.textStream.pipeTo(new WritableStream()); const stream = await agent.stream("What's my name?", { memory: { thread: threadId, resource: resourceId, }, memoryOptions: { lastMessages: 5, semanticRecall: { topK: 3, messageRange: 2, }, }, }); for await (const chunk of stream.textStream) { process.stdout.write(chunk); } ``` ## Index Management PostgreSQL storage provides index management to optimize query performance. ### Default Indexes PostgreSQL storage creates composite indexes during initialization for common query patterns: - `mastra_threads_resourceid_createdat_idx`: (resourceId, createdAt DESC) - `mastra_messages_thread_id_createdat_idx`: (thread\_id, createdAt DESC) - `mastra_ai_spans_traceid_startedat_idx`: (traceId, startedAt DESC) - `mastra_ai_spans_parentspanid_startedat_idx`: (parentSpanId, startedAt DESC) - `mastra_ai_spans_name_startedat_idx`: (name, startedAt DESC) - `mastra_ai_spans_scope_startedat_idx`: (scope, startedAt DESC) - `mastra_scores_trace_id_span_id_created_at_idx`: (traceId, spanId, createdAt DESC) These indexes improve performance for filtered queries with sorting, including `dateRange` filters on message queries. ### Configuring Indexes You can control index creation via constructor options: ```typescript import { PostgresStore } from "@mastra/pg"; // Skip default indexes (manage indexes separately) const store = new PostgresStore({ id: 'pg-storage', connectionString: process.env.DATABASE_URL, skipDefaultIndexes: true, }); // Add custom indexes during initialization const storeWithCustomIndexes = new PostgresStore({ id: 'pg-storage', connectionString: process.env.DATABASE_URL, indexes: [ { name: "idx_threads_metadata_type", table: "mastra_threads", columns: ["metadata->>'type'"], }, { name: "idx_messages_status", table: "mastra_messages", columns: ["metadata->>'status'"], }, ], }); ``` For advanced index types, you can specify additional options: - `unique: true` for unique constraints - `where: 'condition'` for partial indexes - `method: 'brin'` for time-series data - `storage: { fillfactor: 90 }` for update-heavy tables - `concurrent: true` for non-blocking creation (default) ### Index Options **name:** (`string`): Unique name for the index **table:** (`string`): Table name (e.g., 'mastra\_threads') **columns:** (`string[]`): Array of column names with optional sort order (e.g., \['id', 'createdAt DESC']) **unique?:** (`boolean`): Creates a unique constraint index **concurrent?:** (`boolean`): Creates index without locking table (default: true) **where?:** (`string`): Partial index condition (PostgreSQL specific) **method?:** (`'btree' | 'hash' | 'gin' | 'gist' | 'spgist' | 'brin'`): Index method (default: 'btree') **opclass?:** (`string`): Operator class for GIN/GIST indexes **storage?:** (`Record`): Storage parameters (e.g., { fillfactor: 90 }) **tablespace?:** (`string`): Tablespace name for index placement ### Schema-Specific Indexes When using custom schemas, index names are prefixed with the schema name: ```typescript const storage = new PostgresStore({ id: 'pg-storage', connectionString: process.env.DATABASE_URL, schemaName: "custom_schema", indexes: [ { name: "idx_threads_status", table: "mastra_threads", columns: ["status"], }, ], }); // Creates index as: custom_schema_idx_threads_status ``` ### Managing Indexes via SQL For advanced index management (listing, dropping, analyzing), use direct SQL queries via the `db` accessor: ```typescript // List indexes for a table const indexes = await storage.db.any(` SELECT indexname, indexdef FROM pg_indexes WHERE tablename = 'mastra_messages' `); // Drop an index await storage.db.none('DROP INDEX IF EXISTS idx_my_custom_index'); // Analyze index usage const stats = await storage.db.one(` SELECT idx_scan, idx_tup_read FROM pg_stat_user_indexes WHERE indexrelname = 'mastra_messages_thread_id_createdat_idx' `); ``` ### Index Types and Use Cases PostgreSQL offers different index types optimized for specific scenarios: | Index Type | Best For | Storage | Speed | | ------------------- | --------------------------------------- | ---------- | -------------------------- | | **btree** (default) | Range queries, sorting, general purpose | Moderate | Fast | | **hash** | Equality comparisons only | Small | Very fast for `=` | | **gin** | JSONB, arrays, full-text search | Large | Fast for contains | | **gist** | Geometric data, full-text search | Moderate | Fast for nearest-neighbor | | **spgist** | Non-balanced data, text patterns | Small | Fast for specific patterns | | **brin** | Large tables with natural ordering | Very small | Fast for ranges |