# Until you run generative-AI voice customer service 'in production': designing an unmanned kiosk with Bedrock × Whisper × Polly × pgvector

> Explaining in real code the design for taking a generative-AI voice agent that replaces in-store face-to-face service all the way to production, not a PoC. The real-time voice loop, an asynchronous/parallel inference pipeline, RAG with pgvector, the structural elimination of hallucination, and an AWS production architecture.

- Published: 2026-06-24
- Author: 友田 陽大
- Tags: AI, RAG, 音声AI, AWS Bedrock, Claude, pgvector, LangChain, Python, AWS, Terraform, アーキテクチャ設計
- URL: https://tomodahinata.com/en/blog/production-voice-ai-sales-agent-bedrock-pgvector
- Category: Voice AI
- Pillar guide: https://tomodahinata.com/en/blog/voice-ai-production-guide-stt-tts-voice-agents

## Key points

- Productionizing voice customer service hinges on crossing the five walls of latency, accuracy, safety, improvement, and operation by design.
- Latency is crossed by immediately returning HTTP (submit-and-poll) and parallel fan-out along the dependency graph, to prevent silence face-to-face.
- Don't extract product numbers/sizes with the LLM but match them against the master with regex, and divide labor — generation makes the words of explanation, the master holds the truth of the numbers — to eliminate wrong answers.
- Place an input-moderation gate with Claude as a low-temperature classifier before generation, to block irrelevant/inappropriate utterances.
- For RAG, don't add a dedicated DB but choose PostgreSQL + pgvector; first make it work correctly with a full scan, and clearly state HNSW as the migration target.

---

Consultations of "I want to automate store customer service with generative AI" have increased. If it's just making a demo, it now works over a weekend. Take the mic with `getUserMedia`, transcribe with Whisper, throw it at the LLM, and read it out with TTS — just this.

But **placing it at an unmanned storefront, in a state where you don't know what visitors will say, and keeping it running without breaking every day** is a completely different story. This article is a record of bridging, with real code, "the distance between a demo and production," from the experience of designing, implementing, and taking to production a generative-AI voice-customer-service kiosk for a retail store handling specialized merchandise (e.g., products like tires where the product number and size are everything).

The target readers are engineers and tech leads who want to put a generative-AI product on the business "without ending it at verification." The stack is Python (Flask) / React (TypeScript) / AWS (Bedrock, ECS Fargate, API Gateway, RDS+pgvector, Cognito, Lambda) / Terraform. The details of the related track record are left to the case study at the end; here I concentrate on the design judgments and trade-offs.

## The five walls that separate a demo from production

When running voice customer service in production, the walls you definitely hit were the following five.

| Wall | Ignorable in a demo | Fatal in production |
| --- | --- | --- |
| Latency | can wait 5 seconds | 5 seconds of silence face-to-face means the conversation doesn't hold |
| Accuracy | fine if it's plausible | an error in product number/size directly ties to a wrong order/complaint |
| Safety | only the developer talks | visitors say anything (irrelevant/inappropriate utterances) |
| Improvement | fix by hand | need a mechanism to recover "which answer missed" in operation |
| Operation | fine if it works locally | zero-downtime deploy, reproducible infrastructure, audit logs |

Below, I crush these five one by one.

## Overall architecture: separate the two surfaces

The first design judgment was to **completely separate "the surface visitors touch" and "the surface operators touch."** Because both the requirements and the security boundaries are entirely different.

- **Storefront kiosk (for visitors)**: converses by voice. Authentication is a short access code tied to the store terminal. Thoroughly optimized for "the speed of conversation."
- **Operation console (for operators)**: analysis of conversation logs, management of the knowledge (RAG), terminal management. Authentication is AWS Cognito. Thoroughly optimized for "accuracy and auditability."

The request flow is as follows. The operation console verifies the JWT with API Gateway's Cognito authorizer, then passes to the internal ECS via VPC Link.

