Skip to Content
RAG使用法ワークフローによる構造化推論

ワークフローによる構造化推論

この例では、Mastra、OpenAIの埋め込み、そしてベクトルストレージとしてPGVectorを使用し、定義されたワークフローを通じて構造化推論を重視したRetrieval-Augmented Generation(RAG)システムの実装方法を示します。

概要

このシステムは、定義されたワークフローを通じて、Mastra と OpenAI を用いた chain-of-thought プロンプトによる RAG を実装しています。主な機能は以下の通りです。

  1. 応答生成のために gpt-4o-mini を使った Mastra エージェントをセットアップ
  2. ベクトルストアとのやり取りを管理するベクトルクエリツールを作成
  3. chain-of-thought 推論のための複数ステップからなるワークフローを定義
  4. テキストドキュメントを処理し、チャンク化
  5. 埋め込みを作成し、PostgreSQL に保存
  6. ワークフローステップを通じて応答を生成

セットアップ

環境セットアップ

環境変数を必ず設定してください:

.env
OPENAI_API_KEY=your_openai_api_key_here POSTGRES_CONNECTION_STRING=your_connection_string_here

依存関係

必要な依存関係をインポートします:

index.ts
import { openai } from "@ai-sdk/openai"; import { Mastra } from "@mastra/core"; import { Agent } from "@mastra/core/agent"; import { Step, Workflow } from "@mastra/core/workflows"; import { PgVector } from "@mastra/pg"; import { createVectorQueryTool, MDocument } from "@mastra/rag"; import { embedMany } from "ai"; import { z } from "zod";

ワークフローの定義

まず、トリガースキーマとともにワークフローを定義します。

index.ts
export const ragWorkflow = new Workflow({ name: "rag-workflow", triggerSchema: z.object({ query: z.string(), }), });

ベクタークエリツールの作成

ベクターデータベースをクエリするためのツールを作成します:

index.ts
const vectorQueryTool = createVectorQueryTool({ vectorStoreName: "pgVector", indexName: "embeddings", model: openai.embedding("text-embedding-3-small"), });

エージェント設定

Mastraエージェントをセットアップします:

index.ts
export const ragAgent = new Agent({ name: "RAG Agent", instructions: `You are a helpful assistant that answers questions based on the provided context.`, model: openai("gpt-4o-mini"), tools: { vectorQueryTool, }, });

ワークフローステップ

このワークフローは、チェーン・オブ・ソート推論のために複数のステップに分かれています。

1. コンテキスト分析ステップ

index.ts
const analyzeContext = new Step({ id: "analyzeContext", outputSchema: z.object({ initialAnalysis: z.string(), }), execute: async ({ context, mastra }) => { console.log("---------------------------"); const ragAgent = mastra?.getAgent("ragAgent"); const query = context?.getStepResult<{ query: string }>("trigger")?.query; const analysisPrompt = `${query} 1. First, carefully analyze the retrieved context chunks and identify key information.`; const analysis = await ragAgent?.generate(analysisPrompt); console.log(analysis?.text); return { initialAnalysis: analysis?.text ?? "", }; }, });

2. 思考分解ステップ

index.ts
const breakdownThoughts = new Step({ id: "breakdownThoughts", outputSchema: z.object({ breakdown: z.string(), }), execute: async ({ context, mastra }) => { console.log("---------------------------"); const ragAgent = mastra?.getAgent("ragAgent"); const analysis = context?.getStepResult<{ initialAnalysis: string; }>("analyzeContext")?.initialAnalysis; const connectionPrompt = ` Based on the initial analysis: ${analysis} 2. Break down your thinking process about how the retrieved information relates to the query. `; const connectionAnalysis = await ragAgent?.generate(connectionPrompt); console.log(connectionAnalysis?.text); return { breakdown: connectionAnalysis?.text ?? "", }; }, });

3. 接続ステップ

