Published on

Day 44: Observability Cho LLM App

Authors

1. Mục Tiêu

Sau bài này, bạn cần làm được các việc sau:

  • Thiết kế observability cho LLM/RAG app theo 3 lớp: logs, metrics và traces.
  • Đo được latency tổng, latency theo stage, throughput, error rate, token usage, cost/request và TTFT.
  • Tạo trace schema đủ sâu cho RAG: query, retrieved chunks, reranked chunks, context, prompt, generation, citation validation và feedback.
  • Gắn user feedback với trace_id, prompt version, model version, index version và retrieved chunks để debug regression.
  • So sánh được Langfuse, LangSmith, OpenTelemetry, Prometheus/Grafana và ELK/OpenSearch.
  • Thiết kế privacy, redaction, sampling, retention và access control trước khi log prompt/context/output.
  • Trả lời được: dùng được trong production không, nếu có thì cần điều kiện gì.

2. Tư Duy Chính

LLM app không chỉ fail theo kiểu HTTP 500. Nó có thể:

  • Trả lời chậm nhưng vẫn 200 OK.
  • Dùng quá nhiều token vì prompt/context phình to.
  • Retrieve nhầm document nhưng model vẫn viết câu trả lời nghe hợp lý.
  • Reranker đẩy chunk đúng xuống dưới.
  • Citation trỏ sai source.
  • Output đúng về ngữ pháp nhưng sai policy.
  • Cost tăng vì model router chọn model đắt.
  • User bấm thumbs down nhưng team không biết prompt, model, index và chunks nào đã tạo ra answer đó.

Vì vậy observability cho LLM app phải trả lời được 6 câu hỏi:

  1. Request nào bị lỗi hoặc chậm?
  2. Lỗi nằm ở stage nào: retrieval, rerank, context builder, model call, citation validation hay feedback?
  3. Version nào liên quan: prompt, model, embedding model, reranker, index, guardrail?
  4. Token usage và cost/request là bao nhiêu?
  5. Output có đạt quality signal không: citation valid, no-answer đúng, feedback tốt?
  6. Có log dữ liệu nhạy cảm quá mức không?

3. Map Sang Senior Software Engineering

LLM/RAG conceptSE concept tương đươngObservability cần có
PromptConfig/versioned templateVersion, input size, rendered length, owner
Model providerDownstream serviceLatency, timeout, retry, error code, rate limit
Embedding/indexSearch infrastructureIndex version, top-k, score distribution, empty retrieval
RerankerRanking serviceCandidate count, latency, top score, score gap
Context builderRequest payload builderContext token, truncation, selected chunk ids
Citation validationContract validationValid/invalid count, failure reason
User feedbackProduct quality signalRating, reason, trace linkage, triage status
Trace storeDebug databaseRetention, access control, sampling, redaction

Điểm khác biệt lớn so với API truyền thống: success không chỉ là status code. Một LLM request thành công về mặt HTTP vẫn có thể thất bại về mặt quality.

4. Logs, Metrics, Traces

Logs

Logs là event chi tiết, thường ở dạng structured JSON. Dùng logs để debug một request cụ thể hoặc search theo event.

Ví dụ event:

{
  "timestamp": "2026-05-10T10:15:21.120Z",
  "level": "INFO",
  "event": "retrieval_completed",
  "trace_id": "tr_01hxx",
  "tenant_id": "acme",
  "route": "/query",
  "top_k": 20,
  "candidate_count": 18,
  "empty_retrieval": false,
  "latency_ms": 42,
  "index_version": "policy-index-2026-05-01"
}

Log tốt có key ổn định, có trace_id, không chứa secret và không dùng message text tự do làm nguồn dữ liệu chính.

Metrics

Metrics là số liệu aggregate để dashboard và alert.

Ví dụ:

  • rag_request_total{route,status}.
  • rag_stage_latency_seconds{stage}.
  • rag_token_total{model,type} với type=input|output.
  • rag_cost_usd_total{model,tenant_tier}.
  • rag_empty_retrieval_total{index_version}.
  • rag_citation_invalid_total{reason}.
  • llm_ttft_seconds{model}.

Không đưa trace_id, user_id, query, chunk_id vào label Prometheus. Các field đó có cardinality cao, làm metric store phình nhanh và dashboard chậm.

Traces

Trace mô tả đường đi của một request qua nhiều stage. Với RAG, trace quan trọng hơn log text rời rạc vì nó giữ quan hệ cha con:

rag.query
  query.rewrite
  retrieval.hybrid_search
  rerank.cross_encoder
  context.build
  llm.generate
  citation.validate
  feedback.attach

Trace giúp trả lời: tổng latency 3.2s là do model call 2.6s, reranker 400ms hay vector DB 180ms.

5. Golden Signals Cho LLM App

SignalMetric cụ thểVì sao quan trọng
Latencytotal latency, stage latency, p50/p95/p99, TTFTUser cảm nhận tốc độ qua first token và tổng thời gian
Trafficrequests/minute, tokens/minute, streaming sessionsDự báo capacity và rate limit
Errorstimeout, provider error, schema violation, empty retrieval, invalid citationTách lỗi infrastructure khỏi lỗi quality
Saturationqueue length, CPU/RAM/GPU, connection pool, provider rate limitBiết hệ thống đang nghẽn ở đâu
Costcost/request, cost/day, cost by model/tenant/featureKiểm soát ngân sách và pricing
Qualityfeedback rating, no-answer accuracy, citation failure rate, retrieval hit rateLLM app cần đo đúng/sai, không chỉ uptime

TTFT là time to first token: thời gian từ lúc nhận request đến token đầu tiên của model stream. Với UI streaming, TTFT thường quyết định cảm giác "app có phản hồi" hơn total latency.

6. Trace Schema Cho RAG

Trace schema nên đủ chi tiết để debug nhưng không bắt buộc log raw content ở mọi môi trường.

{
  "trace_id": "tr_01hxx4e6kg9k7r8y0x",
  "request_id": "req_9d2c",
  "session_id_hash": "sha256:7c0f...",
  "tenant_id": "acme",
  "user_id_hash": "sha256:aa31...",
  "route": "/query",
  "environment": "prod",
  "query": {
    "raw_redacted": "Chính sách nghỉ phép năm 2026 là gì?",
    "raw_hash": "sha256:2f67...",
    "language": "vi",
    "length_chars": 38
  },
  "rewrite": {
    "enabled": true,
    "rewritten_query_redacted": "Chính sách nghỉ phép nhân viên năm 2026",
    "latency_ms": 35
  },
  "retrieval": {
    "strategy": "hybrid_dense_bm25_rrf",
    "index_version": "policy-index-2026-05-01",
    "embedding_model": "text-embedding-3-small",
    "top_k": 30,
    "latency_ms": 64,
    "empty": false,
    "candidates": [
      {
        "rank": 1,
        "chunk_id": "policy_2026::chunk_018",
        "document_id": "policy_2026",
        "source_uri_hash": "sha256:c91e...",
        "score_dense": 0.82,
        "score_sparse": 12.7,
        "score_rrf": 0.041,
        "acl_matched": true
      }
    ]
  },
  "rerank": {
    "enabled": true,
    "reranker_model": "bge-reranker-v2-m3",
    "candidate_count": 30,
    "selected_count": 8,
    "latency_ms": 210,
    "top_score": 0.91,
    "score_gap_top2": 0.08
  },
  "context": {
    "chunk_ids": ["policy_2026::chunk_018", "policy_2026::chunk_021"],
    "context_tokens": 1840,
    "truncated": false,
    "max_context_tokens": 6000
  },
  "generation": {
    "provider": "openai",
    "model": "gpt-4.1-mini",
    "prompt_version": "rag-answer-v7",
    "temperature": 0.1,
    "input_tokens": 2300,
    "output_tokens": 420,
    "ttft_ms": 780,
    "latency_ms": 2800,
    "finish_reason": "stop",
    "estimated_cost_usd": 0.001592
  },
  "validation": {
    "schema_valid": true,
    "citation_valid": true,
    "citation_failure_reason": null,
    "guardrail_action": "allow"
  },
  "feedback": {
    "rating": null,
    "reason": null,
    "comment_redacted": null
  },
  "result": {
    "status": "success",
    "error_type": null,
    "total_latency_ms": 3140
  }
}