```text
[customer] ── audio ──▶ CloudFront ──▶ ALB ──▶ ECS(Flask) ──▶ RDS(pgvector) / S3
                                               │
                                               ├─▶ Bedrock (Claude 3.5 Sonnet)
                                               ├─▶ OpenAI (Whisper / Embeddings)
                                               └─▶ Polly (TTS)

[operator] ── JWT ──▶ API Gateway ──(Cognito Authorizer)──▶ VPC Link ──▶ NLB ──▶ ALB ──▶ ECS
```

"Why interpose API Gateway only for the operation console" is described later, but the point is **to lean Cognito authentication on the managed side and not expose the internal ECS directly to the internet.**

## Wall ① latency: serial processing definitely breaks down

A voice-customer-service pipeline is essentially heavy serial processing.

```text
record → STT(transcription) → input check → vector search → LLM generation → TTS(speech synthesis) → return
```

If you write this straightforwardly in serial, each step's several hundred ms to several seconds pile up and easily exceed 5 seconds. In face-to-face service, 5 seconds of silence is the time judged as "it froze." I attacked here in two stages.

### Stage 1: return HTTP immediately, push generation to the background

`POST /api/conversation`, which uploads the audio, **immediately returns only the transcription result and a `taskId`,** and pushes the heavy generation processing to a background task. The client polls for the result using the `taskId`.

This design has two practical benefits. One is perceived latency (the user gets "what was heard" returned immediately). The other is that it **structurally avoids API Gateway's 29-second timeout constraint.** Even on a day when LLM generation is slow, the HTTP layer doesn't clog.

In Flask, I implemented this with `flask-executor` (a thread pool).

```python
class ConversationResource(Resource):
    @jwt_required()
    def post(self):
        executor = current_app.config["EXECUTOR_CLIENT"]
        audio_file = self._decode_audio(request)

        # 文字起こしと認証情報の取得は互いに独立 → 並列に流す
        futures = {
            executor.submit(
                openai.audio.transcriptions.create,
                model="whisper-1",
                file=audio_file,
                language="ja",
            ): "transcription",
            executor.submit(self.get_access_code): "access_code",
        }
        results = {}
        for future in as_completed(futures):
            results[futures[future]] = future.result()

        transcript = results["transcription"].text
        task = BackgroundTaskResult(encoded_audio=None)
        db.session.add(task)
        db.session.commit()

        # 重い生成処理はバックグラウンドへ。HTTPはここで即返す。
        executor.submit(self.background_task, transcript, results["access_code"], task.id)
        return {"transcript": transcript, "taskId": task.id}, 202
```

The polling side distinguishes "still processing (202)" and "completed (200 + audio)."

```python
class BackgroundTaskResultResource(Resource):
    def get(self, task_id):
        task = BackgroundTaskResult.query.get_or_404(task_id)
        if task.encoded_audio is None:
            return {"status": "in_progress"}, 202
        return {"status": "completed", "audio": task.encoded_audio}, 200
```

> Note: I chose a thread pool because this pipeline is dominated by I/O-bound (external-API waiting) and Gunicorn runs with `eventlet` workers. If CPU-bound preprocessing increases, the next move here is to lean toward SQS + a dedicated worker. It's important to keep the order of "first make it work, and move it to the right place when the bottleneck becomes visible."

### Stage 2: parallel fan-out inside the pipeline

Inside the background task too, processing with no dependency relationship is flowed in parallel. For example, "fetching session info," "input moderation," and "embedding generation" are mutually independent, so run them simultaneously to shorten the critical path.

```python
def background_task(self, transcript, access_code, task_id):
    executor = current_app.config["EXECUTOR_CLIENT"]

    # フェーズ1: 互いに独立 → ファンアウト
    futures = {
        executor.submit(self.get_session, access_code): "session",
        executor.submit(self.filter_question_content, transcript): "moderation",
        executor.submit(self.get_embedding, transcript): "embedding",
    }
    phase1 = {futures[f]: f.result() for f in as_completed(futures)}

    if phase1["moderation"] == "REJECT":
        return self._store_rejection(task_id)  # 生成に進ませない

    # フェーズ2: 検索・履歴取得・QAチェーン初期化も独立 → ファンアウト
    futures = {
        executor.submit(self.retrieve_documents, phase1["embedding"]): "docs",
        executor.submit(self.get_past_conversations, phase1["session"].id): "history",
        executor.submit(load_qa_chain, self.claude_llm): "qa_chain",
    }
    phase2 = {futures[f]: f.result() for f in as_completed(futures)}

    answer = self._generate(phase2["qa_chain"], phase2["docs"], phase2["history"], transcript)
    encoded = self.encode_audio(answer)  # Polly → base64
    self._persist(task_id, encoded, answer)
```

