Chain of Thought Workflow RAG
This example demonstrates how to implement a Retrieval-Augmented Generation (RAG) system using Mastra, OpenAI embeddings, and PGVector for vector storage, with chain-of-thought reasoning expressed as an explicit step-by-step workflow.
Overview
The system implements RAG using Mastra and OpenAI with chain-of-thought prompting through a defined workflow. Here’s what it does:
- Sets up a Mastra agent with GPT-4o-mini for response generation
- Creates a vector query tool to manage vector store interactions
- Defines a workflow with multiple steps for chain-of-thought reasoning
- Processes and chunks text documents
- Creates and stores embeddings in PostgreSQL
- Generates responses through the workflow steps
Setup
Environment Setup
Make sure to set up your environment variables; the OpenAI key is used for both the embeddings and the agent model:
OPENAI_API_KEY=your_openai_api_key_here
POSTGRES_CONNECTION_STRING=your_connection_string_here
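If you keep these in a .env file, something needs to load them before any code reads process.env; a common choice (an assumption about your project setup, not part of this example) is dotenv:
// Assumes the dotenv package is installed; loads .env into process.env
import "dotenv/config";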
Dependencies
Import the necessary dependencies:
import { Mastra, Agent, EmbedManyResult, Step, Workflow } from "@mastra/core";
import { createVectorQueryTool, embed, MDocument, PgVector } from "@mastra/rag";
import { z } from "zod";
Workflow Definition
First, define the workflow with its trigger schema:
export const ragWorkflow = new Workflow({
  name: "rag-workflow",
  triggerSchema: z.object({
    query: z.string(),
  }),
});
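Since the trigger schema is plain Zod, the shape of the trigger data can be derived statically. A small illustration (the standalone triggerSchema constant and RagTrigger alias here are just for demonstration):
// Illustrative only: derive the payload type the workflow expects at start()
const triggerSchema = z.object({ query: z.string() });
type RagTrigger = z.infer<typeof triggerSchema>; // { query: string }
const exampleTrigger: RagTrigger = { query: "What are the main benefits of telemedicine?" };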
Vector Query Tool Creation
Create a tool for querying the vector database:
const vectorQueryTool = createVectorQueryTool({
  vectorStoreName: "pgVector",
  indexName: "embeddings",
  options: {
    provider: "OPEN_AI",
    model: "text-embedding-ada-002",
    maxRetries: 3,
  },
  topK: 3,
});
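Conceptually, the tool embeds the incoming question with the configured model and returns the topK most similar chunks from the named index. A rough sketch of that flow, assuming embed also accepts a single string (returning an EmbedResult) and that PgVector exposes a query(indexName, vector, topK) method in this version of @mastra/rag; pgVector itself is instantiated later in this example:
// Not the tool's actual implementation -- just the retrieval flow it wraps.
// Assumes EmbedResult is the single-input counterpart of EmbedManyResult.
const { embedding } = (await embed("What are the main benefits of telemedicine?", {
  provider: "OPEN_AI",
  model: "text-embedding-ada-002", // must match the model used at ingestion time
  maxRetries: 3,
})) as EmbedResult<string>;
const matches = await pgVector.query("embeddings", embedding, 3); // topK = 3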
Agent Configuration
Set up the Mastra agent:
export const ragAgent = new Agent({
  name: "RAG Agent",
  instructions: `You are a helpful assistant that answers questions based on the provided context.`,
  model: {
    provider: "OPEN_AI",
    name: "gpt-4o-mini",
  },
  tools: {
    vectorQueryTool,
  },
});
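The workflow steps below all call ragAgent.generate() and read .text off the result; the same call works standalone, which is handy for checking the agent and tool wiring before adding the workflow:
// Standalone sanity check -- same generate()/.text usage as the workflow steps
const probe = await ragAgent.generate(
  "Using the provided context, what topics does the indexed document cover?",
);
console.log(probe.text);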
Workflow Steps
The workflow is divided into five steps for chain-of-thought reasoning. They all follow the same pattern, sketched after the last step:
1. Context Analysis Step
const analyzeContext = new Step({
  id: "analyzeContext",
  outputSchema: z.object({
    initialAnalysis: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    console.log("---------------------------");
    const ragAgent = mastra?.agents?.ragAgent;
    // Read the original query from the workflow trigger payload
    const query = context?.machineContext?.getStepPayload<{ query: string }>(
      "trigger",
    )?.query;
    const analysisPrompt = `${query} 1. First, carefully analyze the retrieved context chunks and identify key information.`;
    const analysis = await ragAgent?.generate(analysisPrompt);
    console.log(analysis?.text);
    return {
      initialAnalysis: analysis?.text ?? "",
    };
  },
});
2. Thought Breakdown Step
const breakdownThoughts = new Step({
  id: "breakdownThoughts",
  outputSchema: z.object({
    breakdown: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    console.log("---------------------------");
    const ragAgent = mastra?.agents?.ragAgent;
    // Read the previous step's output by its step id
    const analysis = context?.machineContext?.getStepPayload<{
      initialAnalysis: string;
    }>("analyzeContext")?.initialAnalysis;
    const connectionPrompt = `
Based on the initial analysis: ${analysis}
2. Break down your thinking process about how the retrieved information relates to the query.
`;
    const connectionAnalysis = await ragAgent?.generate(connectionPrompt);
    console.log(connectionAnalysis?.text);
    return {
      breakdown: connectionAnalysis?.text ?? "",
    };
  },
});
3. Connection Step
const connectPieces = new Step({
  id: "connectPieces",
  outputSchema: z.object({
    connections: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    console.log("---------------------------");
    const ragAgent = mastra?.agents?.ragAgent;
    // Named `breakdown` to avoid shadowing Node's global `process`
    const breakdown = context?.machineContext?.getStepPayload<{
      breakdown: string;
    }>("breakdownThoughts")?.breakdown;
    const connectionPrompt = `
Based on the breakdown: ${breakdown}
3. Explain how you're connecting different pieces from the retrieved chunks.
`;
    const connections = await ragAgent?.generate(connectionPrompt);
    console.log(connections?.text);
    return {
      connections: connections?.text ?? "",
    };
  },
});
4. Conclusion Step
const drawConclusions = new Step({
  id: "drawConclusions",
  outputSchema: z.object({
    conclusions: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    console.log("---------------------------");
    const ragAgent = mastra?.agents?.ragAgent;
    const evidence = context?.machineContext?.getStepPayload<{
      connections: string;
    }>("connectPieces")?.connections;
    const conclusionPrompt = `
Based on the connections: ${evidence}
4. Draw conclusions based only on the evidence in the retrieved context.
`;
    const conclusions = await ragAgent?.generate(conclusionPrompt);
    console.log(conclusions?.text);
    return {
      conclusions: conclusions?.text ?? "",
    };
  },
});
5. Final Answer Step
const finalAnswer = new Step({
  id: "finalAnswer",
  outputSchema: z.object({
    finalAnswer: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    console.log("---------------------------");
    const ragAgent = mastra?.agents?.ragAgent;
    const conclusions = context?.machineContext?.getStepPayload<{
      conclusions: string;
    }>("drawConclusions")?.conclusions;
    const answerPrompt = `
Based on the conclusions: ${conclusions}
Format your response as:
THOUGHT PROCESS:
- Step 1: [Initial analysis of retrieved chunks]
- Step 2: [Connections between chunks]
- Step 3: [Reasoning based on chunks]
FINAL ANSWER:
[Your concise answer based on the retrieved context]`;
    // Named `answer` so it doesn't shadow the `finalAnswer` step constant
    const answer = await ragAgent?.generate(answerPrompt);
    console.log(answer?.text);
    return {
      finalAnswer: answer?.text ?? "",
    };
  },
});
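Each step above reads its predecessor's payload via getStepPayload, prompts the agent, and returns an object matching its outputSchema. A hypothetical sixth step (verifyAnswer is not part of this example) would follow the exact same shape:
// Hypothetical extension, shown only to illustrate the step pattern
const verifyAnswer = new Step({
  id: "verifyAnswer",
  outputSchema: z.object({
    verified: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    const ragAgent = mastra?.agents?.ragAgent;
    const answer = context?.machineContext?.getStepPayload<{
      finalAnswer: string;
    }>("finalAnswer")?.finalAnswer;
    const verification = await ragAgent?.generate(
      `Check this answer against the retrieved context and flag any unsupported claims: ${answer}`,
    );
    return { verified: verification?.text ?? "" };
  },
});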
Workflow Configuration
Connect all the steps in the workflow:
ragWorkflow
  .step(analyzeContext)
  .then(breakdownThoughts)
  .then(connectPieces)
  .then(drawConclusions)
  .then(finalAnswer);
ragWorkflow.commit();
Instantiate PgVector and Mastra
Wire the agent, vector store, and workflow into a single Mastra instance:
const pgVector = new PgVector(process.env.POSTGRES_CONNECTION_STRING!);
export const mastra = new Mastra({
  agents: { ragAgent },
  vectors: { pgVector },
  workflows: { ragWorkflow },
});
Document Processing
Process and chunk the document:
const doc = MDocument.fromText(`Your document text here...`);
const chunks = await doc.chunk({
  strategy: "recursive",
  size: 512,
  overlap: 50,
  separator: "\n",
});
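Each chunk carries at least a text field (the upsert call below relies on chunk.text); a quick inspection of the output:
// Inspect the chunking result (illustrative)
console.log(`Produced ${chunks.length} chunks`);
console.log(chunks[0]?.text?.slice(0, 80));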
Embedding Creation and Storage
Generate and store embeddings:
const { embeddings } = (await embed(chunks, {
  provider: "OPEN_AI",
  model: "text-embedding-ada-002",
  maxRetries: 3,
})) as EmbedManyResult<string>;
const vectorStore = mastra.getVector("pgVector");
// text-embedding-ada-002 produces 1536-dimensional vectors
await vectorStore.createIndex("embeddings", 1536);
await vectorStore.upsert(
  "embeddings",
  embeddings,
  chunks?.map((chunk: any) => ({ text: chunk.text })),
);
Response Generation
Create a function that generates responses by running the workflow:
async function generateResponse(query: string) {
  const prompt = `
Please answer the following question:
${query}
Please base your answer only on the context provided in the tool. If the context doesn't contain enough information to fully answer the question, please state that explicitly.
`;
  // createRun returns a run id plus a start function for this execution
  const { runId, start } = ragWorkflow.createRun();
  const workflowResult = await start({
    triggerData: {
      query: prompt,
    },
  });
  return workflowResult;
}
Example Usage
const query = "What are the main benefits of telemedicine?";
console.log("\nQuery:", query);
const result = await generateResponse(query);
console.log("\nThought Process:");
console.log(result.results);
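The exact shape of workflowResult.results depends on the Mastra version. Assuming it is keyed by step id with per-step status and output fields (an assumption, not confirmed by this example), the final answer could be pulled out like this:
// Assumption: results is keyed by step id with { status, output } entries
const finalStep = (result.results as any)?.finalAnswer;
if (finalStep?.status === "success") {
  console.log("\nFinal Answer:\n", finalStep.output.finalAnswer);
}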