Trong production nghiêm ngặt, raw_redacted có thể tắt theo tenant. Khi đó vẫn giữ raw_hash, length, language, version và metadata để debug aggregate.

7. Event Taxonomy

Nên định nghĩa event cố định từ đầu:

EventKhi ghiField quan trọng
query_receivedNhận requesttrace_id, tenant, route, query_hash
query_rewrittenRewrite xonglatency, rewrite_enabled, rewritten_hash
retrieval_completedSearch xongindex_version, top_k, count, empty, latency
rerank_completedRerank xongreranker_model, candidate_count, selected_count, latency
context_builtBuild context xongchunk_count, context_tokens, truncated
generation_startedBắt đầu gọi modelprovider, model, prompt_version
first_token_receivedCó token đầu tiênttft_ms, model
generation_completedModel trả xonginput_tokens, output_tokens, latency, cost
citation_validatedValidate citationvalid, failure_reason
feedback_receivedUser feedbacktrace_id, rating, reason
request_failedRequest lỗierror_type, stage, retryable
guardrail_blockedGuardrail chặnpolicy, action, stage

Event taxonomy giúp team không phải đoán tên field khi viết dashboard hoặc query log.

8. Code Gần Production

Đoạn code dưới đây minh họa instrumentation ở mức service thật: có trace_id, redaction, structured JSON logs, OpenTelemetry spans, Prometheus metrics, token/cost accounting và feedback endpoint.

8.1 Dependencies

pip install fastapi uvicorn pydantic prometheus-client opentelemetry-api opentelemetry-sdk

Nếu deploy bằng Gunicorn nhiều worker, Prometheus client cần cấu hình multiprocess riêng. Với bài học này, ta giữ ví dụ ở mức một process để tập trung vào observability contract.

8.2 OpenTelemetry Startup

Trong service thật, cấu hình tracer provider ở startup. Local development có thể dùng console exporter; production thường thay bằng OTLP exporter hoặc exporter mà platform đang dùng.

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter


def configure_tracing() -> None:
    resource = Resource.create(
        {
            "service.name": "rag-api",
            "service.version": "1.0.0",
            "deployment.environment": "dev",
        }
    )
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)

Gọi configure_tracing() một lần khi app start. Không gọi lại trong từng request.

8.3 Shared Observability Module

from __future__ import annotations

import hashlib
import json
import logging
import os
import re
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Any, Literal

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from prometheus_client import Counter, Gauge, Histogram

LOGGER = logging.getLogger("rag.observability")
LOGGER.setLevel(logging.INFO)

SALT = os.environ.get("OBSERVABILITY_HASH_SALT", "dev-only-change-me")
RAW_CONTENT_LOGGING = os.environ.get("RAW_CONTENT_LOGGING", "false").lower() == "true"

tracer = trace.get_tracer("rag-api", "1.0.0")

REQUESTS = Counter(
    "rag_request_total",
    "Total RAG requests",
    ["route", "status"],
)
STAGE_LATENCY = Histogram(
    "rag_stage_latency_seconds",
    "Latency by RAG stage",
    ["stage"],
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30),
)
TTFT = Histogram(
    "llm_ttft_seconds",
    "Time to first token",
    ["model"],
    buckets=(0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10),
)
TOKENS = Counter(
    "llm_token_total",
    "LLM token usage",
    ["model", "type"],
)
COST = Counter(
    "llm_cost_usd_total",
    "Estimated LLM cost in USD",
    ["model"],
)
IN_FLIGHT = Gauge(
    "rag_requests_in_flight",
    "RAG requests currently being processed",
    ["route"],
)
EMPTY_RETRIEVAL = Counter(
    "rag_empty_retrieval_total",
    "RAG requests with zero retrieved chunks",
    ["index_version"],
)
CITATION_INVALID = Counter(
    "rag_citation_invalid_total",
    "Invalid citation count",
    ["reason"],
)

MODEL_PRICE_USD_PER_1M = {
    "gpt-4.1-mini": {"input": Decimal("0.40"), "output": Decimal("1.60")},
    "gpt-4.1": {"input": Decimal("2.00"), "output": Decimal("8.00")},
}

PII_PATTERNS = [
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"), "[EMAIL]"),
    (re.compile(r"\b(?:\+?84|0)(?:\d[\s.-]?){8,10}\b"), "[PHONE]"),
    (re.compile(r"\b\d{9,12}\b"), "[ID_NUMBER]"),
]


def hash_value(value: str | None) -> str | None:
    if not value:
        return None
    digest = hashlib.sha256(f"{SALT}:{value}".encode("utf-8")).hexdigest()
    return f"sha256:{digest}"


def redact_text(text: str | None) -> str | None:
    if text is None:
        return None
    redacted = text
    for pattern, replacement in PII_PATTERNS:
        redacted = pattern.sub(replacement, redacted)
    return redacted


def safe_content(text: str | None) -> str | None:
    if text is None:
        return None
    return redact_text(text) if RAW_CONTENT_LOGGING else None


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> Decimal:
    price = MODEL_PRICE_USD_PER_1M.get(model)
    if not price:
        return Decimal("0")
    input_cost = Decimal(input_tokens) * price["input"] / Decimal(1_000_000)
    output_cost = Decimal(output_tokens) * price["output"] / Decimal(1_000_000)
    return (input_cost + output_cost).quantize(Decimal("0.000001"))


def log_event(event: str, **fields: Any) -> None:
    payload = {
        "event": event,
        "timestamp_ms": int(time.time() * 1000),
        **fields,
    }
    LOGGER.info(json.dumps(payload, ensure_ascii=False, default=str))


@dataclass
class RagTrace:
    trace_id: str
    route: str
    tenant_id: str
    user_id_hash: str | None
    prompt_version: str
    model: str
    index_version: str
    status: Literal["success", "error"] = "success"
    error_type: str | None = None
    stage_latency_ms: dict[str, int] = field(default_factory=dict)
    token_usage: dict[str, int] = field(default_factory=lambda: {"input": 0, "output": 0})
    estimated_cost_usd: Decimal = Decimal("0")


@contextmanager
def measured_stage(trace_record: RagTrace, stage: str, **span_attrs: Any):
    start = time.perf_counter()
    with tracer.start_as_current_span(f"rag.{stage}") as span:
        span.set_attributes(
            {
                "rag.trace_id": trace_record.trace_id,
                "rag.tenant_id": trace_record.tenant_id,
                "rag.stage": stage,
                **{k: v for k, v in span_attrs.items() if v is not None},
            }
        )
        try:
            yield span
            span.set_status(Status(StatusCode.OK))
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            trace_record.status = "error"
            trace_record.error_type = type(exc).__name__
            raise
        finally:
            elapsed_seconds = time.perf_counter() - start
            elapsed_ms = round(elapsed_seconds * 1000)
            trace_record.stage_latency_ms[stage] = elapsed_ms
            STAGE_LATENCY.labels(stage=stage).observe(elapsed_seconds)

Điểm production quan trọng trong module này:

  • trace_id được propagate qua log, metric correlation và span attributes.
  • Raw content mặc định không log. Muốn bật phải dùng env flag và vẫn redact.
  • Prometheus label chỉ dùng field cardinality thấp như stage, model, status.
  • Cost dùng Decimal để tránh lỗi làm tròn khi aggregate.
  • Exception được record vào span và vẫn có log event riêng ở API layer.

8.4 FastAPI Query Endpoint

from typing import Annotated

from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel, Field
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from starlette.responses import Response

app = FastAPI(title="RAG API with Observability")


class QueryRequest(BaseModel):
    query: str = Field(min_length=1, max_length=4000)
    session_id: str | None = None
    top_k: int = Field(default=20, ge=1, le=100)


class QueryResponse(BaseModel):
    trace_id: str
    answer: str
    citations: list[dict[str, str]]
    usage: dict[str, int]
    estimated_cost_usd: str