The point isn't to "make everything asynchronous" but to **draw the dependency graph and flow only the independent edges simultaneously.** Blind parallelization is a hotbed of bugs.

## Wall ② accuracy: eliminate wrong answers with a hybrid of generation and rules

The scariest thing with specialized merchandise is a "fluent wrong answer." The LLM confidently states a wrong product number. If it misspeaks the tire size `225/60R15` as `225/65R15`, it directly ties to a wrong order.

The design principle here is clear. **"Where ambiguity is allowed, lean to generation; where an error is fatal, lean to rules."**

Don't leave the extraction of product number/size to the LLM; extract it deterministically with regex from the utterance text and match it against the master table (`normalized_data`).

```python
TIRE_SIZE = re.compile(r"(\d{3})\s*[-/／]?\s*(\d{2})\s*[-/Rr]?\s*(\d{2})")

def resolve_product_number(self, transcript: str) -> str | None:
    m = TIRE_SIZE.search(transcript)
    if not m:
        return None
    w, aspect, rim = m.groups()
    row = NormalizedData.query.filter_by(
        digit_1=w, digit_2=aspect, digit_3=rim
    ).first()
    return row.product_number if row else None
```

If it can be matched, pass that fixed value to the LLM as a constraint of the answer. **Divide labor — generative AI plays the role of making "the words of explanation," and the master plays the role of holding "the truth of the numbers."** With just this, complaint-class wrong answers almost disappear.

