# Building a Production RAG System with LangChain + Pinecone: Hallucination Countermeasures and Accuracy Improvement in Practice

> A guide to building a RAG system for production operation rather than a test environment. It covers five hallucination countermeasures, accuracy-evaluation methods, and cost-optimization strategies, implemented with LangChain + Pinecone + FastAPI and explained with real code.

- Published: 2025-01-10
- Author: 友田 陽大
- Tags: AI, RAG, LangChain, Pinecone, OpenAI, Python, FastAPI, Machine Learning, NLP
- URL: https://tomodahinata.com/en/blog/langchain-pinecone-production-rag-system
- Category: Generative AI, LLMs & RAG
- Pillar guide: https://tomodahinata.com/en/blog/vercel-ai-sdk-production-llm-apps-streaming-tools-rag

## Key points

- Hallucination countermeasures work as defense in depth: layer forced source citation, confidence scores, chunk-size tuning, metadata filters, and human review
- Retrieval accuracy hinges on chunk-size optimization; in my experiments, chunk_size=512 gave the best balance of accuracy and speed
- Evaluate retrieval accuracy quantitatively with Recall@5, Precision@5, and MRR, and monitor it continuously
- Optimize cost with model selection (routing some queries to GPT-3.5) and caching
- Separate data logically with Pinecone namespaces, and specify the same namespace at search time to prevent data leakage

---

"I want to put a RAG system into production, but I can't solve the hallucination problem."
"The LangChain tutorial worked, but the accuracy is too low to be practical."
"The cost estimate is unclear, and I can't forecast the budget as we scale."

These are the problems faced by development leads integrating AI features into an existing SaaS and by engineers taking a RAG system from PoC to production. I hit the same wall.

In this article, I share the implementation of a production-level RAG system built with **LangChain + Pinecone + FastAPI**. This is not a mere tutorial: it covers the realities of production operation, including **five hallucination countermeasures**, **quantitative accuracy evaluation**, and **a cost estimate at 10,000 queries/month**.

---

## Background: What Is a RAG System?

### The Basics of RAG (Retrieval-Augmented Generation)

RAG is a **technique that retrieves external knowledge and injects it into the prompt of a large language model (LLM) to improve answer accuracy**.

**Limits of a conventional LLM (ChatGPT, etc.)**:
- Knows nothing after its training-data cutoff
- Can't access a company's internal documents
- Produces hallucinations (answers contrary to fact)

**How RAG addresses them**:
1. **Search a vector DB** for documents relevant to the user's question (Retrieval)
2. **Inject** the search results into the prompt and send it to the LLM (Augmentation)
3. The LLM **generates an answer grounded in** the search results (Generation)
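
The three steps can be sketched without any external service. The snippet below is a toy illustration only: it swaps the vector DB for a simple word-overlap score and stops just before the LLM call. `DOCS`, `tokenize`, `retrieve`, and `build_prompt` are hypothetical names, not part of LangChain or Pinecone.

```python
import re

# Hypothetical in-memory "knowledge base" standing in for a vector DB
DOCS = {
    "doc_1": "API keys are issued from the account settings page.",
    "doc_2": "The Pro plan includes priority support and higher rate limits.",
    "doc_3": "Refunds are processed within five business days.",
}

def tokenize(text: str) -> set[str]:
    """Lowercase word set; a crude stand-in for an embedding."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def retrieve(question: str, k: int = 2) -> list[str]:
    """Step 1 (Retrieval): rank documents by word overlap with the question."""
    q = tokenize(question)
    ranked = sorted(DOCS, key=lambda d: len(q & tokenize(DOCS[d])), reverse=True)
    return ranked[:k]

def build_prompt(question: str, doc_ids: list[str]) -> str:
    """Step 2 (Augmentation): inject the retrieved text into the prompt."""
    context = "\n".join(f"[{d}] {DOCS[d]}" for d in doc_ids)
    return f"Search results:\n{context}\n\nQuestion: {question}\nAnswer:"

question = "Where are API keys issued?"
top_ids = retrieve(question, k=2)
prompt = build_prompt(question, top_ids)
# Step 3 (Generation) would send `prompt` to the LLM.
```

In a real system the overlap score is replaced by embedding similarity, but the shape of the flow (retrieve, assemble the prompt, generate) stays the same.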

---

## The Overall System Architecture

```
┌─────────────┐
│ User        │
└──────┬──────┘
       │ HTTP Request
       ▼
┌──────────────────────────┐
│ FastAPI (API gateway)     │
│ - Input validation        │
│ - Authentication/authz    │
│ - Rate limiting           │
└──────┬───────────────────┘
       │
       ▼
┌──────────────────────────┐
│ LangChain                 │
│ - Query embedding         │
│ - Pinecone search         │
│ - Prompt construction     │
│ - OpenAI API call         │
└──────┬───────────────────┘
       │
       ├─────────────┐
       ▼             ▼
┌────────────┐  ┌──────────────┐
│ Pinecone   │  │ OpenAI API   │
│ Vector DB  │  │ (GPT-4)      │
│ - Doc search│ │ - Answer gen │
└────────────┘  └──────────────┘
```

### The Tech Stack

| Layer | Technology | Role |
|---------|------|------|
| **API layer** | FastAPI 0.109+ | Provide REST API, validation |
| **Orchestration** | LangChain 0.1+ | RAG-flow control, prompt management |
| **Vector DB** | Pinecone | Embedding-vector search |
| **LLM** | OpenAI GPT-4 Turbo | Answer generation |
| **Embedding model** | text-embedding-3-small | Vectorization (1536 dimensions) |
| **Deploy** | AWS ECS Fargate | Container execution environment |

---

## The 5 Hallucination Countermeasures

### Countermeasure ①: Forcing Source Citation

**Problem**: the LLM ignores the search results and generates an answer from its training data.

**Solution**: state explicitly in the prompt, "answer only from the search results."

```python
from langchain.prompts import PromptTemplate

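# Prompt that discourages hallucination
PROMPT_TEMPLATE = """You are an AI assistant that prioritizes providing accurate information.

Answer the user's question using only the search results below.
If the search results do not contain the information, answer "No information was found."
Never include guesses or speculation.

Search results:
{context}

User's question: {question}

Answer format:
1. Answer (quoted from the search results)
2. Sources (source file name and page number)

Answer:"""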

prompt = PromptTemplate(
    template=PROMPT_TEMPLATE,
    input_variables=["context", "question"]
)
```

**Effect**: hallucination rate 40% → 10% (measured on in-house test data)
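
For the model to cite a file name and page number, each retrieved chunk has to carry that information into `{context}`. A minimal sketch, assuming each chunk is a dict with `text`, `source`, and `page` keys (a hypothetical shape chosen for illustration; LangChain `Document` objects hold the same data in `page_content` and `metadata`):

```python
def format_context(chunks: list[dict]) -> str:
    """Render retrieved chunks as numbered blocks labeled with source and page."""
    blocks = []
    for i, chunk in enumerate(chunks, start=1):
        label = f"[{i}] {chunk['source']} (p.{chunk['page']})"
        blocks.append(f"{label}\n{chunk['text']}")
    return "\n\n".join(blocks)

chunks = [
    {"text": "Authenticate with a bearer token.", "source": "product_manual_v2.pdf", "page": 42},
    {"text": "Tokens expire after 24 hours.", "source": "product_manual_v2.pdf", "page": 43},
]
context = format_context(chunks)
```

Labeling every block this way gives the model something concrete to quote, and makes a missing or invented citation easy to spot in review.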

---

### Countermeasure ②: Attaching a Confidence Score

**Problem**: the user can't tell when the LLM is answering with low confidence.

**Solution**: have the model return a self-reported confidence score (0.0–1.0) with each answer.

```python
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field

class AnswerWithConfidence(BaseModel):
    """Answer plus a confidence score."""
    answer: str = Field(description="Answer for the user")
    confidence: float = Field(
        description="Confidence score (0.0-1.0). 1.0 when the search results contain a clear answer; 0.5 or lower when guessing",
        ge=0.0,
        le=1.0
    )
    sources: list[str] = Field(description="IDs of the cited documents")

# Structured output: the parser produces format instructions for the prompt
# and validates the model's JSON reply against the schema above
parser = PydanticOutputParser(pydantic_object=AnswerWithConfidence)

# Append these instructions to the prompt
format_instructions = parser.get_format_instructions()
```

**Use examples**:
- `confidence < 0.7`: show a warning "this answer may contain speculation"
- `confidence < 0.5`: escalate to a human operator
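
The two thresholds can be turned into a small routing function. This is a sketch under one assumption the list leaves open: when a score falls below 0.5, escalation takes precedence over the warning. `route_by_confidence` and its return labels are hypothetical names.

```python
def route_by_confidence(confidence: float) -> str:
    """Map a confidence score to an action using the thresholds above."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("confidence must be between 0.0 and 1.0")
    if confidence < 0.5:
        return "escalate"  # hand off to a human operator
    if confidence < 0.7:
        return "warn"      # show the speculation warning
    return "show"          # display the answer as-is
```

Keeping the thresholds in one function makes them easy to tune later against the human feedback data.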

---

### Countermeasure ③: Improving Retrieval Accuracy (Chunk-Size Optimization)

**Problem**: documents are split poorly, so context is lost.

**Solution**: optimize chunk size and overlap.

```python
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Pinecone

# Chunk settings chosen by experiment
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,        # 512 worked best empirically; note that with
                           # length_function=len this counts characters, not tokens
                           # (RecursiveCharacterTextSplitter.from_tiktoken_encoder counts tokens)
    chunk_overlap=50,      # overlap between neighboring chunks to preserve context
    length_function=len,
    separators=["\n\n", "\n", "。", ".", " ", ""]  # includes the Japanese full stop
)

# Split the document
chunks = text_splitter.split_text(document_text)

# Upload to Pinecone
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

vectorstore = Pinecone.from_texts(
    texts=chunks,
    embedding=embeddings,
    index_name="my-knowledge-base",
    namespace="product-docs"
)
```

**Experimental results by chunk size**:

| Chunk size | Recall@5 | Response time |
|--------------|---------|---------|
| 256 | 0.65 | 1.2s |
| **512** | **0.82** | **1.5s** |
| 1024 | 0.78 | 2.1s |
| 2048 | 0.71 | 3.5s |

**Conclusion**: chunk_size=512 is the optimal balance of accuracy and speed

---

### Countermeasure ④: Metadata Filtering

**Problem**: irrelevant documents mix into the search results (e.g., an old version of a document).

**Solution**: narrow down with Pinecone's metadata filter.

```python
from langchain.vectorstores import Pinecone

# Connect to the existing index
vectorstore = Pinecone.from_existing_index(
    index_name="my-knowledge-base",
    embedding=embeddings,
    namespace="product-docs"
)

# Upload a chunk together with its metadata
metadata = {
    "source": "product_manual_v2.pdf",
    "page": 42,
    "version": "2.0",
    "category": "API specification",
    "last_updated": "2024-12-01"
}

vectorstore.add_texts(
    texts=[chunk_text],
    metadatas=[metadata]
)

# Search only the latest version
results = vectorstore.similarity_search(
    query="How do I authenticate with the API?",
    k=5,
    filter={
        "version": {"$eq": "2.0"},
        "category": {"$eq": "API specification"}
    }
)
```

**Effect**: share of irrelevant documents in the search results 30% → 5%
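
In practice the filter is usually assembled per request rather than hard-coded. A small sketch, assuming a hypothetical helper named `build_filter`; `$eq` and `$in` are operators from Pinecone's metadata filter syntax.

```python
def build_filter(category: str, versions: list[str]) -> dict:
    """Build a Pinecone-style metadata filter for one category and a set of versions."""
    if not versions:
        raise ValueError("at least one version is required")
    # A single version uses $eq; several versions use $in
    version_clause = (
        {"$eq": versions[0]} if len(versions) == 1 else {"$in": list(versions)}
    )
    return {"category": {"$eq": category}, "version": version_clause}

search_filter = build_filter("API specification", ["2.0", "2.1"])
```

The resulting dict can be passed as the `filter` argument of `similarity_search`.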

---

### Countermeasure ⑤: The Human-Review Feedback Loop

**Problem**: detecting a hallucination does not, by itself, improve the system.

**Solution**: feed human feedback back into the system.

```python
from datetime import datetime

from fastapi import FastAPI
from pydantic import BaseModel

class UserFeedback(BaseModel):
    """User feedback on a single answer."""
    query_id: str
    is_helpful: bool
    is_accurate: bool
    comment: str | None = None
    timestamp: datetime

# Feedback-collection API
# (save_feedback_to_db, send_alert_to_team, and the other awaited helpers
# are application-specific and not shown here)
app = FastAPI()

@app.post("/api/feedback")
async def submit_feedback(feedback: UserFeedback):
    # Persist the feedback
    await save_feedback_to_db(feedback)

    # Handle answers reported as inaccurate
    if not feedback.is_accurate:
        # Alert the team (Slack, etc.)
        await send_alert_to_team(feedback)

        # Flag the related documents for re-review
        await flag_document_for_review(feedback.query_id)

    return {"status": "success"}

# Weekly accuracy analysis
async def analyze_feedback_trends():
    feedbacks = await get_feedbacks_last_week()
    if not feedbacks:
        return  # no feedback this week; avoid dividing by zero

    accuracy_rate = sum(f.is_accurate for f in feedbacks) / len(feedbacks)

    if accuracy_rate < 0.85:
        # Alert on degraded accuracy
        await send_weekly_report(accuracy_rate)
```

**Operational flow**:
1. Place 👍/👎 buttons on answers
2. A human reviews inaccurate answers
3. Improve the relevant document or adjust the chunk splitting
4. Monitor accuracy trends with a weekly report

---

## Pinecone Index Design

### The Namespace-Separation Strategy

In Pinecone, you can use **namespaces** within a single index to logically separate data.

```python
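import os

import pinecone
from langchain.vectorstores import Pinecone

# Initialize Pinecone (pinecone-client v2 style API)
pinecone.init(
    api_key=os.environ["PINECONE_API_KEY"],  # read from the environment; never hard-code the key
    environment="us-west1-gcp"
)

# Create the index (first run only)
if "my-knowledge-base" not in pinecone.list_indexes():
    pinecone.create_index(
        name="my-knowledge-base",
        dimension=1536,  # dimensionality of text-embedding-3-small
        metric="cosine"   # cosine similarity
    )

# Namespaces and what each one holds
namespaces = {
    "product-docs": "Product documentation",
    "api-reference": "API reference",
    "faq": "Frequently asked questions",
    "internal-kb": "Internal knowledge base (employees only)"
}

# Example: load the product documentation
vectorstore = Pinecone.from_texts(
    texts=product_doc_chunks,
    embedding=embeddings,
    index_name="my-knowledge-base",
    namespace="product-docs"
)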
```

### Multi-Tenant Support

When separating data per customer in a B2B SaaS:

```python
# One namespace per customer
customer_namespace = f"customer-{customer_id}"

vectorstore = Pinecone.from_existing_index(
    index_name="my-knowledge-base",
    embedding=embeddings,
    namespace=customer_namespace
)

# Specify the same namespace at search time as well
results = vectorstore.similarity_search(
    query=user_query,
    k=5,
    namespace=customer_namespace  # prevents cross-tenant data leakage
)
```
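
Because the namespace is the only thing separating tenants, the `customer_id` that goes into it should come from the authenticated session and be validated before use. A minimal sketch, assuming a hypothetical helper `namespace_for` and an ID format of letters, digits, `_`, and `-` (adjust the pattern to your own ID scheme):

```python
import re

# Assumed ID format: 1-64 characters of letters, digits, underscore, or hyphen
_CUSTOMER_ID = re.compile(r"[A-Za-z0-9_-]{1,64}")

def namespace_for(customer_id: str) -> str:
    """Return the namespace for a customer, rejecting malformed IDs."""
    if not _CUSTOMER_ID.fullmatch(customer_id):
        raise ValueError(f"invalid customer_id: {customer_id!r}")
    return f"customer-{customer_id}"
```

Routing every read and write through one function like this keeps the namespace format consistent between ingestion and search.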

---

## Quantitative Methods of Accuracy Evaluation

### Evaluation Metrics

| Metric | Definition | Target value |
|-----|------|-------|
| **Recall@5** | The proportion of relevant documents that appear in the top 5 | > 0.85 |
| **Precision@5** | The proportion of the top 5 results that are relevant | > 0.70 |
| **MRR** (Mean Reciprocal Rank) | The mean of the reciprocal rank of the first relevant document | > 0.75 |
| **Accuracy** | The answer's correctness (human evaluation) | > 0.90 |
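
A worked example for a single query may make the three retrieval metrics concrete. The document IDs below are made up for illustration.

```python
retrieved = ["doc_9", "doc_123", "doc_7", "doc_456", "doc_3"]  # ranked top 5
relevant = {"doc_123", "doc_456", "doc_999"}                   # ground truth

hits = [doc for doc in retrieved[:5] if doc in relevant]

recall_at_5 = len(hits) / len(relevant)  # 2 of the 3 relevant docs were found
precision_at_5 = len(hits) / 5           # 2 of the 5 results are relevant

# Rank of the first relevant document (1-based), or None if there is none
first_hit_rank = next(
    (rank for rank, doc in enumerate(retrieved, start=1) if doc in relevant),
    None,
)
reciprocal_rank = 1.0 / first_hit_rank if first_hit_rank else 0.0
```

Here Recall@5 is 2/3, Precision@5 is 2/5, and the reciprocal rank is 1/2 because the first relevant document sits at rank 2. MRR is the mean of that last value over all test queries.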

### The Evaluation Script

```python
from typing import List, Tuple
import numpy as np

class RAGEvaluator:
    """Retrieval-accuracy evaluation for a RAG system."""

    def __init__(self, test_data: List[Tuple[str, List[str]]]):
        """
        Args:
            test_data: [(question, list of correct document IDs), ...]
        """
        self.test_data = test_data

    def calculate_recall_at_k(
        self,
        retrieved_docs: List[str],
        ground_truth: List[str],
        k: int = 5
    ) -> float:
        """Compute Recall@K."""
        top_k = set(retrieved_docs[:k])
        relevant = set(ground_truth)

        if not relevant:
            return 0.0

        return len(top_k & relevant) / len(relevant)

    def calculate_precision_at_k(
        self,
        retrieved_docs: List[str],
        ground_truth: List[str],
        k: int = 5
    ) -> float:
        """Compute Precision@K."""
        top_k = set(retrieved_docs[:k])
        relevant = set(ground_truth)

        if not top_k:
            return 0.0

        return len(top_k & relevant) / k

    def calculate_mrr(
        self,
        retrieved_docs: List[str],
        ground_truth: List[str]
    ) -> float:
        """Compute the reciprocal rank for one query (averaged into MRR in evaluate)."""
        for rank, doc_id in enumerate(retrieved_docs, 1):
            if doc_id in ground_truth:
                return 1.0 / rank
        return 0.0

    def evaluate(self, rag_system) -> dict:
        """Evaluate the RAG system over the whole test set."""
        recalls = []
        precisions = []
        mrrs = []

        for question, ground_truth in self.test_data:
            # Retrieve with the RAG system
            retrieved = rag_system.retrieve(question, k=10)
            retrieved_ids = [doc.metadata["id"] for doc in retrieved]

            # Compute the metrics
            recalls.append(self.calculate_recall_at_k(retrieved_ids, ground_truth, k=5))
            precisions.append(self.calculate_precision_at_k(retrieved_ids, ground_truth, k=5))
            mrrs.append(self.calculate_mrr(retrieved_ids, ground_truth))

        return {
            "recall@5": np.mean(recalls),
            "precision@5": np.mean(precisions),
            "mrr": np.mean(mrrs)
        }

# Usage
test_data = [
    ("How do I get an API key?", ["doc_123", "doc_456"]),
    ("What are the differences between the pricing plans?", ["doc_789"]),
    # ... 100 or more test cases
]

evaluator = RAGEvaluator(test_data)
metrics = evaluator.evaluate(my_rag_system)

print(f"Recall@5: {metrics['recall@5']:.2f}")
print(f"Precision@5: {metrics['precision@5']:.2f}")
print(f"MRR: {metrics['mrr']:.2f}")
```

### Evaluation Results in a Real Project

**Before improvement**:
- Recall@5: 0.68
- Precision@5: 0.52
- MRR: 0.61

**After improvement (countermeasures ①–④ applied)**:
- Recall@5: **0.87** (+28%)
- Precision@5: **0.78** (+50%)
- MRR: **0.81** (+33%)
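
The percentages in parentheses are relative gains over the baseline, not percentage-point differences. A short check of the arithmetic (`relative_gain` is a helper name introduced here for illustration):

```python
def relative_gain(before: float, after: float) -> float:
    """Relative improvement in percent, e.g. 0.68 -> 0.87 is about +28%."""
    return (after - before) / before * 100

gains = {
    "recall@5": relative_gain(0.68, 0.87),
    "precision@5": relative_gain(0.52, 0.78),
    "mrr": relative_gain(0.61, 0.81),
}
```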

---

## Cost Estimate: A Real Example at 10,000 Queries/Month

### OpenAI API Cost

```
[Assumptions]
- Queries: 10,000/month
- Average prompt length: 2,000 tokens (5 search results + question)
- Average response length: 500 tokens
- Model: GPT-4 Turbo

[Calculation]
Input:  10,000 queries × 2,000 tokens × $0.01/1K = $200
Output: 10,000 queries × 500 tokens × $0.03/1K = $150
Total: $350/month
```

### Pinecone Cost

```
[Assumptions]
- Number of indexes: 1
- Number of vectors: 100,000
- Plan: Starter ($70/month for 100K vectors)

[Calculation]
Base fee: $70/month
```

### Embedding-Generation Cost (OpenAI)

```
[Assumptions]
- New documents added: 1,000/month
- Average document length: 1,000 tokens
- Model: text-embedding-3-small

[Calculation]
1,000 docs × 1,000 tokens × $0.00002/1K = $0.02/month
(virtually negligible)
```

### Infrastructure Cost (AWS ECS Fargate)

```
[Assumptions]
- Fargate Task: 0.5 vCPU, 1GB RAM
- Uptime: 24 hours × 30 days

[Calculation]
Fargate: $35/month
ALB: $23/month
CloudWatch Logs: $3/month
Total: $61/month
```

### **Total Cost**

```
OpenAI API (GPT-4 Turbo): $350/month
Pinecone: $70/month
OpenAI Embeddings: $0.02/month (negligible)
AWS infrastructure: $61/month
────────────────────────
Total: about $481/month (about ¥68,000)
```
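
The estimate is easy to rerun with your own traffic and prices. The sketch below reproduces the figures above; `llm_cost` is a hypothetical helper, and all prices are the ones quoted in this article, so check current pricing before relying on them.

```python
def llm_cost(queries: int, in_tokens: int, out_tokens: int,
             in_price_per_1k: float, out_price_per_1k: float) -> float:
    """Monthly LLM cost in USD from per-query token counts and per-1K-token prices."""
    input_cost = queries * in_tokens / 1000 * in_price_per_1k
    output_cost = queries * out_tokens / 1000 * out_price_per_1k
    return input_cost + output_cost

openai_llm_cost = llm_cost(10_000, 2_000, 500, 0.01, 0.03)  # GPT-4 Turbo figures above
embedding_cost = 1_000 * 1_000 / 1000 * 0.00002             # 1,000 new docs x 1,000 tokens
pinecone_cost = 70.0
aws_cost = 35.0 + 23.0 + 3.0                                # Fargate + ALB + CloudWatch Logs

total = openai_llm_cost + embedding_cost + pinecone_cost + aws_cost
```

With these inputs the LLM calls account for roughly 73% of the total, which is why the cost-reduction strategies focus on them.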

### Cost-Reduction Strategies

#### Strategy ①: Partial Downgrade to GPT-3.5 Turbo

```python
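# Simple questions go to GPT-3.5, complex ones to GPT-4
def select_model(query: str, context: str) -> str:
    # Rough size estimate from whitespace-separated words. This is not a true
    # token count and undercounts text without spaces (e.g. Japanese)
    approx_tokens = len(query.split()) + len(context.split())

    if approx_tokens < 500:
        return "gpt-3.5-turbo"  # $0.001/1K (input)
    else:
        return "gpt-4-turbo"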
```

**Effect**: route 50% of queries to GPT-3.5 → 30% cost reduction ($350 → $245)
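
The saving depends on the share of token spend that moves, not the share of queries, because the routed queries are the short ones. A rough model, under the simplifying assumption that GPT-3.5 costs one tenth of GPT-4 Turbo for both input and output (the ratio of the input prices quoted in this article); `blended_cost` is a hypothetical helper:

```python
def blended_cost(base_cost: float, routed_share: float, price_ratio: float) -> float:
    """Cost after moving `routed_share` of the token spend to a model priced at
    `price_ratio` times the original model."""
    if not (0.0 <= routed_share <= 1.0 and price_ratio >= 0.0):
        raise ValueError("routed_share must be in [0, 1] and price_ratio >= 0")
    return base_cost * (1 - routed_share * (1 - price_ratio))

# One third of the spend moved to a model at one tenth of the price
after_routing = blended_cost(350.0, routed_share=1 / 3, price_ratio=0.1)
```

Under that assumption, a 30% reduction corresponds to moving about one third of the token spend, which is consistent with routing the shorter half of the queries.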

#### Strategy ②: Caching

```python
import hashlib

class RAGCache:
    """Cache answers to frequently asked questions."""

    def __init__(self, redis_client):
        # Assumes an asyncio Redis client created with decode_responses=True,
        # so that get() returns str rather than bytes
        self.redis = redis_client
        self.ttl = 3600  # 1 hour

    def get_cache_key(self, query: str) -> str:
        return f"rag_cache:{hashlib.md5(query.encode()).hexdigest()}"

    async def get(self, query: str) -> str | None:
        key = self.get_cache_key(query)
        return await self.redis.get(key)

    async def set(self, query: str, answer: str):
        key = self.get_cache_key(query)
        await self.redis.setex(key, self.ttl, answer)

# Usage
cache = RAGCache(redis_client)

async def rag_with_cache(query: str) -> str:
    # Check the cache first
    cached = await cache.get(query)
    if cached is not None:
        return cached

    # Run RAG
    answer = await rag_system.query(query)

    # Store the answer
    await cache.set(query, answer)

    return answer
```

**Effect**: cache hit rate 30% → 30% reduction in API calls ($350 → $245)
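
An exact-match key only hits when two users type the identical string. One way to raise the hit rate is to normalize the query before hashing. This is a sketch with a hypothetical function name, `normalized_cache_key`; whether case and width folding are safe depends on your content.

```python
import hashlib
import unicodedata

def normalized_cache_key(query: str) -> str:
    """Cache key that ignores case, width variants, and extra whitespace."""
    text = unicodedata.normalize("NFKC", query)  # fold full-width/half-width forms
    text = " ".join(text.casefold().split())     # lowercase and collapse whitespace
    return f"rag_cache:{hashlib.md5(text.encode('utf-8')).hexdigest()}"
```

MD5 is used here only as a fast key hash, as in the class above, not for security.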

---

## Production-Operation Pitfalls and Error Handling

### Pitfall ①: OpenAI API Rate Limits

**Problem**: a sudden traffic surge hits the API-call limit.

**Solution**: Exponential Backoff + Retry

```python
from openai import AsyncOpenAI, RateLimitError
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)

openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

@retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5)
)
async def call_openai_with_retry(prompt: str) -> str:
    """Call the OpenAI API, retrying on rate-limit errors."""
    response = await openai_client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=500
    )
    return response.choices[0].message.content
```

### Pitfall ②: Pinecone Search Timeout

**Problem**: searches over a large number of vectors occasionally take more than 10 seconds.

**Solution**: a timeout setting + fallback

```python
import asyncio
from typing import List

from langchain.schema import Document

async def search_with_timeout(
    vectorstore,
    query: str,
    k: int = 5,
    timeout: float = 5.0
) -> List[Document]:
    """Vector search with a timeout."""
    try:
        result = await asyncio.wait_for(
            vectorstore.asimilarity_search(query, k=k),
            timeout=timeout
        )
        return result
    except asyncio.TimeoutError:
        # Fallback: keyword search (application-specific helper, not shown)
        return await fallback_keyword_search(query, k=k)
```
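
The same pattern can be tried locally with stand-in coroutines. Everything here is hypothetical: `slow_vector_search` simulates a slow primary search and `keyword_search` simulates the fallback.

```python
import asyncio

async def slow_vector_search(query: str) -> list[str]:
    """Hypothetical primary search that takes 0.2 seconds."""
    await asyncio.sleep(0.2)
    return [f"vector hit for {query}"]

async def keyword_search(query: str) -> list[str]:
    """Hypothetical fallback that answers immediately."""
    return [f"keyword hit for {query}"]

async def search(query: str, timeout: float) -> list[str]:
    try:
        return await asyncio.wait_for(slow_vector_search(query), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the slow task before raising, so nothing keeps running
        return await keyword_search(query)

fallback_result = asyncio.run(search("auth", timeout=0.05))  # times out, uses fallback
primary_result = asyncio.run(search("auth", timeout=1.0))    # finishes in time
```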

### Pitfall ③: Memory Leaks

**Problem**: LangChain objects stay resident in memory after requests finish.

**Solution**: explicit cleanup

```python
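import gc
from contextlib import asynccontextmanager

from langchain.chat_models import ChatOpenAI
from langchain.vectorstores import Pinecone

@asynccontextmanager
async def rag_session():
    """Manage a RAG session and release its resources on exit."""
    # Initialization
    vectorstore = Pinecone.from_existing_index(...)
    llm = ChatOpenAI(...)

    try:
        yield vectorstore, llm
    finally:
        # Cleanup: drop this scope's references, then force a collection pass
        del vectorstore
        del llm
        gc.collect()

# Usage
async def query_rag(user_query: str) -> str:
    async with rag_session() as (vectorstore, llm):
        # RAG processing
        results = await vectorstore.asimilarity_search(user_query)
        # ...
    # Resources are released when the block exits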
```

---

## FastAPI Integration: Implementing the Production API

```python
import logging
import time
from typing import AsyncGenerator

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

app = FastAPI(title="Production RAG API")

class QueryRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=500)
    # In multi-tenant deployments, derive the namespace from the authenticated
    # caller instead of trusting a client-supplied value
    namespace: str = Field(default="product-docs")
    max_results: int = Field(default=5, ge=1, le=10)

class QueryResponse(BaseModel):
    answer: str
    confidence: float
    sources: list[dict]
    processing_time_ms: float

# Initialize the RAG system at startup
@app.on_event("startup")
async def startup_event():
    global rag_system
    rag_system = initialize_rag_system()

@app.post("/api/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest):
    """RAG query endpoint."""
    start_time = time.perf_counter()

    try:
        # Run RAG
        result = await rag_system.query(
            query=request.query,
            namespace=request.namespace,
            k=request.max_results
        )

        processing_time = (time.perf_counter() - start_time) * 1000

        return QueryResponse(
            answer=result.answer,
            confidence=result.confidence,
            sources=[
                {
                    "id": src.id,
                    "title": src.metadata.get("title"),
                    "page": src.metadata.get("page"),
                    "score": src.score
                }
                for src in result.sources
            ],
            processing_time_ms=processing_time
        )

    except Exception:
        # Log the details server-side; don't expose internals to the client
        logger.exception("RAG query failed")
        raise HTTPException(status_code=500, detail="Internal server error")

# Streaming response (answer delivered in real time)
@app.post("/api/query/stream")
async def query_stream_endpoint(request: QueryRequest):
    """Streaming RAG query."""

    async def generate() -> AsyncGenerator[str, None]:
        async for chunk in rag_system.query_stream(request.query):
            yield f"data: {chunk}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

# Health check
@app.get("/health")
async def health_check():
    return {"status": "healthy", "service": "rag-api"}
```

---

## Summary: Success Factors of a Production RAG

### Technical Points

1. **Hallucination countermeasures are defense-in-depth**: prompt + score + human review
2. **Accuracy evaluation must be quantified**: continuous monitoring with Recall/Precision/MRR
3. **Cost is optimized by operational design**: caching + model selection + scaling

### Operational Points

1. **The feedback loop is the lifeblood**: reflect human evaluation weekly
2. **Err on the side of too much error handling**: retry + timeout + fallback
3. **Staged release**: internal-only → beta → full release

### Real-World Data (My Project)

- **Accuracy**: Recall@5 0.87, Accuracy 0.92 (human evaluation)
- **Response time**: average 1.8s (95th percentile 3.2s)
- **Cost**: about ¥70,000/month at 10,000 queries/month (within budget)
- **Availability**: 99.8% (roughly 1.4 hours of downtime in a 30-day month)

---

## Next Steps

If you're struggling with RAG-system implementation, feel free to reach out. I'll bring the practical know-how of LangChain/Pinecone/OpenAI to your project.

[Contact me here](/contact)

---