@app.get("/metrics")
def metrics() -> Response:
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)


@app.post("/query", response_model=QueryResponse)
async def query(
    request: QueryRequest,
    x_tenant_id: Annotated[str, Header(alias="X-Tenant-Id")] = "demo",
    x_user_id: Annotated[str | None, Header(alias="X-User-Id")] = None,
) -> QueryResponse:
    trace_id = f"tr_{uuid.uuid4().hex}"
    route = "/query"
    model = "gpt-4.1-mini"
    prompt_version = "rag-answer-v7"
    index_version = "policy-index-2026-05-01"
    trace_record = RagTrace(
        trace_id=trace_id,
        route=route,
        tenant_id=x_tenant_id,
        user_id_hash=hash_value(x_user_id),
        prompt_version=prompt_version,
        model=model,
        index_version=index_version,
    )

    IN_FLIGHT.labels(route=route).inc()
    start = time.perf_counter()

    with tracer.start_as_current_span("rag.query") as root_span:
        root_span.set_attributes(
            {
                "rag.trace_id": trace_id,
                "rag.tenant_id": x_tenant_id,
                "rag.prompt_version": prompt_version,
                "rag.model": model,
                "rag.index_version": index_version,
            }
        )
        log_event(
            "query_received",
            trace_id=trace_id,
            tenant_id=x_tenant_id,
            user_id_hash=trace_record.user_id_hash,
            query_hash=hash_value(request.query),
            query_redacted=safe_content(request.query),
            query_length_chars=len(request.query),
            top_k=request.top_k,
        )

        try:
            with measured_stage(trace_record, "retrieval", index_version=index_version):
                retrieved = await retrieve_chunks(request.query, top_k=request.top_k)
                if not retrieved:
                    EMPTY_RETRIEVAL.labels(index_version=index_version).inc()

            log_event(
                "retrieval_completed",
                trace_id=trace_id,
                tenant_id=x_tenant_id,
                index_version=index_version,
                top_k=request.top_k,
                candidate_count=len(retrieved),
                empty_retrieval=len(retrieved) == 0,
                latency_ms=trace_record.stage_latency_ms["retrieval"],
                chunk_ids=[chunk["chunk_id"] for chunk in retrieved[:10]],
            )

            with measured_stage(trace_record, "rerank", candidate_count=len(retrieved)):
                reranked = await rerank_chunks(request.query, retrieved)

            with measured_stage(trace_record, "context_build", selected_count=len(reranked[:8])):
                context = build_context(reranked[:8])

            log_event(
                "context_built",
                trace_id=trace_id,
                chunk_count=len(context["chunk_ids"]),
                context_tokens=context["token_count"],
                truncated=context["truncated"],
            )

            with measured_stage(trace_record, "generation", model=model, prompt_version=prompt_version):
                generation = await generate_answer_streaming(
                    query=request.query,
                    context=context["text"],
                    model=model,
                    prompt_version=prompt_version,
                    trace_id=trace_id,
                )

            input_tokens = int(generation["usage"]["input_tokens"])
            output_tokens = int(generation["usage"]["output_tokens"])
            cost = estimate_cost_usd(model, input_tokens, output_tokens)
            trace_record.token_usage = {"input": input_tokens, "output": output_tokens}
            trace_record.estimated_cost_usd = cost

            TOKENS.labels(model=model, type="input").inc(input_tokens)
            TOKENS.labels(model=model, type="output").inc(output_tokens)
            COST.labels(model=model).inc(float(cost))
            TTFT.labels(model=model).observe(generation["ttft_ms"] / 1000)

            with measured_stage(trace_record, "citation_validation"):
                validation = validate_citations(generation["citations"], context["chunk_ids"])
                if not validation["valid"]:
                    CITATION_INVALID.labels(reason=validation["reason"]).inc()

            log_event(
                "generation_completed",
                trace_id=trace_id,
                tenant_id=x_tenant_id,
                model=model,
                prompt_version=prompt_version,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                ttft_ms=generation["ttft_ms"],
                latency_ms=trace_record.stage_latency_ms["generation"],
                estimated_cost_usd=str(cost),
                finish_reason=generation["finish_reason"],
            )
            log_event(
                "citation_validated",
                trace_id=trace_id,
                valid=validation["valid"],
                failure_reason=validation["reason"],
            )

            REQUESTS.labels(route=route, status="success").inc()
            total_latency_ms = round((time.perf_counter() - start) * 1000)
            log_event(
                "query_completed",
                trace_id=trace_id,
                status="success",
                total_latency_ms=total_latency_ms,
                stage_latency_ms=trace_record.stage_latency_ms,
                token_usage=trace_record.token_usage,
                estimated_cost_usd=str(trace_record.estimated_cost_usd),
            )

            return QueryResponse(
                trace_id=trace_id,
                answer=generation["answer"],
                citations=generation["citations"],
                usage=trace_record.token_usage,
                estimated_cost_usd=str(trace_record.estimated_cost_usd),
            )

        except TimeoutError as exc:
            REQUESTS.labels(route=route, status="timeout").inc()
            log_event(
                "request_failed",
                trace_id=trace_id,
                status="timeout",
                error_type=type(exc).__name__,
                stage_latency_ms=trace_record.stage_latency_ms,
            )
            raise HTTPException(status_code=504, detail={"trace_id": trace_id, "error": "timeout"}) from exc
        except Exception as exc:
            REQUESTS.labels(route=route, status="error").inc()
            log_event(
                "request_failed",
                trace_id=trace_id,
                status="error",
                error_type=type(exc).__name__,
                stage_latency_ms=trace_record.stage_latency_ms,
            )
            raise HTTPException(status_code=500, detail={"trace_id": trace_id, "error": "internal_error"}) from exc
        finally:
            IN_FLIGHT.labels(route=route).dec()

Các hàm retrieve_chunks, rerank_chunks, build_context, generate_answer_streamingvalidate_citations là boundary của app. Bài học không ràng buộc bạn vào provider cụ thể, nhưng observability contract phải ổn định dù implementation đổi từ Qdrant sang pgvector hoặc từ model A sang model B.

8.5 Đo TTFT Với Streaming

async def generate_answer_streaming(
    query: str,
    context: str,
    model: str,
    prompt_version: str,
    trace_id: str,
) -> dict[str, Any]:
    started = time.perf_counter()
    first_token_ms: int | None = None
    chunks: list[str] = []
    last_event: Any | None = None

    prompt = render_prompt(prompt_version=prompt_version, query=query, context=context)
    input_tokens_estimate = estimate_tokens(prompt)

    log_event(
        "generation_started",
        trace_id=trace_id,
        model=model,
        prompt_version=prompt_version,
        prompt_tokens_estimate=input_tokens_estimate,
    )

    async for token_event in llm_client.stream(model=model, prompt=prompt, temperature=0.1):
        last_event = token_event
        if first_token_ms is None:
            first_token_ms = round((time.perf_counter() - started) * 1000)
            log_event("first_token_received", trace_id=trace_id, model=model, ttft_ms=first_token_ms)
        chunks.append(token_event.text)

    answer = "".join(chunks)
    provider_usage = getattr(last_event, "usage", None)
    finish_reason = getattr(last_event, "finish_reason", None)
    usage = provider_usage if provider_usage else {
        "input_tokens": input_tokens_estimate,
        "output_tokens": estimate_tokens(answer),
    }

    return {
        "answer": answer,
        "citations": extract_citations(answer),
        "usage": usage,
        "ttft_ms": first_token_ms or round((time.perf_counter() - started) * 1000),
        "finish_reason": finish_reason or "unknown",
    }

Nếu provider không trả token usage, có thể estimate bằng tokenizer tương thích. Nhưng trong billing production, estimate chỉ dùng cho dashboard gần đúng. Billing thật vẫn cần đối soát với provider invoice.

9. Feedback Loop

Feedback phải gắn với trace_id. Nếu chỉ lưu thumbs_down mà không lưu trace, feedback gần như vô dụng cho debug.

from datetime import datetime, timezone
from typing import Literal

