"I want to put a RAG system into production, but I can't solve the hallucination problem." "The LangChain tutorial worked, but the accuracy is too low to be practical." "The cost estimate is unclear, and I can't forecast the budget as usage scales."
These are the challenges faced by development leads integrating AI features into an existing SaaS, and by engineers taking a RAG system from PoC to production. I hit the same wall.
In this article, I publish the implementation of a production-level RAG system built with LangChain + Pinecone + FastAPI. This is not a mere tutorial: I share the realities of production operation, including five hallucination countermeasures, quantitative methods of accuracy evaluation, and a cost estimate at 10,000 queries/month.
Premise: What Is a RAG System
The Basics of RAG (Retrieval-Augmented Generation)
RAG is a technique that retrieves external knowledge and injects it into the prompt of a large language model (LLM) to raise answer accuracy.
The limits of a conventional LLM (ChatGPT, etc.):
- Doesn't know information after the training-data date
- Can't access a company's internal documents
- Hallucinations (answers contrary to fact) occur
The solution via RAG:
- Search a vector DB for documents relevant to the user's question (Retrieval)
- Inject the search results into the prompt and send to the LLM (Augmentation)
- The LLM generates an accurate answer based on the search results (Generation)
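The three steps above can be sketched end to end without any external service. This is a toy illustration only: the bag-of-words `embed` function, the in-memory `DOCS` list, and the `fake_llm` stub are all assumptions standing in for a real embedding model, Pinecone, and the OpenAI API.

```python
import math
from collections import Counter

# Hypothetical in-memory "knowledge base" standing in for a vector DB
DOCS = [
    "API keys are issued from the account settings page.",
    "The Pro plan includes priority support and higher rate limits.",
    "Passwords can be reset from the login screen.",
]

def embed(text: str) -> Counter:
    """Toy embedding: lowercase bag-of-words counts (not a real model)."""
    return Counter(text.lower().replace(".", " ").replace("?", " ").split())

def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def retrieve(question: str, k: int = 2) -> list[str]:
    """Retrieval: rank documents by similarity to the question."""
    q = embed(question)
    return sorted(DOCS, key=lambda d: cosine(q, embed(d)), reverse=True)[:k]

def augment(question: str, passages: list[str]) -> str:
    """Augmentation: inject the retrieved passages into the prompt."""
    context = "\n".join(f"- {p}" for p in passages)
    return f"Answer using only these search results:\n{context}\n\nQuestion: {question}"

def fake_llm(prompt: str) -> str:
    """Generation: placeholder for the real LLM call."""
    return f"[LLM would answer from a {len(prompt)}-character prompt]"

question = "Where are API keys issued?"
passages = retrieve(question, k=1)
answer = fake_llm(augment(question, passages))
```

In the real system each stub is replaced by the corresponding component in the architecture that follows.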
The Overall System Architecture
┌─────────────┐
│    User     │
└──────┬──────┘
       │ HTTP Request
       ▼
┌──────────────────────────┐
│ FastAPI (API gateway)    │
│ - Input validation       │
│ - Authentication/authz   │
│ - Rate limiting          │
└──────┬───────────────────┘
       │
       ▼
┌──────────────────────────┐
│ LangChain                │
│ - Query embedding        │
│ - Pinecone search        │
│ - Prompt construction    │
│ - OpenAI API call        │
└──────┬───────────────────┘
       │
       ├─────────────┐
       ▼             ▼
┌──────────────┐ ┌──────────────┐
│ Pinecone     │ │ OpenAI API   │
│ Vector DB    │ │ (GPT-4)      │
│ - Doc search │ │ - Answer gen │
└──────────────┘ └──────────────┘
The Tech Stack
| Layer | Technology | Role |
|---|---|---|
| API layer | FastAPI 0.109+ | Provide REST API, validation |
| Orchestration | LangChain 0.1+ | RAG-flow control, prompt management |
| Vector DB | Pinecone | Embedding-vector search |
| LLM | OpenAI GPT-4 Turbo | Answer generation |
| Embedding model | text-embedding-3-small | Vectorization (1536 dimensions) |
| Deploy | AWS ECS Fargate | Container execution environment |
The 5 Hallucination Countermeasures
Countermeasure ①: Forcing Source Citation
Problem: the LLM ignores the search results and generates an answer from its training data.
Solution: state explicitly in the prompt, "answer only from the search results."
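from langchain.prompts import PromptTemplate

# Hallucination-prevention prompt
PROMPT_TEMPLATE = """You are an AI assistant that prioritizes providing accurate information.
Answer the user's question using only the search results below.
If the information is not in the search results, answer "No information was found."
Never include guesses or speculation.

Search results:
{context}

User's question: {question}

Answer format:
1. Answer (quoted from the search results)
2. Sources (source file name and page number)

Answer:"""

prompt = PromptTemplate(
    template=PROMPT_TEMPLATE,
    input_variables=["context", "question"]
)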
Effect: hallucination rate 40% → 10% (measured on in-house test data)
Countermeasure ②: Attaching a Confidence Score
Problem: even when the LLM answers without confidence, the user can't tell.
Solution: attach a confidence score (0.0–1.0) to the answer.
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field


class AnswerWithConfidence(BaseModel):
    """Answer with a confidence score."""
    answer: str = Field(description="Answer for the user")
    confidence: float = Field(
        description="Confidence score (0.0-1.0). 1.0 when the search results contain a clear answer; 0.5 or below when guessing",
        ge=0.0,
        le=1.0
    )
    sources: list[str] = Field(description="IDs of the cited documents")

# Structured output: the parser validates the LLM's JSON reply against the model.
# Note that PydanticOutputParser works through format instructions in the prompt,
# not through OpenAI function calling.
parser = PydanticOutputParser(pydantic_object=AnswerWithConfidence)

# Append these instructions to the prompt
format_instructions = parser.get_format_instructions()
Use examples:
- confidence < 0.7: show a warning, "This answer may contain speculation"
- confidence < 0.5: escalate to a human operator
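The two thresholds can be turned into a small routing function. This is a minimal sketch: the `Action` enum and the function name are hypothetical, and only the 0.7 and 0.5 cut-offs come from the text above.

```python
from enum import Enum

class Action(str, Enum):
    SHOW = "show"
    SHOW_WITH_WARNING = "show_with_warning"
    ESCALATE = "escalate"

WARNING_THRESHOLD = 0.7      # below this, warn about possible speculation
ESCALATION_THRESHOLD = 0.5   # below this, hand off to a human operator

def route_by_confidence(confidence: float) -> Action:
    """Map a confidence score in [0.0, 1.0] to a handling action."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError(f"confidence out of range: {confidence}")
    # Check the stricter threshold first; otherwise every score below 0.5
    # would be caught by the 0.7 rule and never escalated.
    if confidence < ESCALATION_THRESHOLD:
        return Action.ESCALATE
    if confidence < WARNING_THRESHOLD:
        return Action.SHOW_WITH_WARNING
    return Action.SHOW
```

Keep in mind that the score is self-reported by the LLM, so it is worth checking against human-labeled answers before relying on it for escalation.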
Countermeasure ③: Improving Retrieval Accuracy (Chunk-Size Optimization)
Problem: documents are split at poor boundaries, so context is lost.
Solution: optimize chunk size and overlap.
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Pinecone

# Chunk settings chosen by experiment
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,    # 512 was best empirically; with length_function=len this counts characters, not tokens
    chunk_overlap=50,  # overlap preserves context across chunk boundaries
    length_function=len,
    separators=["\n\n", "\n", "。", ".", " ", ""]  # includes the Japanese full stop
)

# Split the document
chunks = text_splitter.split_text(document_text)

# Upload to Pinecone
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Pinecone.from_texts(
    texts=chunks,
    embedding=embeddings,
    index_name="my-knowledge-base",
    namespace="product-docs"
)
Experimental results of parameter selection:
| Chunk size | Recall@5 | Response time |
|---|---|---|
| 256 | 0.65 | 1.2s |
| 512 | 0.82 | 1.5s |
| 1024 | 0.78 | 2.1s |
| 2048 | 0.71 | 3.5s |
Conclusion: chunk_size=512 gave the best balance of accuracy and speed in this experiment
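To make the roles of chunk size and overlap concrete, here is a simplified sliding-window splitter. It is a sketch for illustration, not the algorithm `RecursiveCharacterTextSplitter` uses (that one prefers to break on separators); the function name is hypothetical and sizes are in characters.

```python
def chunk_with_overlap(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Split text into fixed-size windows that share `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    step = chunk_size - overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks

# Each chunk repeats the last character of the previous one
example = chunk_with_overlap("abcdefghij", chunk_size=4, overlap=1)
# example == ["abcd", "defg", "ghij"]
```

A larger overlap keeps more shared context between neighboring chunks at the price of more vectors to store and search.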
Countermeasure ④: Metadata Filtering
Problem: irrelevant documents mix into the search results (e.g., an old version of a document).
Solution: narrow down with Pinecone's metadata filter.
from langchain.vectorstores import Pinecone

# Upload a chunk together with its metadata
metadata = {
    "source": "product_manual_v2.pdf",
    "page": 42,
    "version": "2.0",
    "category": "API specification",
    "last_updated": "2024-12-01"
}
vectorstore.add_texts(
    texts=[chunk_text],
    metadatas=[metadata]
)

# Filtering at search time
vectorstore = Pinecone.from_existing_index(
    index_name="my-knowledge-base",
    embedding=embeddings,
    namespace="product-docs"
)

# Search only the latest version
results = vectorstore.similarity_search(
    query="How do I authenticate with the API?",
    k=5,
    filter={
        "version": {"$eq": "2.0"},
        "category": {"$eq": "API specification"}
    }
)
Effect: contamination rate of irrelevant results 30% → 5%
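The effect of the filter can be pictured with a plain-Python stand-in. This is only a sketch of the matching semantics for `$eq` with multiple fields combined as AND; it is not Pinecone's implementation, and the `matches` helper and sample records are hypothetical.

```python
def matches(metadata: dict, flt: dict) -> bool:
    """Return True when every field condition in the filter holds (AND)."""
    for field, condition in flt.items():
        for op, expected in condition.items():
            if op != "$eq":
                raise NotImplementedError(f"operator not covered by this sketch: {op}")
            if metadata.get(field) != expected:
                return False
    return True

records = [
    {"id": "a", "version": "2.0", "category": "API specification"},
    {"id": "b", "version": "1.0", "category": "API specification"},  # old version
    {"id": "c", "version": "2.0", "category": "FAQ"},                # other category
]
flt = {"version": {"$eq": "2.0"}, "category": {"$eq": "API specification"}}

# Only records that satisfy both conditions remain candidates for ranking
candidates = [r["id"] for r in records if matches(r, flt)]
```

Because old versions are excluded before similarity ranking, they cannot crowd relevant chunks out of the top k.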
Countermeasure ⑤: The Human-Review Feedback Loop
Problem: even if you detect a hallucination, there's no automatic improvement.
Solution: reflect human feedback into the system.
from datetime import datetime

from fastapi import FastAPI
from pydantic import BaseModel


class UserFeedback(BaseModel):
    """User feedback on a single answer."""
    query_id: str
    is_helpful: bool
    is_accurate: bool
    comment: str | None = None
    timestamp: datetime

# Feedback-collection API
# (save_feedback_to_db, send_alert_to_team, and the other helpers are application-specific)
app = FastAPI()

@app.post("/api/feedback")
async def submit_feedback(feedback: UserFeedback):
    # Save the feedback to the DB
    await save_feedback_to_db(feedback)
    # Detect inaccurate answers
    if not feedback.is_accurate:
        # Send an alert (Slack, etc.)
        await send_alert_to_team(feedback)
        # Flag the relevant document for re-review
        await flag_document_for_review(feedback.query_id)
    return {"status": "success"}

# Weekly accuracy analysis
async def analyze_feedback_trends():
    feedbacks = await get_feedbacks_last_week()
    if not feedbacks:
        return  # no feedback this week; avoids dividing by zero
    accuracy_rate = sum(f.is_accurate for f in feedbacks) / len(feedbacks)
    if accuracy_rate < 0.85:
        # Accuracy-degradation alert
        await send_weekly_report(accuracy_rate)
Operational flow:
- Place 👍/👎 buttons on answers
- A human reviews inaccurate answers
- Improve the relevant document or adjust the chunk splitting
- Monitor accuracy trends with a weekly report
Pinecone Index Design
The Namespace-Separation Strategy
In Pinecone, you can use namespaces within a single index to logically separate data.
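import os

import pinecone
from langchain.vectorstores import Pinecone

# Initialize Pinecone (pinecone-client 2.x style; 3.x and later replace init() with a client object)
pinecone.init(
    api_key=os.environ["PINECONE_API_KEY"],  # env var name is an assumption; avoid hard-coding the key
    environment="us-west1-gcp"
)

# Create the index (first time only)
pinecone.create_index(
    name="my-knowledge-base",
    dimension=1536,  # dimensionality of text-embedding-3-small
    metric="cosine"  # cosine similarity
)

# Data loaded per namespace
namespaces = {
    "product-docs": "Product documentation",
    "api-reference": "API reference",
    "faq": "Frequently asked questions",
    "internal-kb": "Internal knowledge base (employees only)"
}

# Example: loading the product documentation
vectorstore = Pinecone.from_texts(
    texts=product_doc_chunks,
    embedding=embeddings,
    index_name="my-knowledge-base",
    namespace="product-docs"
)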
Multi-Tenant Support
When separating data per customer in a B2B SaaS:
# One namespace per customer
# (customer_id should come from the authenticated session, never from user input)
customer_namespace = f"customer-{customer_id}"
vectorstore = Pinecone.from_existing_index(
    index_name="my-knowledge-base",
    embedding=embeddings,
    namespace=customer_namespace
)

# Specify the same namespace at search time
results = vectorstore.similarity_search(
    query=user_query,
    k=5,
    namespace=customer_namespace  # prevents data leaking across customers
)
Quantitative Methods of Accuracy Evaluation
Evaluation Metrics
| Metric | Definition | Target value |
|---|---|---|
| Recall@5 | The proportion of the relevant (ground-truth) documents that appear in the top 5 | > 0.85 |
| Precision@5 | The proportion of relevant documents among the top 5 | > 0.70 |
| MRR (Mean Reciprocal Rank) | The mean of the reciprocal of the correct answer's rank | > 0.75 |
| Accuracy | The answer's correctness (human evaluation) | > 0.90 |
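The three retrieval metrics in the table can be written out as formulas. The notation is introduced here only for illustration: $Q$ is the set of test questions, $R(q)$ the set of ground-truth documents for question $q$, $T_k(q)$ the top-$k$ retrieved documents, and $\mathrm{rank}(q)$ the position of the first relevant document in the results (the term is taken as 0 when none is retrieved). Recall and precision are computed per question and then averaged over $Q$.

```latex
\begin{align}
\mathrm{Recall@}k(q) &= \frac{|T_k(q) \cap R(q)|}{|R(q)|} \\
\mathrm{Precision@}k(q) &= \frac{|T_k(q) \cap R(q)|}{k} \\
\mathrm{MRR} &= \frac{1}{|Q|} \sum_{q \in Q} \frac{1}{\mathrm{rank}(q)}
\end{align}
```

For example, if a question has two ground-truth documents and only one of them appears in the top 5, at position 3, then Recall@5 = 1/2, Precision@5 = 1/5, and the reciprocal rank is 1/3.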
The Evaluation Script
from typing import List, Tuple

import numpy as np


class RAGEvaluator:
    """Accuracy evaluation for a RAG system."""

    def __init__(self, test_data: List[Tuple[str, List[str]]]):
        """
        Args:
            test_data: [(question, list of ground-truth document IDs), ...]
        """
        self.test_data = test_data

    def calculate_recall_at_k(
        self,
        retrieved_docs: List[str],
        ground_truth: List[str],
        k: int = 5
    ) -> float:
        """Compute Recall@K."""
        top_k = set(retrieved_docs[:k])
        relevant = set(ground_truth)
        if not relevant:
            return 0.0
        return len(top_k & relevant) / len(relevant)

    def calculate_precision_at_k(
        self,
        retrieved_docs: List[str],
        ground_truth: List[str],
        k: int = 5
    ) -> float:
        """Compute Precision@K."""
        top_k = set(retrieved_docs[:k])
        relevant = set(ground_truth)
        if not top_k:
            return 0.0
        return len(top_k & relevant) / k

    def calculate_mrr(
        self,
        retrieved_docs: List[str],
        ground_truth: List[str]
    ) -> float:
        """Reciprocal rank for one question; evaluate() averages these into MRR."""
        for rank, doc_id in enumerate(retrieved_docs, 1):
            if doc_id in ground_truth:
                return 1.0 / rank
        return 0.0

    def evaluate(self, rag_system) -> dict:
        """Evaluate the whole RAG system."""
        recalls = []
        precisions = []
        mrrs = []
        for question, ground_truth in self.test_data:
            # Retrieve with the RAG system
            retrieved = rag_system.retrieve(question, k=10)
            retrieved_ids = [doc.metadata["id"] for doc in retrieved]
            # Compute the metrics
            recalls.append(self.calculate_recall_at_k(retrieved_ids, ground_truth, k=5))
            precisions.append(self.calculate_precision_at_k(retrieved_ids, ground_truth, k=5))
            mrrs.append(self.calculate_mrr(retrieved_ids, ground_truth))
        return {
            "recall@5": float(np.mean(recalls)),
            "precision@5": float(np.mean(precisions)),
            "mrr": float(np.mean(mrrs))
        }

# Usage example
test_data = [
    ("How do I get an API key?", ["doc_123", "doc_456"]),
    ("What are the differences between the pricing plans?", ["doc_789"]),
    # ... 100+ test cases
]
evaluator = RAGEvaluator(test_data)
# my_rag_system is any object exposing retrieve(question, k)
metrics = evaluator.evaluate(my_rag_system)
print(f"Recall@5: {metrics['recall@5']:.2f}")
print(f"Precision@5: {metrics['precision@5']:.2f}")
print(f"MRR: {metrics['mrr']:.2f}")
Evaluation Results in a Real Project
Before improvement:
- Recall@5: 0.68
- Precision@5: 0.52
- MRR: 0.61
After improvement (countermeasures ①–④ applied):
- Recall@5: 0.87 (+28%)
- Precision@5: 0.78 (+50%)
- MRR: 0.81 (+33%)
Cost Estimate: A Real Example at 10,000 Queries/Month
OpenAI API Cost
[Assumptions]
- Queries: 10,000/month
- Average prompt length: 2,000 tokens (5 search results + question)
- Average response length: 500 tokens
- Model: GPT-4 Turbo
[Calculation]
Input: 10,000 queries × 2,000 tokens × $0.01/1K = $200
Output: 10,000 queries × 500 tokens × $0.03/1K = $150
Total: $350/month
Pinecone Cost
[Assumptions]
- Number of indexes: 1
- Number of vectors: 100,000
- Plan: Starter ($70/month for 100K vectors)
[Calculation]
Base fee: $70/month
Embedding-Generation Cost (OpenAI)
[Assumptions]
- New documents added: 1,000/month
- Average document length: 1,000 tokens
- Model: text-embedding-3-small
[Calculation]
1,000 docs × 1,000 tokens × $0.00002/1K = $0.02/month
(virtually negligible)
Infrastructure Cost (AWS ECS Fargate)
[Assumptions]
- Fargate Task: 0.5 vCPU, 1GB RAM
- Uptime: 24 hours × 30 days
[Calculation]
Fargate: $35/month
ALB: $23/month
CloudWatch Logs: $3/month
Total: $61/month
Total Cost
OpenAI API (GPT-4 Turbo): $350/month
Pinecone: $70/month
OpenAI Embeddings: $0.02/month (negligible)
AWS infrastructure: $61/month
────────────────────────
Total: about $481/month (about ¥68,000)
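The estimate above can be reproduced with a few lines of arithmetic, which also makes it easy to re-run with your own traffic. This is a minimal sketch: the function and variable names are hypothetical, and the prices are the ones assumed in this article, so check current pricing before reusing them.

```python
def llm_cost(queries: int, prompt_tokens: int, response_tokens: int,
             input_price_per_1k: float, output_price_per_1k: float) -> float:
    """Monthly chat-completion cost in USD."""
    input_cost = queries * prompt_tokens / 1000 * input_price_per_1k
    output_cost = queries * response_tokens / 1000 * output_price_per_1k
    return input_cost + output_cost

monthly_costs = {
    # 10,000 queries, 2,000 prompt tokens, 500 response tokens
    "openai_chat": llm_cost(10_000, 2_000, 500, 0.01, 0.03),
    "pinecone": 70.0,
    # 1,000 documents x 1,000 tokens at $0.00002 per 1K tokens
    "openai_embeddings": 1_000 * 1_000 / 1000 * 0.00002,
    # Fargate + ALB + CloudWatch Logs
    "aws": 35.0 + 23.0 + 3.0,
}
total = sum(monthly_costs.values())

for name, cost in monthly_costs.items():
    print(f"{name:>18}: ${cost:,.2f}")
print(f"{'total':>18}: ${total:,.2f}")
```

The LLM line accounts for roughly three quarters of the total, which is why the reduction strategies below target it first.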
Cost-Reduction Strategies
Strategy ①: Partial Downgrade to GPT-3.5 Turbo
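import tiktoken

# Route simple questions to GPT-3.5 and complex ones to GPT-4
_encoding = tiktoken.get_encoding("cl100k_base")  # tokenizer used by GPT-3.5 Turbo and GPT-4

def select_model(query: str, context: str) -> str:
    # Decide by token count. str.split() counts whitespace-separated words,
    # which badly undercounts text without spaces such as Japanese.
    total_tokens = len(_encoding.encode(query)) + len(_encoding.encode(context))
    if total_tokens < 500:
        return "gpt-3.5-turbo"  # $0.001/1K (input)
    return "gpt-4-turbo"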
Effect: route 50% of queries to GPT-3.5 → 30% cost reduction ($350 → $245)
Strategy ②: Caching
import hashlib


class RAGCache:
    """Cache for frequently asked questions."""

    def __init__(self, redis_client):
        # Expects an async Redis client; create it with decode_responses=True
        # so that get() returns str rather than bytes
        self.redis = redis_client
        self.ttl = 3600  # 1 hour

    def get_cache_key(self, query: str) -> str:
        # MD5 only derives a compact key here; it is not used for security
        return f"rag_cache:{hashlib.md5(query.encode()).hexdigest()}"

    async def get(self, query: str) -> str | None:
        key = self.get_cache_key(query)
        return await self.redis.get(key)

    async def set(self, query: str, answer: str):
        key = self.get_cache_key(query)
        await self.redis.setex(key, self.ttl, answer)

# Usage example
cache = RAGCache(redis_client)

async def rag_with_cache(query: str) -> str:
    # Check the cache
    cached = await cache.get(query)
    if cached is not None:
        return cached
    # Run RAG
    answer = await rag_system.query(query)
    # Store in the cache
    await cache.set(query, answer)
    return answer
Effect: cache hit rate 30% → 30% reduction in API calls ($350 → $245)
Production-Operation Pitfalls and Error Handling
Pitfall ①: OpenAI API Rate Limits
Problem: a sudden traffic surge hits the API-call limit.
Solution: Exponential Backoff + Retry
from openai import AsyncOpenAI, RateLimitError
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

@retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5)
)
async def call_openai_with_retry(prompt: str) -> str:
    """Call the OpenAI API (with retries)."""
    response = await openai_client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=500
    )
    return response.choices[0].message.content
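For readers who want to see what exponential backoff does under the hood, here is a dependency-free sketch. It is not tenacity's implementation and its delay schedule is not guaranteed to match `wait_exponential`; `TransientError`, `backoff_delays`, and `retry_async` are hypothetical names, and the random jitter is an addition of this sketch.

```python
import asyncio
import random

class TransientError(Exception):
    """Stand-in for a retryable error such as a rate-limit response."""

def backoff_delays(attempts: int, base: float = 1.0, cap: float = 60.0) -> list[float]:
    """Upper bound of the wait before each retry: base * 2**n, capped at `cap`."""
    return [min(cap, base * 2 ** n) for n in range(attempts - 1)]

async def retry_async(func, attempts: int = 5, base: float = 1.0,
                      cap: float = 60.0, sleep=asyncio.sleep):
    """Call `func` until it succeeds or `attempts` calls have failed."""
    delays = backoff_delays(attempts, base, cap)
    for attempt in range(attempts):
        try:
            return await func()
        except TransientError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error to the caller
            # Jitter: wait a random time up to the computed bound so that
            # many clients do not retry in lockstep
            await sleep(random.uniform(0, delays[attempt]))
```

The `sleep` parameter exists only so that tests can replace the real wait with a no-op.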
Pitfall ②: Pinecone Search Timeout
Problem: a timeout (>10s) on a large vector search.
Solution: a timeout setting + fallback
import asyncio
from typing import List

from langchain.schema import Document


async def search_with_timeout(
    vectorstore,
    query: str,
    k: int = 5,
    timeout: float = 5.0
) -> List[Document]:
    """Search with a timeout."""
    try:
        result = await asyncio.wait_for(
            vectorstore.asimilarity_search(query, k=k),
            timeout=timeout
        )
        return result
    except asyncio.TimeoutError:
        # Fallback: keyword search (application-specific helper)
        return await fallback_keyword_search(query, k=k)
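The timeout-plus-fallback pattern can be tried in isolation with stub coroutines. In this sketch the two search functions are hypothetical stand-ins (a deliberately slow "vector search" and an instant "keyword search"); only `asyncio.wait_for` is the real mechanism.

```python
import asyncio

async def with_timeout_fallback(primary, fallback, timeout: float):
    """Run `primary`; if it exceeds `timeout` seconds, run `fallback` instead."""
    try:
        return await asyncio.wait_for(primary(), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the primary coroutine before raising
        return await fallback()

async def slow_vector_search():
    await asyncio.sleep(1.0)  # simulates a search that takes too long
    return ["vector hit"]

async def fast_vector_search():
    return ["vector hit"]

async def keyword_search():
    return ["keyword hit"]

timed_out = asyncio.run(with_timeout_fallback(slow_vector_search, keyword_search, 0.05))
in_time = asyncio.run(with_timeout_fallback(fast_vector_search, keyword_search, 0.05))
```

Here `timed_out` holds the keyword result and `in_time` the vector result, so the caller always receives an answer within a bounded time.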
Pitfall ③: Memory Leaks
Problem: LangChain objects keep lingering in memory.
Solution: explicit cleanup
import gc
from contextlib import asynccontextmanager

from langchain.chat_models import ChatOpenAI
from langchain.vectorstores import Pinecone


@asynccontextmanager
async def rag_session():
    """RAG session management (releases resources automatically)."""
    # Initialization
    vectorstore = Pinecone.from_existing_index(...)
    llm = ChatOpenAI(...)
    try:
        yield vectorstore, llm
    finally:
        # Cleanup
        del vectorstore
        del llm
        gc.collect()

# Usage example
async def query_rag(user_query: str) -> str:
    async with rag_session() as (vectorstore, llm):
        # RAG processing
        results = await vectorstore.asimilarity_search(user_query)
        # ...
    # Resources are released automatically when the block exits
FastAPI Integration: Implementing the Production API
import time
from typing import AsyncGenerator

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

app = FastAPI(title="Production RAG API")
rag_system = None  # set at startup


class QueryRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=500)
    namespace: str = Field(default="product-docs")
    max_results: int = Field(default=5, ge=1, le=10)


class QueryResponse(BaseModel):
    answer: str
    confidence: float
    sources: list[dict]
    processing_time_ms: float


# Initialize the RAG system (at startup)
@app.on_event("startup")
async def startup_event():
    global rag_system
    rag_system = initialize_rag_system()


@app.post("/api/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest):
    """RAG query endpoint."""
    start_time = time.perf_counter()
    try:
        # Run RAG
        result = await rag_system.query(
            query=request.query,
            namespace=request.namespace,
            k=request.max_results
        )
        processing_time = (time.perf_counter() - start_time) * 1000
        return QueryResponse(
            answer=result.answer,
            confidence=result.confidence,
            sources=[
                {
                    "id": src.id,
                    "title": src.metadata.get("title"),
                    "page": src.metadata.get("page"),
                    "score": src.score
                }
                for src in result.sources
            ],
            processing_time_ms=processing_time
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# Streaming response (real-time answers)
@app.post("/api/query/stream")
async def query_stream_endpoint(request: QueryRequest):
    """Streaming RAG query."""
    async def generate() -> AsyncGenerator[str, None]:
        async for chunk in rag_system.query_stream(request.query):
            yield f"data: {chunk}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )


# Health check
@app.get("/health")
async def health_check():
    return {"status": "healthy", "service": "rag-api"}
Summary: Success Factors of a Production RAG
Technical Points
- Hallucination countermeasures are defense-in-depth: prompt + score + human review
- Accuracy evaluation must be quantified: continuous monitoring with Recall/Precision/MRR
- Cost is optimized by operational design: caching + model selection + scaling
Operational Points
- The feedback loop is the lifeblood: reflect human evaluation weekly
- Error handling to an almost-excessive degree: retry + timeout + fallback
- Staged release: internal-only → beta → full release
Real-World Data (My Project)
- Accuracy: Recall@5 0.87, Accuracy 0.92 (human evaluation)
- Response time: average 1.8s (95th percentile 3.2s)
- Cost: about ¥70,000/month at 10,000 queries/month (within budget)
- Availability: 99.8% (monthly downtime < 1 hour)
Next Steps
If you're struggling with RAG-system implementation, feel free to reach out. I'll bring the practical know-how of LangChain/Pinecone/OpenAI to your project.