index.ts
const connectPieces = new Step({ id: "connectPieces", outputSchema: z.object({ connections: z.string(), }), execute: async ({ context, mastra }) => { console.log("---------------------------"); const ragAgent = mastra?.getAgent("ragAgent"); const process = context?.getStepResult<{ breakdown: string; }>("breakdownThoughts")?.breakdown; const connectionPrompt = ` Based on the breakdown: ${process} 3. Explain how you're connecting different pieces from the retrieved chunks. `; const connections = await ragAgent?.generate(connectionPrompt); console.log(connections?.text); return { connections: connections?.text ?? "", }; }, });

4. 結論ステップ

index.ts
const drawConclusions = new Step({ id: "drawConclusions", outputSchema: z.object({ conclusions: z.string(), }), execute: async ({ context, mastra }) => { console.log("---------------------------"); const ragAgent = mastra?.getAgent("ragAgent"); const evidence = context?.getStepResult<{ connections: string; }>("connectPieces")?.connections; const conclusionPrompt = ` Based on the connections: ${evidence} 4. Draw conclusions based only on the evidence in the retrieved context. `; const conclusions = await ragAgent?.generate(conclusionPrompt); console.log(conclusions?.text); return { conclusions: conclusions?.text ?? "", }; }, });

5. 最終回答ステップ

index.ts
const finalAnswer = new Step({ id: "finalAnswer", outputSchema: z.object({ finalAnswer: z.string(), }), execute: async ({ context, mastra }) => { console.log("---------------------------"); const ragAgent = mastra?.getAgent("ragAgent"); const conclusions = context?.getStepResult<{ conclusions: string; }>("drawConclusions")?.conclusions; const answerPrompt = ` Based on the conclusions: ${conclusions} Format your response as: THOUGHT PROCESS: - Step 1: [Initial analysis of retrieved chunks] - Step 2: [Connections between chunks] - Step 3: [Reasoning based on chunks] FINAL ANSWER: [Your concise answer based on the retrieved context]`; const finalAnswer = await ragAgent?.generate(answerPrompt); console.log(finalAnswer?.text); return { finalAnswer: finalAnswer?.text ?? "", }; }, });

ワークフロー設定

ワークフロー内のすべてのステップを接続します:

index.ts
ragWorkflow .step(analyzeContext) .then(breakdownThoughts) .then(connectPieces) .then(drawConclusions) .then(finalAnswer); ragWorkflow.commit();

PgVectorとMastraのインスタンス化

PgVectorとMastraをすべてのコンポーネントでインスタンス化します:

index.ts
const pgVector = new PgVector({ connectionString: process.env.POSTGRES_CONNECTION_STRING! }); export const mastra = new Mastra({ agents: { ragAgent }, vectors: { pgVector }, workflows: { ragWorkflow }, });

ドキュメント処理

ドキュメントを処理し、チャンクに分割します:

index.ts
const doc = MDocument.fromText( `The Impact of Climate Change on Global Agriculture...`, ); const chunks = await doc.chunk({ strategy: "recursive", size: 512, overlap: 50, separator: "\n", });

埋め込みの作成と保存

埋め込みを生成して保存します:

index.ts
const { embeddings } = await embedMany({ model: openai.embedding("text-embedding-3-small"), values: chunks.map((chunk) => chunk.text), }); const vectorStore = mastra.getVector("pgVector"); await vectorStore.createIndex({ indexName: "embeddings", dimension: 1536, }); await vectorStore.upsert({ indexName: "embeddings", vectors: embeddings, metadata: chunks?.map((chunk: any) => ({ text: chunk.text })), });

ワークフローの実行

クエリを使ってワークフローを実行する方法は次のとおりです。

index.ts
const query = "What are the main adaptation strategies for farmers?"; console.log("\nQuery:", query); const prompt = ` Please answer the following question: ${query} Please base your answer only on the context provided in the tool. If the context doesn't contain enough information to fully answer the question, please state that explicitly. `; const { runId, start } = ragWorkflow.createRun(); console.log("Run:", runId); const workflowResult = await start({ triggerData: { query: prompt, }, }); console.log("\nThought Process:"); console.log(workflowResult.results);





GitHubで例を見る