from pydantic import BaseModel, Field


class FeedbackRequest(BaseModel):
    trace_id: str = Field(pattern=r"^tr_[a-f0-9]+$")
    rating: Literal["up", "down"]
    reason: Literal[
        "helpful",
        "wrong_answer",
        "wrong_source",
        "missing_context",
        "too_slow",
        "unsafe",
        "other",
    ]
    comment: str | None = Field(default=None, max_length=2000)


@app.post("/feedback")
async def feedback(
    request: FeedbackRequest,
    x_tenant_id: Annotated[str, Header(alias="X-Tenant-Id")] = "demo",
    x_user_id: Annotated[str | None, Header(alias="X-User-Id")] = None,
) -> dict[str, str]:
    feedback_record = {
        "trace_id": request.trace_id,
        "tenant_id": x_tenant_id,
        "user_id_hash": hash_value(x_user_id),
        "rating": request.rating,
        "reason": request.reason,
        "comment_redacted": safe_content(request.comment),
        "comment_hash": hash_value(request.comment),
        "created_at": datetime.now(timezone.utc).isoformat(),
        "triage_status": "new",
    }
    await feedback_store.insert(feedback_record)
    log_event("feedback_received", **feedback_record)
    return {"status": "accepted", "trace_id": request.trace_id}

Feedback loop thực tế:

  1. User gửi feedback.
  2. Feedback store join với trace store qua trace_id.
  3. Triage phân loại lỗi: retrieval, rerank, prompt, generation, citation, policy, UX.
  4. Lỗi chất lượng được đưa vào golden set hoặc regression set.
  5. Release sau phải chứng minh metric cải thiện hoặc không regression.

10. Tooling Comparison

ToolMạnh ở đâuYếu ở đâuKhi nên dùng
LangfuseLLM trace, prompt versioning, cost, score, feedback, dataset/eval workflowCần xem xét data policy nếu dùng SaaS; self-host cần vận hành thêmCapstone, LLM app độc lập, team muốn observability chuyên cho prompt/model
LangSmithTrace/eval tốt trong ecosystem LangChain/LangGraphLock-in theo LangChain/LangGraph nhiều hơn; không thay thế metrics infraApp dùng LangChain/LangGraph và cần debug chain/agent
OpenTelemetryVendor-neutral traces, span context, integration microserviceKhông tự hiểu prompt/chunk/feedback nếu bạn không thiết kế attributesEnterprise, nhiều service, cần gửi trace sang nhiều backend
Prometheus/GrafanaMetrics, alert, SLO dashboard, chi phí thấp, phổ biếnKhông lưu trace chi tiết; label cardinality phải kiểm soátProduction metrics mặc định cho API/RAG service
ELK/OpenSearchSearch structured logs, incident investigation, retention policyTốn storage nếu log raw prompt/context; cần index lifecycleKhi cần search log chi tiết, audit và debug theo event
Custom JSON trace storeKiểm soát schema, rẻ cho capstone nhỏPhải tự làm dashboard/report/retentionLearning project, MVP nội bộ, policy không cho gửi data ra SaaS

Không cần dùng tất cả từ ngày đầu. Vấn đề cần giải quyết trước là schema và policy. Tool chỉ là nơi lưu và hiển thị.

11. Trade-off Quan Trọng

Quyết địnhLợi íchChi phí/rủi roKhuyến nghị
Log raw prompt/context/outputDebug quality rất tốtRủi ro PII/confidential data, storage cost caoTắt mặc định, chỉ bật theo tenant/debug window, có redaction và approval
Metadata-only traceAn toàn hơn, rẻ hơnDebug hallucination khó hơnDefault production cho dữ liệu nhạy cảm
Full tracing 100% requestKhông bỏ sót lỗi hiếmStorage và export overhead100% metadata trace, sample raw content theo policy
SamplingGiảm costCó thể bỏ sót incidentAlways keep errors, timeouts, thumbs down; sample success
Prometheus labels chi tiếtQuery dashboard có vẻ tiệnCardinality explosionLabel chỉ dùng field ít giá trị; chi tiết đưa vào logs/traces
SaaS observabilityNhanh, UI tốtData residency, cost, vendor dependencyDùng nếu data policy cho phép và có DPA/security review
Self-hostKiểm soát dữ liệuTốn vận hànhDùng khi compliance yêu cầu hoặc scale đủ lớn
Sync loggingCode đơn giảnTăng latency tailVới traffic cao, dùng queue/batch exporter

12. Best Solution Theo Context Và Performance

Capstone hoặc MVP nội bộ

Best solution:

  • Structured JSON logs.
  • Một trace table hoặc JSONL trace store.
  • Prometheus metrics cho latency, errors, token, cost.
  • Grafana dashboard đơn giản.
  • Feedback endpoint gắn với trace_id.
  • Raw content logging tắt mặc định, bật tạm khi debug.

Lý do: đủ chứng minh production thinking, ít dependency, dễ chạy local và ít rủi ro data policy.

App dùng LangChain hoặc LangGraph nhiều

Best solution:

  • LangSmith để trace chain/agent/eval.
  • Prometheus/Grafana cho service metrics và alert.
  • Structured logs hoặc ELK/OpenSearch cho audit event.
  • Redaction callback trước khi gửi prompt/tool args ra ngoài.

Lý do: LangSmith nhìn tốt graph/node/tool call, nhưng metrics production vẫn nên ở Prometheus/Grafana.

Enterprise nhiều microservice

Best solution:

  • OpenTelemetry làm chuẩn trace context xuyên API gateway, RAG service, retriever, reranker, LLM gateway.
  • Prometheus/Grafana cho metrics và alert.
  • ELK/OpenSearch cho logs.
  • LLM-specific trace store như Langfuse nếu security review cho phép, hoặc custom trace store nếu không.

Lý do: vendor-neutral, tích hợp được với platform observability hiện có, không ép toàn bộ tổ chức vào một LLM SaaS.

Dữ liệu nhạy cảm hoặc regulated domain

Best solution:

  • Metadata-only trace mặc định.
  • Hash user/session/query.
  • Redaction trước log/export.
  • Sampling raw content rất thấp, theo allowlist tenant và thời gian hữu hạn.
  • Access control theo vai trò, audit người xem trace.
  • Retention ngắn cho raw content, dài hơn cho aggregate metrics.

Lý do: debug không được đánh đổi bằng việc rò rỉ dữ liệu khách hàng.

Performance-sensitive app

Best solution:

  • Không ghi log đồng bộ nhiều payload lớn trong request path.
  • Batch exporter cho traces.
  • Async/background logging cho trace detail.
  • Metrics dùng label cardinality thấp.
  • Sample success traces, giữ toàn bộ error traces.
  • Đo overhead của instrumentation trong load test.

Target hợp lý: observability overhead p95 nhỏ hơn 5% total latency, trừ khi đang bật debug mode tạm thời.

13. Privacy, Redaction Và Sampling

Redaction

Các nhóm dữ liệu cần xử lý trước khi log:

  • Email, phone, ID number, address.
  • API key, bearer token, cookie, connection string.
  • Customer name, employee id, salary, contract clause nếu domain nhạy cảm.
  • Retrieved context từ document nội bộ.
  • Tool arguments chứa dữ liệu hệ thống.

Redaction nên chạy trước khi dữ liệu rời process, không chỉ trước khi hiển thị trên dashboard.

Sampling

Sampling policy gợi ý:

Loại requestMetadataRaw prompt/context/output
Success bình thường100%1-5% nếu policy cho phép
Timeout/error100%100% redacted hoặc theo allowlist
Thumbs down100%100% redacted hoặc theo allowlist
Tenant nhạy cảm100%0% mặc định
Eval/golden set100%100% vì dùng synthetic hoặc approved data

Sampling không nên áp dụng mù. Error, timeout và negative feedback phải được giữ nhiều hơn success traffic.

Retention

Gợi ý retention:

  • Raw prompt/context/output: 7-30 ngày tùy policy.
  • Redacted trace: 30-90 ngày.
  • Metrics aggregate: 6-18 tháng.
  • Audit log truy cập trace: theo yêu cầu compliance.
  • Eval traces dùng cho regression: versioned lâu hơn, nhưng nên dùng dữ liệu đã được phê duyệt.

