
Structured Reasoning with Workflows

This example demonstrates how to implement a Retrieval-Augmented Generation (RAG) system using Mastra, OpenAI embeddings, and PGVector for vector storage, with an emphasis on structured reasoning through a defined workflow.

Overview

The system implements RAG with chain-of-thought prompting, splitting the reasoning into explicit workflow steps. Here’s what it does:

  1. Sets up a Mastra agent with gpt-4o-mini for response generation
  2. Creates a vector query tool to manage vector store interactions
  3. Defines a workflow with multiple steps for chain-of-thought reasoning
  4. Processes and chunks text documents
  5. Creates and stores embeddings in PostgreSQL
  6. Generates responses through the workflow steps

Setup

Environment Setup

Make sure to set up your environment variables:

.env
OPENAI_API_KEY=your_openai_api_key_here
POSTGRES_CONNECTION_STRING=your_connection_string_here
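
If you run index.ts directly (for example with npx tsx), these variables must be loaded into the process first. A minimal sketch using the dotenv package (an assumption; use whatever loader fits your setup):

index.ts
// Hypothetical: only needed when executing the script directly.
import "dotenv/config";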

Dependencies

Import the necessary dependencies:

index.ts
import { openai } from "@ai-sdk/openai";
import { Mastra } from "@mastra/core";
import { Agent } from "@mastra/core/agent";
import { Step, Workflow } from "@mastra/core/workflows";
import { PgVector } from "@mastra/pg";
import { createVectorQueryTool, MDocument } from "@mastra/rag";
import { embedMany } from "ai";
import { z } from "zod";

Workflow Definition

First, define the workflow with its trigger schema. The triggerSchema describes the shape of the triggerData you’ll pass when starting a run (see Workflow Execution below):

index.ts
export const ragWorkflow = new Workflow({
  name: "rag-workflow",
  triggerSchema: z.object({
    query: z.string(),
  }),
});

Vector Query Tool Creation

Create a tool the agent can call to query the vector database. It embeds an incoming question with the same embedding model used at indexing time and searches the embeddings index in the pgVector store:

index.ts
const vectorQueryTool = createVectorQueryTool({
  vectorStoreName: "pgVector",
  indexName: "embeddings",
  model: openai.embedding('text-embedding-3-small'),
});

Agent Configuration

Set up the Mastra agent:

index.ts
export const ragAgent = new Agent({
  name: "RAG Agent",
  instructions: `You are a helpful assistant that answers questions based on the provided context.`,
  model: openai("gpt-4o-mini"),
  tools: {
    vectorQueryTool,
  },
});
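
Before wiring the agent into the workflow, you can sanity-check it with a direct call. A minimal sketch (the question is a placeholder, and at this point in the file nothing has been indexed yet, so the tool will find little context):

index.ts
// Optional smoke test: call the agent directly before using it in the workflow.
const check = await ragAgent.generate("What does the indexed document cover?");
console.log(check.text);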

Workflow Steps

The workflow is divided into multiple steps for chain-of-thought reasoning:

1. Context Analysis Step

index.ts
const analyzeContext = new Step({
  id: "analyzeContext",
  outputSchema: z.object({
    initialAnalysis: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    console.log("---------------------------");
    const ragAgent = mastra?.getAgent('ragAgent');
    const query = context?.getStepResult<{ query: string }>(
      "trigger",
    )?.query;
 
    const analysisPrompt = `${query} 1. First, carefully analyze the retrieved context chunks and identify key information.`;
 
    const analysis = await ragAgent?.generate(analysisPrompt);
    console.log(analysis?.text);
    return {
      initialAnalysis: analysis?.text ?? "",
    };
  },
});

2. Thought Breakdown Step

index.ts
const breakdownThoughts = new Step({
  id: "breakdownThoughts",
  outputSchema: z.object({
    breakdown: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    console.log("---------------------------");
    const ragAgent = mastra?.getAgent('ragAgent');
    const analysis = context?.getStepResult<{
      initialAnalysis: string;
    }>("analyzeContext")?.initialAnalysis;
 
    const connectionPrompt = `
      Based on the initial analysis: ${analysis}
 
      2. Break down your thinking process about how the retrieved information relates to the query.
    `;
 
    const connectionAnalysis = await ragAgent?.generate(connectionPrompt);
    console.log(connectionAnalysis?.text);
    return {
      breakdown: connectionAnalysis?.text ?? "",
    };
  },
});

3. Connection Step

index.ts
const connectPieces = new Step({
  id: "connectPieces",
  outputSchema: z.object({
    connections: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    console.log("---------------------------");
    const ragAgent = mastra?.getAgent('ragAgent');
    const breakdown = context?.getStepResult<{
      breakdown: string;
    }>("breakdownThoughts")?.breakdown;
    const connectionPrompt = `
        Based on the breakdown: ${breakdown}
 
        3. Explain how you're connecting different pieces from the retrieved chunks.
    `;
 
    const connections = await ragAgent?.generate(connectionPrompt);
    console.log(connections?.text);
    return {
      connections: connections?.text ?? "",
    };
  },
});

4. Conclusion Step

index.ts
const drawConclusions = new Step({
  id: "drawConclusions",
  outputSchema: z.object({
    conclusions: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    console.log("---------------------------");
    const ragAgent = mastra?.getAgent('ragAgent');
    const evidence = context?.getStepResult<{
      connections: string;
    }>("connectPieces")?.connections;
    const conclusionPrompt = `
        Based on the connections: ${evidence}
 
        4. Draw conclusions based only on the evidence in the retrieved context.
    `;
 
    const conclusions = await ragAgent?.generate(conclusionPrompt);
    console.log(conclusions?.text);
    return {
      conclusions: conclusions?.text ?? "",
    };
  },
});

5. Final Answer Step

index.ts
const finalAnswer = new Step({
  id: "finalAnswer",
  outputSchema: z.object({
    finalAnswer: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    console.log("---------------------------");
    const ragAgent = mastra?.getAgent('ragAgent');
    const conclusions = context?.getStepResult<{
      conclusions: string;
    }>("drawConclusions")?.conclusions;
    const answerPrompt = `
        Based on the conclusions: ${conclusions}
        Format your response as:
        THOUGHT PROCESS:
        - Step 1: [Initial analysis of retrieved chunks]
        - Step 2: [Connections between chunks]
        - Step 3: [Reasoning based on chunks]
 
        FINAL ANSWER:
        [Your concise answer based on the retrieved context]`;
 
    const answer = await ragAgent?.generate(answerPrompt);
    console.log(answer?.text);
    return {
      finalAnswer: answer?.text ?? "",
    };
  },
});

Workflow Configuration

Connect all the steps in the workflow. Each step reads its predecessor’s output through context.getStepResult, so the chain runs strictly in sequence from analysis through to the final answer:

index.ts
ragWorkflow
  .step(analyzeContext)
  .then(breakdownThoughts)
  .then(connectPieces)
  .then(drawConclusions)
  .then(finalAnswer);
 
ragWorkflow.commit();

Instantiate PgVector and Mastra

Instantiate PgVector and Mastra with all components:

index.ts
const pgVector = new PgVector(process.env.POSTGRES_CONNECTION_STRING!);
 
export const mastra = new Mastra({
  agents: { ragAgent },
  vectors: { pgVector },
  workflows: { ragWorkflow },
});
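
Once registered, components are looked up by key rather than imported directly; this is how the workflow steps above obtain the agent, and how the indexing code below obtains the store:

index.ts
// Registered components are retrieved with the keys used at registration.
const agent = mastra.getAgent("ragAgent");
const store = mastra.getVector("pgVector");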

Document Processing

Process and chunk the document:

index.ts
const doc = MDocument.fromText(`The Impact of Climate Change on Global Agriculture...`);
 
const chunks = await doc.chunk({
  strategy: "recursive",
  size: 512,
  overlap: 50,
  separator: "\n",
});
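
Before embedding, it helps to confirm the chunker produced sensible pieces. A quick check (each chunk exposes the text field used in the next section):

index.ts
// Optional: inspect the chunk output before paying for embeddings.
console.log(`Produced ${chunks.length} chunks`);
console.log(`First chunk starts: ${chunks[0].text.slice(0, 80)}`);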

Embedding Creation and Storage

Generate and store embeddings:

index.ts
const { embeddings } = await embedMany({
  model: openai.embedding('text-embedding-3-small'),
  values: chunks.map(chunk => chunk.text),
});
 
const vectorStore = mastra.getVector("pgVector");
await vectorStore.createIndex({
  indexName: "embeddings",
  dimension: 1536,
});
await vectorStore.upsert({
  indexName: "embeddings",
  vectors: embeddings,
  metadata: chunks.map((chunk) => ({ text: chunk.text })),
});
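
To verify retrieval independently of the agent, you can embed a test query and search the store directly. This is a sketch that assumes the store's query method accepts an indexName, queryVector, and topK; check it against your Mastra version:

index.ts
import { embed } from "ai"; // or add embed to the existing "ai" import

// Assumption: query({ indexName, queryVector, topK }) returns scored matches
// carrying the metadata stored at upsert time.
const { embedding } = await embed({
  model: openai.embedding("text-embedding-3-small"),
  value: "adaptation strategies for farmers",
});
const matches = await vectorStore.query({
  indexName: "embeddings",
  queryVector: embedding,
  topK: 3,
});
console.log(matches.map((m) => m.metadata?.text));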

Workflow Execution

Here’s how to execute the workflow with a query:

index.ts
const query = 'What are the main adaptation strategies for farmers?';
 
console.log('\nQuery:', query);
const prompt = `
    Please answer the following question:
    ${query}
 
    Please base your answer only on the context provided in the tool. If the context doesn't contain enough information to fully answer the question, please state that explicitly.
    `;
 
const { runId, start } = ragWorkflow.createRun();
 
console.log('Run:', runId);
 
const workflowResult = await start({
  triggerData: {
    query: prompt,
  },
});
console.log('\nThought Process:');
console.log(workflowResult.results);
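
workflowResult.results is keyed by step id. A sketch of extracting just the final answer, assuming successful steps expose their outputSchema payload under output (compare with the shape logged above if your version differs):

index.ts
// Assumption: each entry looks like { status, output }, keyed by step id.
const final = workflowResult.results["finalAnswer"];
if (final?.status === "success") {
  console.log("\nFinal Answer:\n", final.output.finalAnswer);
}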