The LLM-side generation, too, isn't used unconstrained. While passing the searched top documents and the recent conversation history to LangChain's QA chain, **constrain the answer length** (since it's read out by voice, an overly-long answer is a disqualification as customer service).

```python
prompt = (
    "あなたは店舗の接客スタッフです。以下の参考情報と会話履歴だけを根拠に、"
    "200文字以内で、確認できない事項は断定せず案内してください。\n"
    f"# 確定した商品番号: {product_number or '未確定'}\n"
    f"# 直近の会話:\n{recent_history}\n"
    f"# 質問: {transcript}"
)
answer = qa_chain.run({"input_documents": documents, "question": prompt})
```

## Wall ③ safety: gate the utterance before generation

An unmanned kiosk can't control "who says what." Irrelevant chatter, inappropriate utterances, and aggressive prompts fly in. So I placed **a gate that classifies and blocks the input before proceeding to generation.** Using Claude 3.5 Sonnet as a low-temperature (`temperature=0.1`) classifier, I make it return only `ACCEPT` / `REJECT`.

```python
def filter_question_content(self, transcript: str) -> str:
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 5,
        "temperature": 0.1,
        "messages": [{
            "role": "user",
            "content": (
                "次の発話が店舗接客として適切なら ACCEPT、"
                "成人向け・政治・誹謗中傷・商材と無関係なら REJECT のみを返答:\n"
                f"{transcript}"
            ),
        }],
    })
    res = self.bedrock.invoke_model(
        modelId="anthropic.claude-3-5-sonnet-20240620-v1:0",
        body=body,
    )
    verdict = json.loads(res["body"].read())["content"][0]["text"].strip()
    return "REJECT" if "REJECT" in verdict else "ACCEPT"
```

Commonly using the same Bedrock Claude for the three purposes of "answer generation," "input moderation," and "judging whether to show an image/video" is also a cost/operational contrivance. Without increasing providers, model updates are done in one place.

## RAG foundation: why pgvector, and the limit of a full scan

For the foundation of knowledge search (RAG), I chose **PostgreSQL + pgvector** without adding a dedicated vector DB. There are three reasons.

1. **Simplicity of operation**: the business data and the embeddings ride on the same RDB. Backups, permissions, and transactions are unified.
2. **Cost**: don't increase the monthly fee or the separate monitoring of Pinecone, etc.
3. **Consistency**: conversations, documents, and search results (similarity scores) can be handled within the same transaction boundary.

For embeddings, I used OpenAI `text-embedding-3-large` **truncated to 1024 dimensions.** It's a balance point that secures search accuracy while holding down storage/compute cost.

Let me write this honestly. The initial implementation's search was **a naive method that reads out all documents and computes the inner product on the Python side.**

```python
def retrieve_documents(self, query_embedding):
    cur = self.conn.cursor()
    cur.execute("SELECT id, content, vector FROM documents")
    scored = [
        (np.dot(query_embedding, vec), content, doc_id)
        for doc_id, content, vec in cur.fetchall()
    ]
    scored.sort(key=lambda x: x[0], reverse=True)
    top = scored[:10]
    return [LangchainDocument(page_content=c) for _, c, _ in top], top
```

At a per-store catalog scale, this works practically enough. It's the judgment of **avoiding premature optimization and first making it work correctly.** But this is an O(n) full scan and breaks down when documents exceed tens of thousands. The correct migration target at scale is to lean toward pgvector's index (HNSW / IVFFlat) and distance operators.

```sql
-- スケール時の移行先: DB側でANN検索を完結させる
CREATE INDEX ON documents USING hnsw (vector vector_cosine_ops);

SELECT id, content
FROM documents
ORDER BY vector <=> %(query)s   -- コサイン距離。インデックスが効く
LIMIT 10;
```

"Make it work correctly at the current scale, and clearly state the limit and the path beyond it" — this, I believe, is RAG operation without bluffing. The search results are saved along with the similarity score in `vector_search_results`, **so that you can later trace 'why that answer came out.'** This pays off in the next "improvement loop."

## Wall ④ improvement: a loop that raises accuracy while operating

The biggest difference between a PoC and production is **"whether there's a mechanism to recover and fix the answers that missed."** The data model was designed to hold the full context of the conversation (summary).

```text
User ─< Terminal ─< Session ─< Conversation ─< Chat
                                     │
                                     ├─ vector_search_results (search basis + similarity)
                                     └─ documents (pgvector embeddings) ─< attachments(image/video)
normalized_data (product-number master)   background_task_results (async handle for generated audio)
```

`Conversation` holds `failure_reason` (a code like speech-recognition failure or moderation rejection), `process_time` (the measured latency), and furthermore, as **teacher data added later by the operator,** `failure_reason_should_be` and `system_response_message_should_be`.

When an operator who looked at a conversation in the operation console inputs "this answer should originally have been like this," it can be **re-embedded and re-injected into the knowledge.** That is, operation itself becomes a loop that produces learning data. If you accumulate `process_time`, you can also detect latency regression.

## Wall ⑤ operation: two-layer authentication and reproducible infrastructure

### Separate authentication by purpose

Handling visitors and operators with the same authentication is poor design. So I separated it into two layers.

- **Kiosk**: verify a 6-digit access code tied to the store terminal, and issue a JWT in an `HttpOnly` cookie (access 24h / refresh 7d, with a CSRF token). Visitors don't do a login operation.
- **Operation console**: AWS Cognito (SRP authentication). Verify the JWT with API Gateway's Cognito authorizer, then pass it inside.

```python
# キオスク: アクセスコード → JWT(Cookie)
class AccessCodeResource(Resource):
    def post(self):
        code = AccessCodeSchema().load(request.get_json())["accessCode"]
        terminal = Terminal.query.filter_by(access_code=code, is_active=True).first()
        if terminal is None:
            return {"message": "invalid code"}, 401
        access = create_access_token(identity={"token": code},
                                    expires_delta=timedelta(hours=24))
        resp = make_response({"ok": True})
        set_access_cookies(resp, access, max_age=24 * 60 * 60)  # HttpOnly + CSRF
        return resp
```

Input is boundary-validated with Marshmallow schemas, and DB access is via SQLAlchemy's ORM (parameterized queries). Catalog images/videos on S3 are distributed only by signed URLs, and the bucket isn't made public.

### Infrastructure fully coded with Terraform

So that `staging` and `production` can be reproduced with the **same configuration,** I managed everything from the VPC to the app with Terraform. The backend is ECS Fargate (lightweight tasks, zero-downtime with a rolling deploy).

```hcl
resource "aws_ecs_service" "backend" {
  name            = "${var.env}-backend"
  cluster         = aws_ecs_cluster.backend.id
  task_definition = aws_ecs_task_definition.backend.arn
  launch_type     = "FARGATE"
  desired_count   = 1

  deployment_configuration {
    maximum_percent         = 200  # 新タスクを立ててから
    minimum_healthy_percent = 100  # 旧タスクを落とす = 無停止
  }
}
```

The RAG document ingestion, being heavy processing (embedding generation), was separated into a VPC-attached Lambda. While writing directly to RDS, it exits to OpenAI via NAT.

```hcl
resource "aws_lambda_function" "insert_rag_document" {
  function_name = "${var.env}-insert-rag-document"
  runtime       = "python3.11"
  timeout       = 300            # 大きめのPDFも完走させる
  vpc_config {                   # RDS(pgvector)へ到達するためVPC内に置く
    subnet_ids         = [var.lambda_subnet_id]
    security_group_ids = [var.lambda_sg_id]
  }
}
```

Why does the operation console alone have many layers, "API Gateway → VPC Link → NLB → ALB → ECS"? It's to **offload Cognito authentication to API Gateway's authorizer** while not exposing the internal ECS publicly. Since VPC Link requires an NLB as the connection target, the configuration places an NLB in front of the ALB.

### Deployment automated with GitHub Actions

```yaml
# バックエンド: ビルド → ECRへpush → ECSローリングデプロイ
- run: docker build -t $ECR_REPO:latest -f backend/Dockerfile.prod ./backend
- run: docker push $ECR_REPO:latest
- run: |
    aws ecs update-service --cluster $CLUSTER --service $SERVICE \
      --force-new-deployment
```

The front is S3 sync + CloudFront invalidation. The SPA's routing is absorbed by CloudFront's "404 → `/index.html` (200)."

## How I guaranteed observability, resilience, and idempotency

Finally, let me organize the "plain but important" designs that matter in production operation.

- **Observability**: record `process_time` and `failure_reason` per conversation. ECS / RDS go to CloudWatch Logs. "Slow / missed" can be traced per conversation.
- **Resilience**: don't drop generation failures/moderation rejections with an exception, but always converge them into one record with `failure_reason`. Always return some audio to the visitor (don't let it freeze silently).
- **Idempotency**: since async results are written to `background_task_results` keyed by `taskId`, double generation doesn't occur with polling re-sends or network retries.
- **Testability**: deterministic logic like product-number extraction/normalization is carved out into pure functions and can be unit-tested without external APIs/DB. Having separated the "ambiguous LLM part" and the "strict logic part" also pays off in the test strategy.

## Summary: a demo is talent, production is design

Anyone can now make a demo of voice customer service. What makes the difference is **how you cross "the five walls of production" by design.**

1. **Latency** is crossed by immediate HTTP return (submit-and-poll) + parallel fan-out along the dependency graph.
2. **Accuracy** is crossed by a hybrid of generation (fluency) and rules (product-number master matching).
3. **Safety** is crossed by placing an input-moderation gate before generation.
4. **Improvement** is crossed by a loop where you leave the search basis and teacher data, and operation produces learning data.
5. **Operation** is crossed by purpose-specific two-layer authentication, and reproducible infrastructure / zero-downtime deploy with Terraform.

The value of a generative-AI product isn't "calling a smart model" but **the design that meshes the model's ambiguity with the strictness of the business.** Whether you can do that carefully is, I believe, the watershed that turns a PoC into production.

---

How I actually built this voice-customer-service kiosk and what technology judgments I made are introduced in detail in the case study. If you want to put generative AI on the business without ending it at "verification," please feel free to consult me.