14. Production Readiness

Dùng Được Trong Production Không?

Có, observability stack kiểu này dùng được trong production nếu thỏa các điều kiện sau:

  • trace_id xuyên suốt request path và feedback path.
  • Có structured logs, metrics và traces cho từng stage quan trọng.
  • Có dashboard cho p50/p95/p99 latency, TTFT, token usage, cost/request, error rate, empty retrieval và citation failure.
  • Có alert cho latency spike, error spike, cost spike, provider timeout, empty retrieval spike và citation failure spike.
  • Có redaction trước khi ghi log/export trace.
  • Có sampling policy rõ, đặc biệt với raw prompt/context/output.
  • Có retention, access control và audit cho trace store.
  • Có versioning cho prompt, model, embedding model, reranker và index.
  • Có golden set/eval để biến feedback thành regression test.
  • Có performance test chứng minh instrumentation không làm tăng latency quá mức.

Chưa Đủ Production Nếu

  • Chỉ print() prompt và answer ra console.
  • Log raw PII/context vào third-party tool mà chưa có security review.
  • Metrics không có token/cost/TTFT.
  • Trace không lưu prompt/model/index version.
  • Feedback không gắn với trace_id.
  • Prometheus label chứa user_id, query, trace_id hoặc chunk_id.
  • Không có alert và không có owner xử lý alert.
  • Không có retention policy nên log storage tăng vô hạn.

15. Checklist Cuối Bài

  • Mỗi request có trace_id.
  • Mỗi stage có latency riêng.
  • Logs là JSON và có event taxonomy cố định.
  • Metrics có request count, latency, error, TTFT, token, cost, empty retrieval và citation failure.
  • Trace schema lưu prompt/model/index/embedding/reranker version.
  • Feedback endpoint gắn với trace_id.
  • Raw content logging có redaction, sampling và retention.
  • Dashboard có p95 latency, p95 TTFT, cost/request, token/request và quality signals.
  • Alert đầu tiên tập trung vào latency, error, cost, empty retrieval và citation failure.
  • Bạn trả lời được production readiness bằng điều kiện cụ thể, không chỉ nói "có logging".

Tài liệu

Tài liệu này là phần reference cho Day 44. Dùng nó khi cần thiết kế schema, dashboard, alert, privacy policy hoặc review production readiness cho một LLM/RAG app.

1. Architecture Tham Khảo

Client
  -> API Gateway
  -> RAG API
       -> Trace Context
       -> Query Rewrite
       -> Hybrid Retrieval
       -> Reranker
       -> Context Builder
       -> LLM Gateway
       -> Citation Validator
       -> Feedback API
  -> Observability Outputs
       -> Metrics: Prometheus -> Grafana
       -> Logs: JSON -> ELK/OpenSearch
       -> Traces: OpenTelemetry -> Tempo/Jaeger/Datadog/etc.
       -> LLM Trace Store: Langfuse/LangSmith/custom table

Nguyên tắc: app không phụ thuộc vào một tool duy nhất. App phát ra telemetry theo contract ổn định, còn backend lưu trữ có thể thay đổi.

2. Trace Schema Contract

2.1 Required Fields

FieldTypeBắt buộcGhi chú
trace_idstringSinh ở đầu request, trả về client
tenant_idstringKhông dùng làm Prometheus label nếu quá nhiều tenant
user_id_hashstring/nullHash có salt, không log raw user id
routestringVí dụ /query
environmentstringdev, staging, prod
query.raw_hashstringDùng join/debug không cần raw text
query.raw_redactedstring/nullTùy policyChỉ có khi raw logging được phép
retrieval.index_versionstringBắt buộc để rollback/debug
retrieval.candidatesarrayCó thể chỉ lưu top N
rerank.reranker_modelstring/nullNull nếu không rerank
context.chunk_idsarrayDanh sách chunk đưa vào prompt
context.context_tokensnumberToken budget control
generation.modelstringModel thực tế đã gọi
generation.prompt_versionstringPrompt template version
generation.input_tokensnumberTừ provider hoặc tokenizer estimate
generation.output_tokensnumberTừ provider hoặc tokenizer estimate
generation.ttft_msnumber/nullNull nếu không streaming và không đo được
generation.latency_msnumberModel call latency
generation.estimated_cost_usdnumberCost estimate theo pricing table version
validation.citation_validbooleanQuality signal
result.statusstringsuccess, error, timeout, blocked
result.total_latency_msnumberEnd-to-end latency

2.2 Candidate Chunk Fields

FieldTypeGhi chú
ranknumberRank trước hoặc sau rerank, ghi rõ context
chunk_idstringStable id, không chứa raw content
document_idstringDocument id hoặc hash
source_uri_hashstringHash nếu source URI nhạy cảm
score_densenumber/nullDense similarity
score_sparsenumber/nullBM25/sparse score
score_rrfnumber/nullHybrid merge score
rerank_scorenumber/nullCross-encoder score
acl_matchedbooleanPermission filter
metadataobjectChỉ metadata an toàn

Không nên lưu toàn bộ chunk text trong candidate list mặc định. Nếu cần debug, lưu bản redacted hoặc lưu reference để người có quyền mở document gốc.

3. Event Catalog

EventLevelPayload tối thiểu
query_receivedINFOtrace_id, tenant_id, user_id_hash, query_hash, query_length_chars
query_rewrittenINFOtrace_id, rewrite_enabled, rewritten_query_hash, latency_ms
retrieval_startedDEBUG/INFOtrace_id, strategy, top_k, index_version
retrieval_completedINFOtrace_id, candidate_count, empty_retrieval, latency_ms
rerank_completedINFOtrace_id, candidate_count, selected_count, latency_ms, top_score
context_builtINFOtrace_id, chunk_count, context_tokens, truncated
generation_startedINFOtrace_id, provider, model, prompt_version
first_token_receivedINFOtrace_id, model, ttft_ms
generation_completedINFOtrace_id, input_tokens, output_tokens, latency_ms, cost
citation_validatedINFO/WARNtrace_id, valid, failure_reason
guardrail_blockedWARNtrace_id, policy, action, stage
feedback_receivedINFOtrace_id, rating, reason, user_id_hash
request_failedERRORtrace_id, stage, error_type, retryable

Quy ước payload:

  • Dùng snake_case.
  • Timestamp UTC.
  • trace_id luôn có.
  • Error có error_type, stage, retryable.
  • Không log bearer token, API key, cookie, connection string.

4. Metrics Naming

4.1 Service Metrics

MetricTypeLabelsÝ nghĩa
rag_request_totalCounterroute, statusTổng request
rag_requests_in_flightGaugerouteRequest đang xử lý
rag_stage_latency_secondsHistogramstageLatency theo stage
rag_request_latency_secondsHistogramrouteEnd-to-end latency
rag_error_totalCounterstage, error_typeLỗi theo stage

4.2 LLM Metrics

MetricTypeLabelsÝ nghĩa
llm_ttft_secondsHistogrammodelTime to first token
llm_request_totalCountermodel, statusModel calls
llm_token_totalCountermodel, typeInput/output tokens
llm_cost_usd_totalCountermodelEstimated cost
llm_retry_totalCountermodel, reasonRetry count
llm_rate_limit_totalCounterprovider, modelProvider rate limit

4.3 RAG Quality Metrics

MetricTypeLabelsÝ nghĩa
rag_empty_retrieval_totalCounterindex_versionRetrieval trả 0 chunk
rag_context_truncated_totalCounterprompt_versionContext bị cắt
rag_citation_invalid_totalCounterreasonCitation sai
rag_feedback_totalCounterrating, reasonUser feedback
rag_no_answer_totalCounterreasonApp từ chối hoặc không đủ context

4.4 Label Cardinality Rules

Không dùng các field sau làm metric label:

  • trace_id.
  • user_id hoặc user_id_hash.
  • Raw query.
  • chunk_id.
  • document_id nếu số lượng lớn.
  • session_id.
  • Error message tự do.

Các field này nên nằm trong logs/traces, không nằm trong Prometheus labels.

5. Dashboard Panels

Dashboard tối thiểu cho Day 44:

PanelQuery/nguồnMục tiêu
Request raterate(rag_request_total[5m])Traffic
Error ratesum(rate(rag_request_total{status!="success"}[5m])) / sum(rate(rag_request_total[5m]))Reliability
p95 total latencyhistogram quantile trên rag_request_latency_secondsSLA
p95 stage latencyhistogram quantile theo rag_stage_latency_secondsBottleneck
p95 TTFThistogram quantile trên llm_ttft_secondsUX streaming
Input/output tokens per minuterate(llm_token_total[5m])Token budget
Cost per hourincrease(llm_cost_usd_total[1h])Budget
Empty retrieval raterate(rag_empty_retrieval_total[5m]) / rate(rag_request_total[5m])Retrieval health
Citation failure raterate(rag_citation_invalid_total[5m]) / rate(rag_request_total[5m])Answer trust
Feedback down raterate(rag_feedback_total{rating="down"}[1h]) / rate(rag_feedback_total[1h])Quality trend

Dashboard tốt phải có filter theo environment, route, model, prompt version và index version. Với tenant nhiều, chỉ filter tenant trên trace/log backend hoặc metric backend đã được thiết kế để chịu cardinality đó.

6. Alert Rules Gợi Ý

Ví dụ PromQL ở mức tham khảo:

groups:
  - name: rag-observability
    rules:
      - alert: RAGHighErrorRate
        expr: |
          sum(rate(rag_request_total{status!="success"}[5m]))
          /
          sum(rate(rag_request_total[5m])) > 0.03
        for: 10m
        labels:
          severity: page
        annotations:
          summary: "RAG error rate > 3%"

      - alert: RAGHighP95Latency
        expr: |
          histogram_quantile(
            0.95,
            sum(rate(rag_request_latency_seconds_bucket[5m])) by (le)
          ) > 8
        for: 10m
        labels:
          severity: ticket
        annotations:
          summary: "RAG p95 latency > 8s"

      - alert: LLMHighTTFT
        expr: |
          histogram_quantile(
            0.95,
            sum(rate(llm_ttft_seconds_bucket[5m])) by (le, model)
          ) > 3
        for: 10m
        labels:
          severity: ticket
        annotations:
          summary: "LLM p95 TTFT > 3s"

      - alert: RAGCostSpike
        expr: |
          sum(increase(llm_cost_usd_total[1h]))
          >
          2 * (sum(increase(llm_cost_usd_total[24h])) / 24)
        for: 15m
        labels:
          severity: ticket
        annotations:
          summary: "LLM hourly cost is above normal baseline"

      - alert: RAGCitationFailureSpike
        expr: |
          sum(rate(rag_citation_invalid_total[15m]))
          /
          sum(rate(rag_request_total[15m])) > 0.05
        for: 15m
        labels:
          severity: ticket
        annotations:
          summary: "Citation failure rate > 5%"

Điều chỉnh threshold theo baseline thật. Alert không có owner và runbook thì chỉ tạo noise.

7. Storage Schema Tham Khảo

7.1 Trace Table

CREATE TABLE rag_traces (
    trace_id TEXT PRIMARY KEY,
    tenant_id TEXT NOT NULL,
    user_id_hash TEXT,
    session_id_hash TEXT,
    route TEXT NOT NULL,
    environment TEXT NOT NULL,
    query_hash TEXT NOT NULL,
    query_redacted TEXT,
    prompt_version TEXT NOT NULL,
    model TEXT NOT NULL,
    embedding_model TEXT,
    reranker_model TEXT,
    index_version TEXT NOT NULL,
    input_tokens INTEGER NOT NULL DEFAULT 0,
    output_tokens INTEGER NOT NULL DEFAULT 0,
    estimated_cost_usd NUMERIC(12, 6) NOT NULL DEFAULT 0,
    ttft_ms INTEGER,
    total_latency_ms INTEGER NOT NULL,
    status TEXT NOT NULL,
    error_type TEXT,
    citation_valid BOOLEAN,
    raw_trace JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_rag_traces_created_at ON rag_traces (created_at DESC);
CREATE INDEX idx_rag_traces_tenant_created ON rag_traces (tenant_id, created_at DESC);
CREATE INDEX idx_rag_traces_versions ON rag_traces (prompt_version, model, index_version);
CREATE INDEX idx_rag_traces_status ON rag_traces (status, error_type);

7.2 Feedback Table

CREATE TABLE rag_feedback (
    id BIGSERIAL PRIMARY KEY,
    trace_id TEXT NOT NULL REFERENCES rag_traces(trace_id),
    tenant_id TEXT NOT NULL,
    user_id_hash TEXT,
    rating TEXT NOT NULL CHECK (rating IN ('up', 'down')),
    reason TEXT NOT NULL,
    comment_hash TEXT,
    comment_redacted TEXT,
    triage_status TEXT NOT NULL DEFAULT 'new',
    triage_owner TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_rag_feedback_trace ON rag_feedback (trace_id);
CREATE INDEX idx_rag_feedback_rating_created ON rag_feedback (rating, created_at DESC);

Nếu dùng data warehouse, có thể flatten một số field thường query và giữ raw_trace làm JSON.

8. Tooling Decision Matrix

ContextTooling khuyến nghịLý do
Capstone cá nhânJSON logs + SQLite/Postgres trace table + report scriptDễ demo, ít vận hành
MVP nội bộPrometheus/Grafana + OpenTelemetry + structured logsĐủ SLO và debug
RAG app cần prompt/feedback UI nhanhLangfuse + Prometheus/GrafanaLLM-specific trace và cost workflow
LangChain/LangGraph appLangSmith + Prometheus/GrafanaTrace chain/agent tốt
Enterprise platformOpenTelemetry + Prometheus/Grafana + ELK/OpenSearch + optional LLM trace storeHợp chuẩn platform
Regulated dataSelf-host hoặc custom trace store, metadata-only defaultKiểm soát data residency

Best default cho đa số team: OpenTelemetry cho traces, Prometheus/Grafana cho metrics, structured JSON logs cho ELK/OpenSearch, và LLM-specific trace store chỉ dùng khi đã qua security review.

9. Privacy Policy Template

9.1 Data Classification

DataClassificationDefault action
Trace id, latency, statusOperational metadataLog 100%
Model, prompt version, index versionOperational metadataLog 100%
Token usage, costBilling metadataLog 100%
User idPersonal dataHash with salt
Query textPotential PIIHash, redact, sample raw only when allowed
Retrieved contextPotential confidential dataStore chunk ids, not raw text
Answer textPotential PII/confidentialRedact and sample
Feedback commentPotential PIIHash, redact, length limit
API keys/tokensSecretNever log

9.2 Redaction Requirements

  • Redact trước khi ghi log, export trace hoặc gửi sang SaaS.
  • Redaction phải chạy trên prompt, context, output, tool args và feedback comment.
  • Secret scanning rule phải bắt bearer token, API key, cookie, password, connection string.
  • Không dựa vào UI masking làm lớp bảo vệ duy nhất.
  • Có test cho redaction với email, phone, ID number và secret format của công ty.

9.3 Sampling Requirements

  • Metadata trace: 100%.
  • Error/timeout/blocked: 100%.
  • Negative feedback: 100%.
  • Success raw content: 0-5% tùy data policy.
  • Regulated tenant: raw content 0% mặc định.
  • Eval/golden set: 100% nếu dữ liệu đã được approved.

9.4 Access Control

  • Developer xem được metadata và redacted trace.
  • Support chỉ xem trace của tenant được phân quyền.
  • Security/admin xem audit log truy cập trace.
  • Raw content nếu có phải cần quyền riêng và có expiry.
  • Mọi truy cập trace production nên có audit event.

10. Runbook Incident

10.1 Latency Spike

  1. Kiểm tra p95 total latency và stage latency.
  2. Nếu generation tăng: kiểm tra provider status, model, token/request, retry/rate limit.
  3. Nếu retrieval tăng: kiểm tra vector DB latency, DB connection pool, index size, filter.
  4. Nếu rerank tăng: kiểm tra candidate count, reranker model, CPU/GPU queue.
  5. Nếu context_build tăng: kiểm tra chunk count, tokenizer, document metadata.
  6. Rollback prompt/model/index nếu spike gắn với version mới.

10.2 Cost Spike

  1. So sánh token/request theo prompt version và route.
  2. Kiểm tra context truncation và selected chunk count.
  3. Kiểm tra model router có chọn model đắt bất thường không.
  4. Kiểm tra retry storm hoặc timeout retry.
  5. Kiểm tra traffic theo tenant/feature.
  6. Tạm bật budget guardrail hoặc model fallback nếu cần.

10.3 Citation Failure Spike

  1. Lọc trace có citation_valid=false.
  2. Kiểm tra selected chunk ids có chứa source được cite không.
  3. Kiểm tra prompt version có thay đổi citation format không.
  4. Kiểm tra parser citation có regression không.
  5. Kiểm tra index version và metadata source_id/chunk_id.
  6. Thêm case vào golden set trước khi fix.

10.4 Negative Feedback Spike

  1. Join feedback với trace theo trace_id.
  2. Phân loại reason: wrong answer, wrong source, missing context, too slow, unsafe.
  3. Với wrong source: xem retrieval/rerank/citation.
  4. Với missing context: xem ingestion/index/ACL/chunking.
  5. Với too slow: xem TTFT và stage latency.
  6. Tạo regression set từ các trace đã triage.

11. Release Checklist

  • Có dashboard staging và production.
  • Alert có owner và runbook.
  • Prompt/model/index version xuất hiện trong trace.
  • Redaction tests pass.
  • Không có Prometheus high-cardinality labels.
  • Feedback endpoint hoạt động và join được với trace.
  • Golden set chạy trước release.
  • Release note có thay đổi prompt/model/index.
  • Có rollback plan cho prompt/model/index.
  • Cost budget và rate limit đã cấu hình.

12. Nguồn Chính Thức Nên Đọc


Bài tập

Mục tiêu bài tập: biến một RAG API đang chạy được thành một service có observability đủ để debug latency, quality, token usage, cost/request và feedback.

Bạn có thể dùng mini-project Day 40 hoặc RAG app riêng. Không cần đổi model/provider nếu app hiện tại đã chạy được.

1. Yêu Cầu Đầu Ra

Sau lab, repository của bạn cần có:

  • trace_id được sinh ở đầu request và trả về trong response.
  • Structured JSON logs cho từng stage.
  • Metrics endpoint /metrics theo Prometheus format.
  • Trace record có retrieval, rerank, context, generation, citation validation và feedback.
  • Token usage, cost/request và TTFT nếu app dùng streaming.
  • Feedback endpoint POST /feedback.
  • Report cho ít nhất 30 câu hỏi golden set.
  • Production readiness answer: dùng được trong production không, nếu có thì cần điều kiện gì.

2. Bước 1: Thêm Trace ID

Trong endpoint /query, tạo trace_id ngay khi nhận request:

import uuid


def new_trace_id() -> str:
    return f"tr_{uuid.uuid4().hex}"

Response nên có:

{
  "trace_id": "tr_abc123",
  "answer": "...",
  "citations": [],
  "usage": {
    "input": 1200,
    "output": 180
  },
  "estimated_cost_usd": "0.000768"
}

Checklist:

  • trace_id xuất hiện trong mọi log event.
  • trace_id trả về client.
  • Feedback dùng lại trace_id.
  • Error response cũng trả trace_id.

3. Bước 2: Structured JSON Logs

Thêm helper:

import json
import logging
import time
from typing import Any

logger = logging.getLogger("rag")


def log_event(event: str, **fields: Any) -> None:
    logger.info(
        json.dumps(
            {
                "event": event,
                "timestamp_ms": int(time.time() * 1000),
                **fields,
            },
            ensure_ascii=False,
            default=str,
        )
    )

Log tối thiểu:

log_event("query_received", trace_id=trace_id, query_hash=query_hash, top_k=top_k)
log_event("retrieval_completed", trace_id=trace_id, candidate_count=len(chunks), latency_ms=...)
log_event("rerank_completed", trace_id=trace_id, selected_count=len(selected), latency_ms=...)
log_event("context_built", trace_id=trace_id, context_tokens=context_tokens, truncated=truncated)
log_event("generation_completed", trace_id=trace_id, input_tokens=..., output_tokens=..., cost=...)
log_event("citation_validated", trace_id=trace_id, valid=True, failure_reason=None)

Không log raw query/context nếu chưa có redaction.

4. Bước 3: Đo Latency Theo Stage

Thêm context manager:

from contextlib import contextmanager
import time


@contextmanager
def timed(stage: str, latency_ms: dict[str, int]):
    start = time.perf_counter()
    try:
        yield
    finally:
        latency_ms[stage] = round((time.perf_counter() - start) * 1000)

Dùng trong pipeline:

latency_ms = {}
request_started = time.perf_counter()

with timed("retrieval", latency_ms):
    chunks = retrieve(query)

with timed("rerank", latency_ms):
    selected = rerank(query, chunks)

with timed("context_build", latency_ms):
    context = build_context(selected)

with timed("generation", latency_ms):
    answer = generate(context, query)

Report cuối request:

log_event(
    "query_completed",
    trace_id=trace_id,
    total_latency_ms=round((time.perf_counter() - request_started) * 1000),
    stage_latency_ms=latency_ms,
)

Nếu có code async, context manager vẫn dùng được nếu block bên trong await không yêu cầu asynccontextmanager.

5. Bước 4: Prometheus Metrics

Cài dependency:

pip install prometheus-client

Thêm metrics:

from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response

REQUESTS = Counter("rag_request_total", "Total RAG requests", ["route", "status"])
STAGE_LATENCY = Histogram("rag_stage_latency_seconds", "Latency by stage", ["stage"])
TOKENS = Counter("llm_token_total", "LLM token usage", ["model", "type"])
COST = Counter("llm_cost_usd_total", "LLM cost in USD", ["model"])
IN_FLIGHT = Gauge("rag_requests_in_flight", "Requests in flight", ["route"])


@app.get("/metrics")
def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

Ghi metrics:

REQUESTS.labels(route="/query", status="success").inc()
STAGE_LATENCY.labels(stage="retrieval").observe(latency_ms["retrieval"] / 1000)
TOKENS.labels(model=model, type="input").inc(input_tokens)
TOKENS.labels(model=model, type="output").inc(output_tokens)
COST.labels(model=model).inc(float(cost_usd))

Kiểm tra:

curl http://localhost:8000/metrics | grep rag_

6. Bước 5: Token Usage Và Cost/Request

Tạo pricing table versioned:

from decimal import Decimal

MODEL_PRICE_USD_PER_1M = {
    "gpt-4.1-mini": {"input": Decimal("0.40"), "output": Decimal("1.60")},
}


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> Decimal:
    price = MODEL_PRICE_USD_PER_1M[model]
    return (
        Decimal(input_tokens) * price["input"] / Decimal(1_000_000)
        + Decimal(output_tokens) * price["output"] / Decimal(1_000_000)
    ).quantize(Decimal("0.000001"))

Yêu cầu:

  • Lưu input_tokens.
  • Lưu output_tokens.
  • Lưu estimated_cost_usd.
  • Lưu pricing_table_version nếu pricing có thể đổi.
  • Nếu provider không trả usage, ghi rõ usage_source="estimated".

7. Bước 6: Đo TTFT

Nếu endpoint stream token, đo time to first token:

import time


async def stream_answer(prompt: str, model: str, trace_id: str):
    started = time.perf_counter()
    first_token_seen = False

    async for token in llm_client.stream(prompt=prompt, model=model):
        if not first_token_seen:
            ttft_ms = round((time.perf_counter() - started) * 1000)
            log_event("first_token_received", trace_id=trace_id, model=model, ttft_ms=ttft_ms)
            first_token_seen = True
        yield token.text

Nếu app không streaming, ghi ttft_ms=null và đo generation.latency_ms. Không bịa TTFT từ total latency.

8. Bước 7: Trace Record

Tạo một object trace và lưu cuối request:

trace_record = {
    "trace_id": trace_id,
    "tenant_id": tenant_id,
    "user_id_hash": user_id_hash,
    "query": {
        "raw_hash": query_hash,
        "raw_redacted": query_redacted,
        "length_chars": len(query),
    },
    "retrieval": {
        "strategy": "hybrid",
        "index_version": index_version,
        "top_k": top_k,
        "latency_ms": latency_ms["retrieval"],
        "candidates": candidate_summaries,
    },
    "rerank": {
        "enabled": True,
        "reranker_model": reranker_model,
        "latency_ms": latency_ms["rerank"],
        "selected_count": len(selected),
    },
    "context": {
        "chunk_ids": [chunk["chunk_id"] for chunk in selected],
        "context_tokens": context_tokens,
        "truncated": truncated,
    },
    "generation": {
        "model": model,
        "prompt_version": prompt_version,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "ttft_ms": ttft_ms,
        "latency_ms": latency_ms["generation"],
        "estimated_cost_usd": str(cost_usd),
    },
    "validation": {
        "citation_valid": citation_valid,
        "citation_failure_reason": citation_failure_reason,
    },
    "result": {
        "status": "success",
        "total_latency_ms": total_latency_ms,
    },
}

Lưu vào Postgres, SQLite, JSONL hoặc Langfuse/LangSmith tùy stack. Với capstone, JSONL hoặc SQLite là đủ nếu report đọc được.

9. Bước 8: Feedback Endpoint

Contract:

POST /feedback
{
  "trace_id": "tr_abc123",
  "rating": "down",
  "reason": "wrong_source",
  "comment": "Answer cited policy 2024, but the question asked 2026"
}

Pydantic model:

from typing import Literal
from pydantic import BaseModel, Field


class FeedbackRequest(BaseModel):
    trace_id: str
    rating: Literal["up", "down"]
    reason: Literal[
        "helpful",
        "wrong_answer",
        "wrong_source",
        "missing_context",
        "too_slow",
        "unsafe",
        "other",
    ]
    comment: str | None = Field(default=None, max_length=2000)

Checklist:

  • Validate trace_id tồn tại.
  • Redact/hash comment.
  • Lưu rating, reason, triage_status.
  • Log event feedback_received.
  • Có report feedback theo reason.

10. Bước 9: Privacy, Redaction, Sampling

Implement tối thiểu:

import hashlib
import os
import re

SALT = os.environ["OBSERVABILITY_HASH_SALT"]


def hash_value(value: str) -> str:
    return "sha256:" + hashlib.sha256(f"{SALT}:{value}".encode()).hexdigest()


def redact_text(text: str) -> str:
    text = re.sub(r"[\w.+-]+@[\w-]+\.[\w.-]+", "[EMAIL]", text)
    text = re.sub(r"\b(?:\+?84|0)(?:\d[\s.-]?){8,10}\b", "[PHONE]", text)
    text = re.sub(r"\b\d{9,12}\b", "[ID_NUMBER]", text)
    return text

Sampling policy cần nộp:

Request typeMetadata traceRaw content
Success100%0-5%
Error/timeout100%100% redacted hoặc theo allowlist
Thumbs down100%100% redacted hoặc theo allowlist
Sensitive tenant100%0%

11. Bước 10: Chạy Golden Set

Chuẩn bị golden_questions.jsonl với ít nhất 30 câu:

{"id":"q001","query":"Chính sách nghỉ phép năm 2026 là gì?","expected_source":"policy_2026"}
{"id":"q002","query":"Nhân viên thử việc có được nghỉ phép không?","expected_source":"policy_hr"}

Runner đơn giản:

import json
import requests

with open("golden_questions.jsonl", "r", encoding="utf-8") as f:
    for line in f:
        item = json.loads(line)
        response = requests.post(
            "http://localhost:8000/query",
            json={"query": item["query"], "top_k": 20},
            timeout=30,
        )
        print(json.dumps({"id": item["id"], **response.json()}, ensure_ascii=False))

Chạy:

python run_golden_set.py > traces/golden_run_day44.jsonl

Nếu bạn không tạo file runner riêng trong repo, có thể chạy bằng notebook hoặc script tạm, nhưng report phải có số liệu.

12. Bước 11: Report Bắt Buộc

Tạo bảng:

MetricGiá trị
Total queries30
Success rate
p50 total latency
p95 total latency
p95 retrieval latency
p95 rerank latency
p95 generation latency
p95 TTFT
Average input tokens
Average output tokens
Average cost/request
Empty retrieval rate
Citation failure rate
Thumbs down rate

Top slowest:

RankTrace IDQuery IDTotal latencyBottleneck stage
1
2
3
4
5

Top highest-cost:

RankTrace IDQuery IDInput tokensOutput tokensCost
1
2
3
4
5

Error classification:

Error classCountVí dụ traceFix đề xuất
Retrieval
Rerank
Context builder
Generation
Citation
Timeout
Guardrail

13. Bước 12: Viết Production Readiness Answer

Trả lời theo format sau:

Dùng được trong production không?

Có, nhưng chỉ ở mức internal beta nếu thỏa:
- Observability: mọi request có trace_id, stage latency, token usage, cost/request và error type.
- Privacy: raw query/context/output được redact trước khi log; raw trace chỉ lưu theo sampling policy.
- Cost control: dashboard có cost/request, cost/day, token/request và alert cost spike.
- Alert/runbook: p95 latency, error rate, timeout rate, citation failure và empty retrieval đều có owner.
- Eval/feedback loop: feedback gắn trace_id và golden set chạy trước khi đổi prompt/model/index.
- Performance overhead: instrumentation overhead dưới 5% p95 latency so với baseline.

Chưa được public production nếu còn thiếu:
- Chưa có access control cho trace store, chưa có retention policy, chưa có redaction test tự động,
  hoặc chưa có load test chứng minh overhead của observability.

Ví dụ câu trả lời tốt:

Có thể dùng cho internal beta. Hệ thống đã có trace_id, JSON logs, Prometheus metrics,
token/cost accounting, feedback endpoint và dashboard p95 latency/cost/citation failure.
Để lên public production cần thêm redaction test tự động, retention policy, access control
cho trace store, alert có owner, load test chứng minh overhead dưới 5%, và golden set chạy
trước mỗi lần đổi prompt/model/index.

14. Rubric Chấm Điểm

Hạng mụcĐiểm
Trace schema đủ retrieval/rerank/context/generation/validation20
Logs JSON có event taxonomy và trace_id15
Metrics có latency, error, token, cost, TTFT15
Feedback loop gắn trace và triage reason10
Privacy/redaction/sampling policy15
Report golden set với slowest/highest-cost/error classification15
Production readiness answer rõ điều kiện10

Tổng: 100 điểm.

15. Lỗi Thường Gặp

  • Chỉ log final answer, không log retrieved chunks.
  • Không có prompt/model/index version trong trace.
  • Lưu raw query/context/output mà không redact.
  • Dùng trace_id làm Prometheus label.
  • Chỉ đo total latency, không đo stage latency.
  • Không đo token usage và cost/request.
  • Feedback không join được với trace.
  • Không phân biệt empty retrieval, invalid citation, timeout và provider error.
  • Alert quá nhiều nhưng không có owner hoặc runbook.
  • Báo "production ready" chỉ vì endpoint chạy được.

16. Deliverable Cuối Cùng

Nộp các phần sau:

  1. Link hoặc screenshot /metrics.
  2. 3-5 log events mẫu đã redact.
  3. 1 trace JSON hoàn chỉnh.
  4. Report 30 golden queries.
  5. Top 5 slowest queries.
  6. Top 5 highest-cost queries.
  7. Bảng phân loại lỗi.
  8. Sampling/redaction policy.
  9. 3 alert production đầu tiên.
  10. Production readiness answer.