Published on

Day 42: Model Serving Với FastAPI, SSE Và Production Boundary

Authors

1. Mục tiêu bài học

Sau Day 42, bạn cần build được một serving layer cho model hoặc RAG pipeline có các đặc điểm sau:

  • Có API contract rõ ràng, không chỉ là một hàm predict() trả string.
  • Có request validation, response schema, error schema và trace_id.
  • /health cho process health và /ready cho model/runtime readiness.
  • Có endpoint non-streaming cho automation và endpoint streaming SSE cho chat UI.
  • Có timeout, input limit, output limit, rate limit và concurrency limit.
  • Biết khi nào FastAPI là đủ, khi nào nên đặt FastAPI trước vLLM, TGI, Triton hoặc service chuyên dụng khác.
  • Trả lời được: "Dùng được trong production không? Nếu có thì cần điều kiện gì?"

Trong bài này, "model" có thể là:

  • Classical ML model: classifier, ranker, regressor.
  • Deep learning model: PyTorch/TensorFlow/ONNX.
  • LLM local hoặc managed provider.
  • RAG pipeline từ Day 40: retrieve, rerank, build context, generate, validate citation.

2. Mental model: training artifact khác serving contract

Training tạo artifact:

dataset + code + params -> model artifact + metrics + registry version

Serving biến artifact thành product boundary:

client
  -> API gateway
  -> auth/rate limit
  -> request validation
  -> timeout/concurrency control
  -> model runtime hoặc RAG pipeline
  -> response/stream
  -> structured logs + metrics + trace

Một model có accuracy tốt vẫn có thể fail production nếu serving layer thiếu boundary:

Vấn đềHậu quả
Không validate inputPrompt quá dài, payload sai schema, request gây OOM hoặc chi phí tăng
Không timeoutRequest treo, worker bị giữ, queue tăng và toàn service chậm
Không concurrency limitGPU hết memory do quá nhiều request đồng thời
Không rate limitMột tenant hoặc một client ăn hết quota
Không trace idKhông debug được request lỗi
Không version responseKhông biết câu trả lời đến từ model/pipeline nào
Không streamingChat UI có time-to-first-token kém, user tưởng service chết

3. Serving contract tối thiểu

Endpoint đề xuất cho model hoặc RAG pipeline:

EndpointMục đíchGhi chú production
GET /healthProcess còn sống khôngKhông gọi dependency nặng, dùng cho liveness probe
GET /readyService sẵn sàng nhận traffic khôngKiểm tra model loaded, vector DB/model server reachable
GET /models/currentTrả model/runtime/pipeline versionBắt buộc để debug rollback và A/B test
POST /queryNon-streaming inferenceDùng cho batch job, automation, test, eval
GET /query/stream hoặc POST /query/streamStreaming token/eventDùng cho chat UI, SSE một chiều server-to-client

Request contract nên giới hạn rõ:

{
  "question": "Nhân viên full-time có bao nhiêu ngày nghỉ phép năm?",
  "tenant_id": "demo",
  "top_k": 6,
  "max_output_tokens": 512,
  "include_trace": false
}

Response contract nên trả đủ dữ liệu để user dùng được và engineer debug được:

{
  "answer": "Nhân viên full-time có 12 ngày nghỉ phép năm [S1].",
  "citations": [
    {
      "source_id": "S1",
      "document_id": "doc_hr_2026",
      "chunk_id": "demo:doc_hr_2026:v1:00003",
      "title": "HR Policy 2026",
      "page_start": 3,
      "page_end": 3
    }
  ],
  "trace_id": "tr_20260510_001",
  "latency_ms": {
    "retrieval": 42,
    "rerank": 126,
    "generation": 870,
    "total": 1041
  },
  "model_version": "rag-api-v1",
  "finish_reason": "stop"
}

Error contract không trả stack trace ra client:

{
  "error": {
    "code": "MODEL_TIMEOUT",
    "message": "Model runtime timed out. Please retry.",
    "trace_id": "tr_20260510_001",
    "retryable": true
  }
}

4. FastAPI service gần production

Ví dụ dưới đây dùng fake runtime để bài học chạy được local. Khi đưa vào project thật, thay RagRuntime bằng adapter gọi RAG pipeline, vLLM/TGI OpenAI-compatible endpoint, Triton gRPC/HTTP, BentoML service hoặc managed LLM API.

Điểm production-style trong code:

  • Load runtime một lần trong lifespan, không load model trong từng request.
  • Pydantic schema dùng extra="forbid" để reject field lạ.
  • Mọi response có trace_id, model_version, finish_reason.
  • Có timeout cho non-streaming và streaming.
  • Có rate limit và concurrency limit ở gateway.
  • SSE event có token, done, error.
  • Streaming generator kiểm tra client disconnect.
  • Log có latency, tenant, input length và error code.
from __future__ import annotations

import asyncio
import json
import logging
import time
import uuid
from collections import defaultdict, deque
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, Literal

from fastapi import FastAPI, HTTPException, Query, Request, status
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, ConfigDict, Field


logger = logging.getLogger("model-serving")
logging.basicConfig(level=logging.INFO)


@dataclass(frozen=True)
class Settings:
    service_name: str = "day-42-model-serving"
    model_version: str = "rag-api-v1"
    request_timeout_s: float = 20.0
    stream_timeout_s: float = 60.0
    queue_timeout_s: float = 0.2
    max_concurrent_requests: int = 8
    rate_limit_window_s: int = 60
    rate_limit_requests: int = 60


settings = Settings()


class Citation(BaseModel):
    source_id: str
    document_id: str
    chunk_id: str
    title: str
    page_start: int | None = None
    page_end: int | None = None


class QueryRequest(BaseModel):
    model_config = ConfigDict(extra="forbid")

    question: str = Field(..., min_length=1, max_length=2000)
    tenant_id: str = Field("demo", min_length=1, max_length=64)
    top_k: int = Field(6, ge=1, le=20)
    max_output_tokens: int = Field(512, ge=16, le=2048)
    include_trace: bool = False


class QueryResponse(BaseModel):
    answer: str
    citations: list[Citation]
    trace_id: str
    latency_ms: dict[str, int]
    model_version: str
    finish_reason: Literal["stop", "timeout", "error"]


class ErrorBody(BaseModel):
    code: str
    message: str
    trace_id: str
    retryable: bool


class ErrorResponse(BaseModel):
    error: ErrorBody


class InMemoryRateLimiter:
    """Process-local limiter for local lab. Use Redis/API Gateway for production."""

    def __init__(self, max_requests: int, window_s: int) -> None:
        self.max_requests = max_requests
        self.window_s = window_s
        self._hits: dict[str, deque[float]] = defaultdict(deque)
        self._lock = asyncio.Lock()

    async def allow(self, key: str) -> bool:
        now = time.monotonic()
        async with self._lock:
            bucket = self._hits[key]
            while bucket and now - bucket[0] > self.window_s:
                bucket.popleft()
            if len(bucket) >= self.max_requests:
                return False
            bucket.append(now)
            return True


class RagRuntime:
    def __init__(self, model_version: str) -> None:
        self.model_version = model_version
        self.loaded = False

    async def startup(self) -> None:
        # Replace this with model load, warmup request, DB connection, or client init.
        await asyncio.sleep(0.05)
        self.loaded = True

    async def shutdown(self) -> None:
        self.loaded = False

    async def query(self, payload: QueryRequest, trace_id: str) -> QueryResponse:
        started = time.perf_counter()
        await asyncio.sleep(0.08)
        answer = (
            "Đây là câu trả lời mẫu từ runtime. "
            "Trong project thật, thay phần này bằng RAG hoặc model server [S1]."
        )
        generation_ms = int((time.perf_counter() - started) * 1000)
        return QueryResponse(
            answer=answer,
            citations=[
                Citation(
                    source_id="S1",
                    document_id="demo-doc",
                    chunk_id="demo-doc:v1:00001",
                    title="Demo Document",
                )
            ],
            trace_id=trace_id,
            latency_ms={"generation": generation_ms},
            model_version=self.model_version,
            finish_reason="stop",
        )

    async def stream(self, payload: QueryRequest, trace_id: str) -> AsyncIterator[str]:
        tokens = [
            "Đây ",
            "là ",
            "streaming ",
            "response ",
            "mẫu ",
            "qua ",
            "SSE.",
        ]
        for token in tokens[: payload.max_output_tokens]:
            await asyncio.sleep(0.05)
            yield token


def sse(event: str, data: dict[str, Any]) -> str:
    return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"


def client_key(request: Request, tenant_id: str) -> str:
    api_key = request.headers.get("x-api-key")
    forwarded_for = request.headers.get("x-forwarded-for")
    host = forwarded_for or (request.client.host if request.client else "unknown")
    identity = api_key or host
    return f"{tenant_id}:{identity}"


def error_detail(code: str, message: str, trace_id: str, retryable: bool) -> dict[str, Any]:
    return {
        "error": {
            "code": code,
            "message": message,
            "trace_id": trace_id,
            "retryable": retryable,
        }
    }


async def acquire_slot(app: FastAPI, trace_id: str) -> None:
    try:
        await asyncio.wait_for(app.state.semaphore.acquire(), timeout=settings.queue_timeout_s)
    except asyncio.TimeoutError as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=error_detail(
                code="CONCURRENCY_LIMIT",
                message="Too many concurrent requests. Please retry later.",
                trace_id=trace_id,
                retryable=True,
            ),
        ) from exc


@asynccontextmanager
async def lifespan(app: FastAPI):
    runtime = RagRuntime(settings.model_version)
    await runtime.startup()
    app.state.runtime = runtime
    app.state.semaphore = asyncio.Semaphore(settings.max_concurrent_requests)
    app.state.rate_limiter = InMemoryRateLimiter(
        max_requests=settings.rate_limit_requests,
        window_s=settings.rate_limit_window_s,
    )
    yield
    await runtime.shutdown()


app = FastAPI(title=settings.service_name, version="1.0.0", lifespan=lifespan)


@app.middleware("http")
async def add_trace_id(request: Request, call_next):
    request.state.trace_id = request.headers.get("x-trace-id", f"tr_{uuid.uuid4().hex}")
    response = await call_next(request)
    response.headers["x-trace-id"] = request.state.trace_id
    return response


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    trace_id = getattr(request.state, "trace_id", f"tr_{uuid.uuid4().hex}")
    if isinstance(exc.detail, dict) and "error" in exc.detail:
        body = exc.detail
    else:
        body = error_detail(
            code="HTTP_ERROR",
            message=str(exc.detail),
            trace_id=trace_id,
            retryable=False,
        )
    return JSONResponse(status_code=exc.status_code, content=body)


@app.exception_handler(Exception)
async def unexpected_exception_handler(request: Request, exc: Exception):
    trace_id = getattr(request.state, "trace_id", f"tr_{uuid.uuid4().hex}")
    logger.exception("unhandled_error", extra={"trace_id": trace_id})
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content=error_detail(
            code="INTERNAL_ERROR",
            message="Unexpected model serving error.",
            trace_id=trace_id,
            retryable=False,
        ),
    )


@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok", "service": settings.service_name}


@app.get("/ready")
async def ready(request: Request) -> dict[str, str]:
    runtime: RagRuntime = request.app.state.runtime
    if not runtime.loaded:
        raise HTTPException(status_code=503, detail="runtime is not ready")
    return {"status": "ready", "model_version": runtime.model_version}


@app.get("/models/current")
async def current_model(request: Request) -> dict[str, str]:
    runtime: RagRuntime = request.app.state.runtime
    return {"model_version": runtime.model_version, "runtime": "RagRuntime"}


@app.post(
    "/query",
    response_model=QueryResponse,
    responses={429: {"model": ErrorResponse}, 503: {"model": ErrorResponse}, 504: {"model": ErrorResponse}},
)
async def query(payload: QueryRequest, request: Request) -> QueryResponse:
    trace_id = request.state.trace_id
    rate_key = client_key(request, payload.tenant_id)
    if not await request.app.state.rate_limiter.allow(rate_key):
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=error_detail(
                code="RATE_LIMITED",
                message="Rate limit exceeded. Please retry later.",
                trace_id=trace_id,
                retryable=True,
            ),
        )

    await acquire_slot(request.app, trace_id)
    started = time.perf_counter()
    try:
        async with asyncio.timeout(settings.request_timeout_s):
            response = await request.app.state.runtime.query(payload, trace_id)
            response.latency_ms["total"] = int((time.perf_counter() - started) * 1000)
            logger.info(
                "query_ok",
                extra={
                    "trace_id": trace_id,
                    "tenant_id": payload.tenant_id,
                    "input_chars": len(payload.question),
                    "latency_ms": response.latency_ms["total"],
                    "model_version": response.model_version,
                },
            )
            return response
    except asyncio.TimeoutError as exc:
        logger.warning("query_timeout", extra={"trace_id": trace_id, "tenant_id": payload.tenant_id})
        raise HTTPException(
            status_code=status.HTTP_504_GATEWAY_TIMEOUT,
            detail=error_detail(
                code="MODEL_TIMEOUT",
                message="Model runtime timed out. Please retry.",
                trace_id=trace_id,
                retryable=True,
            ),
        ) from exc
    finally:
        request.app.state.semaphore.release()


@app.get("/query/stream")
async def query_stream(
    request: Request,
    question: str = Query(..., min_length=1, max_length=2000),
    tenant_id: str = Query("demo", min_length=1, max_length=64),
    top_k: int = Query(6, ge=1, le=20),
    max_output_tokens: int = Query(512, ge=16, le=2048),
) -> StreamingResponse:
    trace_id = request.state.trace_id
    payload = QueryRequest(
        question=question,
        tenant_id=tenant_id,
        top_k=top_k,
        max_output_tokens=max_output_tokens,
    )
    rate_key = client_key(request, payload.tenant_id)
    if not await request.app.state.rate_limiter.allow(rate_key):
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=error_detail(
                code="RATE_LIMITED",
                message="Rate limit exceeded. Please retry later.",
                trace_id=trace_id,
                retryable=True,
            ),
        )

    async def events() -> AsyncIterator[str]:
        slot_acquired = False
        started = time.perf_counter()
        try:
            await acquire_slot(request.app, trace_id)
            slot_acquired = True
            yield sse("meta", {"trace_id": trace_id, "model_version": settings.model_version})

            async with asyncio.timeout(settings.stream_timeout_s):
                async for token in request.app.state.runtime.stream(payload, trace_id):
                    if await request.is_disconnected():
                        logger.info("stream_client_disconnected", extra={"trace_id": trace_id})
                        return
                    yield sse("token", {"text": token})

            total_ms = int((time.perf_counter() - started) * 1000)
            yield sse(
                "done",
                {
                    "trace_id": trace_id,
                    "finish_reason": "stop",
                    "latency_ms": {"total": total_ms},
                },
            )
        except HTTPException as exc:
            body = exc.detail if isinstance(exc.detail, dict) else error_detail(
                "STREAM_ERROR",
                str(exc.detail),
                trace_id,
                retryable=True,
            )
            yield sse("error", body["error"])
        except asyncio.TimeoutError:
            yield sse(
                "error",
                error_detail(
                    "STREAM_TIMEOUT",
                    "Streaming response timed out.",
                    trace_id,
                    retryable=True,
                )["error"],
            )
        except Exception:
            logger.exception("stream_failed", extra={"trace_id": trace_id})
            yield sse(
                "error",
                error_detail(
                    "INTERNAL_ERROR",
                    "Unexpected model serving error.",
                    trace_id,
                    retryable=False,
                )["error"],
            )
        finally:
            if slot_acquired:
                request.app.state.semaphore.release()

    return StreamingResponse(
        events(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

Ghi chú quan trọng:

  • GET /query/stream dễ dùng với browser EventSource, nhưng query nằm trên URL. Với dữ liệu nhạy cảm, dùng POST /query/stream và stream bằng fetch().
  • In-memory rate limiter chỉ đúng cho local hoặc một process. Production nên dùng Redis, API Gateway, Envoy, Kong, NGINX, Cloudflare hoặc service quota tập trung.
  • Nếu dùng nhiều Uvicorn workers, mỗi worker có limiter và semaphore riêng. Với GPU local, nhiều worker có thể load nhiều bản model và gây OOM.
  • asyncio.timeout() cần Python 3.11+. Nếu project dùng Python cũ hơn, thay bằng asyncio.wait_for().
  • Nếu model runtime là sync CPU-bound, không gọi trực tiếp trong async endpoint. Đưa vào worker thread/process hoặc tách thành model server riêng.

5. Streaming SSE contract

SSE phù hợp khi server chỉ cần đẩy token/event xuống client:

client sends query
server yields token events
client renders partial answer
server yields done or error event

Event contract đề xuất:

event: meta
data: {"trace_id":"tr_123","model_version":"rag-api-v1"}

event: token
data: {"text":"Nhân viên "}

event: citation
data: {"source_id":"S1","document_id":"doc_hr_2026","chunk_id":"..."}

event: done
data: {"trace_id":"tr_123","finish_reason":"stop","latency_ms":{"total":1420}}

event: error
data: {"code":"STREAM_TIMEOUT","message":"Streaming response timed out.","trace_id":"tr_123","retryable":true}

SSE trade-off:

Lựa chọnNên dùng khiKhông nên dùng khi
SSEChat completion, token stream, progress event một chiềuCần client và server gửi event hai chiều liên tục
WebSocketCollaboration, voice, realtime bidirectional, tool session statefulChỉ stream token một chiều
Non-streaming HTTPBatch job, eval runner, automation, output ngắnChat UI cần phản hồi sớm

Best default cho RAG chat: POST /query cho automation/eval và SSE endpoint cho UI.

6. Timeout, rate limit và concurrency limit

Các limit nên được thiết kế theo nhiều lớp:

LayerControlVí dụ
Inputquestion.max_length, max file size, max context docsReject trước khi gọi model
Outputmax_output_tokens, stop sequenceGiảm latency và cost
Queuequeue_timeout_sKhông để request đợi vô hạn khi service bận
Runtimerequest_timeout_s, upstream HTTP timeoutFail clean thay vì treo worker
Tenantrequests/minute, tokens/minuteBảo vệ quota và budget
GPUmax_concurrent_requests, KV cache budgetTránh OOM
Networkreverse proxy timeout, idle timeoutTránh stream bị cắt bất ngờ

Rate limit trả 429 RATE_LIMITED. Concurrency limit thường trả 503 CONCURRENCY_LIMIT nếu request không lấy được slot trong queue timeout. Model timeout trả 504 MODEL_TIMEOUT.

Production nên tách rõ:

  • Rate limit theo tenant, user, API key và token budget, không chỉ theo IP.
  • Concurrency limit theo model/pool/GPU, không chỉ toàn service.
  • Timeout theo stage: retrieval timeout, rerank timeout, generation timeout, total timeout.

7. Batching trade-off

Batching gom nhiều request để dùng GPU hiệu quả hơn. Với LLM, các runtime như vLLM/TGI thường dùng continuous batching để thêm request mới vào batch đang chạy.

Lợi íchChi phí
Tăng tokens/second và GPU utilizationTăng queue delay nếu max_wait_ms quá cao
Giảm overhead mỗi requestp95/p99 latency có thể xấu hơn
Hữu ích cho batch/eval/offline workloadTime-to-first-token của chat có thể chậm
Tận dụng tốt GPU cho model lớnCần tuning tránh OOM do batch quá lớn

Metric cần đo trước khi bật batching mạnh:

  • p50/p95/p99 latency.
  • Time-to-first-token với streaming.
  • Tokens/second tổng và tokens/second mỗi request.
  • GPU utilization, memory usage, KV cache usage.
  • Error rate, timeout rate, OOM rate.

Best rule:

Interactive chat: ưu tiên p95 latency và time-to-first-token.
Batch/offline inference: ưu tiên throughput và cost/request.

Nếu workload là RAG chat nhiều user, nên dùng FastAPI làm gateway và để vLLM/TGI xử lý continuous batching phía model server.

8. So sánh serving tools

ToolPhù hợp nhấtĐiểm mạnhTrade-offKhi chọn
FastAPIAPI gateway, RAG orchestration, business logic, model nhỏPython-native, schema rõ, dễ debug, dễ tích hợp auth/loggingKhông tự giải quyết GPU scheduling, continuous batching hoặc model optimizationDefault cho capstone, RAG app, wrapper trước model server
BentoMLĐóng gói Python model service, batchable inference, model registry đơn giảnDeveloper experience tốt cho Python ML, build image/service nhanhVới LLM throughput cao vẫn cần runtime chuyên dụng phía sauClassical ML, embedding/reranker service, team muốn packaging chuẩn
TorchServeServe PyTorch model bằng handlerHợp nếu org đã chuẩn hóa TorchServe và model là PyTorchÍt linh hoạt cho LLM/RAG custom, cần viết handler và ops riêngPyTorch classifier/detector/ranker đã ổn định
Triton Inference ServerGPU inference throughput cao, multi-framework, dynamic batchingMạnh cho TensorRT/ONNX/PyTorch backend, gRPC/HTTP, model repositoryOps phức tạp, cần hiểu GPU profiling và model formatHigh-throughput CV/NLP model, multi-model GPU serving
vLLMLLM serving throughput caoOpenAI-compatible API, continuous batching, memory-efficient KV cachePhụ thuộc model architecture được hỗ trợ, cần GPU memory planningSelf-host LLM chat/completion, cần throughput tốt
TGIHugging Face text generation servingStreaming, tensor parallel, hợp HF ecosystemTuning vẫn cần GPU/ops skill, API khác nhau theo phiên bảnSelf-host HF model, muốn runtime chuyên cho generation

Best solution theo context:

ContextGiải pháp nên chọn
Mini-project hoặc RAG product mớiFastAPI gateway + RAG services rõ boundary
LLM self-host có traffic thậtFastAPI gateway + vLLM hoặc TGI phía sau
Model CV/NLP cần GPU throughputFastAPI gateway + Triton
Classical ML hoặc embedding service Python-firstBentoML hoặc FastAPI tùy team
Org đã có PyTorch serving platformTorchServe nếu vận hành đã mature
Cần ship nhanh, data không quá nhạyFastAPI gateway + managed model API

9. Best practices

  1. Load model/runtime một lần ở startup, warm up trước khi /ready trả ready.
  2. Tách /health/ready. Liveness không nên fail vì vector DB chậm tạm thời.
  3. Dùng Pydantic để validate request và response. Reject field lạ nếu contract cần chặt.
  4. Luôn trả trace_id, model_version hoặc pipeline_version.
  5. Log structured JSON với stage latency, input length, output tokens, tenant và error code.
  6. Không log raw prompt/response nếu có PII, hoặc phải redact trước khi lưu.
  7. Đặt timeout ở gateway và upstream client. Không chỉ dựa vào proxy timeout.
  8. Có concurrency limit để bảo vệ GPU/KV cache/provider quota.
  9. Streaming phải handle client disconnect và vẫn release resource.
  10. Với RAG, response phải có citation và citation validator trước khi trả client.
  11. Với self-host LLM, dùng runtime chuyên dụng cho batching thay vì tự batch trong FastAPI.
  12. Với multi-tenant, rate limit theo tenant/user/API key và có token budget.

10. Dùng được trong production không?

Câu trả lời thực tế: pattern trong bài dùng được trong production, nhưng sample code chỉ là baseline học tập. Có thể đưa vào production khi thỏa các điều kiện sau:

  • Runtime thật đã được tách rõ: model/RAG pipeline có version, health check, warmup và rollback plan.
  • Rate limit chuyển từ in-memory sang Redis/API Gateway hoặc quota service tập trung.
  • Có auth thật, tenant isolation và policy không log PII.
  • Timeout được đặt ở gateway, upstream client, reverse proxy và model server.
  • Concurrency limit được sizing bằng load test trên CPU/GPU thật.
  • Với LLM self-host, FastAPI chỉ làm gateway, còn batching và KV cache do vLLM/TGI/Triton hoặc runtime chuyên dụng xử lý.
  • Có metrics cho request count, error rate, timeout rate, p50/p95/p99 latency, tokens/sec, cost và GPU memory.
  • Có CI test cho schema, error contract, streaming event format và regression test cho RAG output.
  • Có deployment strategy: canary, rollback, model version pinning và dashboard.

Không nên gọi là production-ready nếu:

  • Model load trong mỗi request.
  • Không có timeout/concurrency limit.
  • Không có trace id hoặc model version trong response.
  • Dùng in-memory limiter với nhiều replicas nhưng tưởng là global quota.
  • SSE stream không release resource khi client disconnect.
  • Chưa load test p95 latency và OOM behavior.

11. Kết luận

FastAPI là lựa chọn tốt cho serving contract, orchestration và product boundary. Nhưng với GPU throughput hoặc LLM traffic nghiêm túc, FastAPI nên đứng trước model server chuyên dụng. Production serving không chỉ là "API chạy được", mà là một hệ thống có contract, limit, observability, versioning và rollback.


Tài liệu

1. Serving architecture template

Template kiến trúc cho model hoặc RAG pipeline:

Client/UI/Batch Job
  -> API Gateway or FastAPI
      -> Authentication
      -> Tenant/User context
      -> Request validation
      -> Rate limit
      -> Concurrency limit
      -> Timeout wrapper
      -> Model/RAG runtime adapter
      -> Response contract or SSE stream
      -> Structured logs, metrics, traces

Runtime adapter
  -> Local model, BentoML, TorchServe, Triton, vLLM, TGI, or managed provider

Boundary quan trọng:

BoundaryTrách nhiệm
API contractInput/output schema, error schema, event schema
GatewayAuth, validation, limit, timeout, trace id, response mapping
Runtime adapterGọi model server/provider, retry có kiểm soát, mapping lỗi
Model serverLoad model, optimize inference, batching, GPU memory
ObservabilityLogs, metrics, traces, alert, dashboard

2. Project structure mẫu

model-serving-api/
  app/
    main.py
    api/
      health.py
      query.py
      streaming.py
      models.py
    core/
      config.py
      errors.py
      logging.py
      limits.py
      security.py
    schemas/
      query.py
      errors.py
      events.py
    services/
      runtime.py
      rag_runtime.py
      llm_client.py
      metrics.py
    tests/
      test_contract.py
      test_validation.py
      test_streaming.py
      test_limits.py
  scripts/
    smoke_test.py
    stream_client.py
    load_test.py
  pyproject.toml
  Dockerfile
  .env.example
  README.md

Với project nhỏ, có thể gom file lại. Nhưng vẫn nên giữ rõ 4 nhóm: api, schemas, services, core.

3. API contract template

GET /health

Mục đích: liveness. Không gọi dependency nặng.

Response:

{
  "status": "ok",
  "service": "model-serving-api",
  "version": "1.0.0"
}

GET /ready

Mục đích: readiness. Có thể kiểm tra model loaded, runtime client, vector DB hoặc model server.

Response:

{
  "status": "ready",
  "model_version": "rag-api-v1",
  "dependencies": {
    "runtime": "ok",
    "vector_db": "ok"
  }
}

Nếu chưa sẵn sàng:

{
  "error": {
    "code": "NOT_READY",
    "message": "Runtime is not ready.",
    "trace_id": "tr_123",
    "retryable": true
  }
}

POST /query

Request:

{
  "question": "Nhân viên full-time có bao nhiêu ngày nghỉ phép năm?",
  "tenant_id": "demo",
  "top_k": 6,
  "max_output_tokens": 512,
  "include_trace": false
}

Validation rules:

FieldRuleLý do
questionRequired, 1..2000 charsChặn prompt rỗng hoặc quá dài
tenant_idRequired/default, 1..64 charsRate limit và data isolation
top_k1..20Chặn context quá lớn
max_output_tokens16..2048Kiểm soát latency và cost
include_traceBooleanTrace chi tiết chỉ nên bật khi cần

Response:

{
  "answer": "Nhân viên full-time có 12 ngày nghỉ phép năm [S1].",
  "citations": [
    {
      "source_id": "S1",
      "document_id": "doc_hr_2026",
      "chunk_id": "demo:doc_hr_2026:v1:00003",
      "title": "HR Policy 2026",
      "page_start": 3,
      "page_end": 3
    }
  ],
  "trace_id": "tr_123",
  "latency_ms": {
    "retrieval": 40,
    "rerank": 130,
    "generation": 900,
    "total": 1070
  },
  "model_version": "rag-api-v1",
  "finish_reason": "stop"
}

GET /query/stream

Query params:

question=Nhân viên full-time có bao nhiêu ngày nghỉ phép năm?
tenant_id=demo
top_k=6
max_output_tokens=512

Headers:

Accept: text/event-stream
Cache-Control: no-cache

Event format:

event: meta
data: {"trace_id":"tr_123","model_version":"rag-api-v1"}

event: token
data: {"text":"Nhân viên "}

event: citation
data: {"source_id":"S1","document_id":"doc_hr_2026","chunk_id":"demo:doc_hr_2026:v1:00003"}

event: done
data: {"trace_id":"tr_123","finish_reason":"stop","latency_ms":{"total":1070}}

event: error
data: {"code":"STREAM_TIMEOUT","message":"Streaming response timed out.","trace_id":"tr_123","retryable":true}

Production note:

  • Dùng GET nếu cần browser EventSource đơn giản.
  • Dùng POST streaming với fetch() nếu query nhạy cảm hoặc payload lớn.
  • Disable buffering ở reverse proxy cho SSE, ví dụ X-Accel-Buffering: no với NGINX.
  • Kiểm tra idle timeout của load balancer, CDN và ingress.

GET /models/current

Response:

{
  "model_version": "rag-api-v1",
  "runtime": "vLLM",
  "model_name": "meta-llama/Llama-3.1-8B-Instruct",
  "prompt_version": "answer-prompt-v3",
  "deployed_at": "2026-05-10T10:00:00Z"
}

4. Error code catalog

HTTPCodeRetryableKhi nào
400BAD_REQUESTNoPayload sai logic nhưng qua schema
401UNAUTHORIZEDNoThiếu hoặc sai auth
403FORBIDDENNoUser không có quyền tenant/model
422VALIDATION_ERRORNoFastAPI/Pydantic reject schema
429RATE_LIMITEDYesVượt request/token quota
503NOT_READYYesRuntime chưa load xong
503CONCURRENCY_LIMITYesHết slot inference
504MODEL_TIMEOUTYesRuntime hoặc upstream quá timeout
500INTERNAL_ERRORMaybeLỗi bất ngờ, cần trace

Rule:

  • Client chỉ thấy message an toàn.
  • Server log giữ exception type, stack trace và upstream error.
  • Mọi error phải có trace_id.

5. Config template

.env.example:

APP_ENV=local
SERVICE_NAME=model-serving-api
MODEL_VERSION=rag-api-v1

REQUEST_TIMEOUT_S=20
STREAM_TIMEOUT_S=60
QUEUE_TIMEOUT_S=0.2
MAX_CONCURRENT_REQUESTS=8

RATE_LIMIT_WINDOW_S=60
RATE_LIMIT_REQUESTS=60
RATE_LIMIT_TOKENS=120000

RUNTIME_KIND=fake
RUNTIME_BASE_URL=http://vllm:8000/v1
RUNTIME_API_KEY=change-me

LOG_LEVEL=INFO
REDACT_PROMPTS=true

Timeout sizing gợi ý:

WorkloadRequest timeoutStream timeoutQueue timeout
Classifier CPU nhỏ1-3sN/A50-100ms
Embedding batch nhỏ5-15sN/A100-300ms
RAG non-streaming15-30sN/A100-500ms
LLM chat streaming10-30s first response, 60-180s total60-180s100-500ms

Không copy số này vào production một cách máy móc. Phải đo trên model, GPU, context length và traffic thật.

6. Structured log fields

Log một request thành công:

{
  "event": "query_ok",
  "trace_id": "tr_123",
  "tenant_id": "demo",
  "user_id": "u_456",
  "route": "/query",
  "input_chars": 68,
  "prompt_tokens": 2100,
  "completion_tokens": 180,
  "latency_ms": {
    "retrieval": 40,
    "rerank": 130,
    "generation": 900,
    "total": 1070
  },
  "model_version": "rag-api-v1",
  "finish_reason": "stop"
}

Log một request lỗi:

{
  "event": "query_failed",
  "trace_id": "tr_123",
  "tenant_id": "demo",
  "route": "/query",
  "error_code": "MODEL_TIMEOUT",
  "retryable": true,
  "latency_ms": {
    "total": 20010
  },
  "model_version": "rag-api-v1"
}

Không nên log raw prompt/response mặc định. Nếu cần debug, bật sampling thấp, redact PII và giới hạn retention.

7. Tool decision matrix

ContextBest defaultVì saoAlternative
RAG app hoặc product APIFastAPI gatewayCần business logic, auth, schema, trace, citationBentoML nếu muốn package pipeline như model service
LLM self-host nhiều trafficFastAPI + vLLM/TGIGateway giữ product contract, runtime giữ batching/KV cacheManaged LLM API nếu muốn giảm ops
CV/NLP model cần throughput GPUFastAPI + TritonTriton mạnh về dynamic batching và model repositoryBentoML nếu throughput vừa phải
Classical ML model PythonFastAPI hoặc BentoMLDễ vận hành, dễ testTorchServe nếu org chuẩn PyTorch
PyTorch model đã có handler chuẩnTorchServeHợp nếu platform đã có sẵnTriton hoặc BentoML
Offline/batch inferenceBentoML/Triton/job runnerThroughput quan trọng hơn p95 user latencyFastAPI nếu vẫn cần online API

Short rule:

FastAPI = product/API boundary.
vLLM/TGI/Triton = high-throughput runtime boundary.
BentoML = Python model packaging boundary.
TorchServe = PyTorch serving boundary khi org đã chuẩn hóa.

8. Batching checklist

Trước khi bật batching:

  • Biết workload là interactive, batch, hay mixed.
  • Có baseline p50/p95/p99 latency khi batch off hoặc batch nhỏ.
  • Có metric time-to-first-token cho streaming.
  • Có metric tokens/sec và GPU utilization.
  • Có limit cho max_batch_size, max_wait_ms, max_num_batched_tokens.
  • Có timeout nếu request đứng trong queue quá lâu.
  • Có test với prompt dài và output dài.
  • Có alert cho OOM, timeout rate và queue depth.

Decision:

Nếu mục tiêu làTối ưu
Chat UI phản hồi nhanhmax_wait_ms thấp, concurrency vừa phải, stream sớm
Batch eval qua đêmBatch lớn hơn, queue lâu hơn được
Cost thấpThroughput/GPU utilization cao, chấp nhận latency
SLA p95 chặtBatch nhỏ hơn, scale replicas hoặc dùng model nhỏ hơn

9. Security checklist

  • Auth bắt buộc cho endpoint inference.
  • Tenant/user context không lấy hoàn toàn từ body nếu client có thể giả mạo.
  • Rate limit theo tenant/user/API key.
  • Token budget theo tenant để tránh cost spike.
  • Không trả stack trace, provider error raw hoặc prompt nội bộ ra client.
  • Redact prompt/response trong logs.
  • Nếu RAG multi-tenant, filter ACL ở retriever, không giao cho prompt.
  • Validate content type và payload size.
  • Có allowlist model nếu client được chọn model.
  • Có audit log cho model version, prompt version và deployment.

10. Production readiness checklist

Contract

  • OpenAPI schema rõ cho request/response/error.
  • trace_id trong header và response body.
  • model_version hoặc pipeline_version.
  • finish_reason.
  • SSE event có meta, token, done, error.

Reliability

  • /health/ready tách riêng.
  • Startup load/warmup model một lần.
  • Timeout ở gateway, upstream client và proxy.
  • Concurrency limit đã sizing bằng load test.
  • Rate limit global, không chỉ process-local.
  • Graceful shutdown không cắt request đang xử lý quá thô.

Performance

  • Đo p50/p95/p99 latency.
  • Đo time-to-first-token.
  • Đo tokens/sec và request/sec.
  • Đo CPU/GPU memory.
  • Đã test prompt dài, output dài và burst traffic.
  • Batching được tuning theo SLA.

Observability

  • Structured logs có trace id, tenant, route, latency, model version.
  • Metrics có request count, error rate, timeout rate, latency histogram.
  • Dashboard có queue depth/concurrency/GPU memory nếu self-host.
  • Alert cho timeout spike, OOM, readiness fail và error rate.

Release

  • Canary hoặc blue/green deploy.
  • Rollback model/prompt/runtime version.
  • Smoke test sau deploy.
  • Contract test trong CI.
  • Eval regression cho RAG/model quality.

11. Incident runbook

Timeout tăng mạnh

  1. Kiểm tra p95/p99 latency theo stage: retrieval, rerank, generation, upstream.
  2. Kiểm tra queue depth và concurrency.
  3. Kiểm tra prompt length và output tokens có tăng không.
  4. Tạm giảm max_output_tokens hoặc top_k nếu cần bảo vệ service.
  5. Scale runtime hoặc rollback model nếu latency tăng sau deploy.

GPU OOM

  1. Dừng rollout hoặc giảm traffic.
  2. Giảm max_concurrent_requests, batch size hoặc max context/output tokens.
  3. Kiểm tra model replicas có bị nhân đôi do nhiều worker không.
  4. Kiểm tra KV cache usage và prompt length distribution.
  5. Chạy lại load test trước khi mở traffic.

SSE stream bị cắt

  1. Kiểm tra reverse proxy/CDN/ingress idle timeout.
  2. Kiểm tra buffering có bị bật không.
  3. Kiểm tra server có gửi heartbeat hoặc token đều không.
  4. Kiểm tra client disconnect rate.
  5. Log finish_reason để phân biệt done, timeout, client_disconnected.

Model version mismatch

  1. So sánh /models/current với deployment manifest.
  2. Kiểm tra registry version và image tag.
  3. Kiểm tra cache hoặc replica cũ chưa drain.
  4. Rollback hoặc restart replica lệch version.
  5. Gắn test vào CI/CD để chặn mismatch lần sau.

12. Câu trả lời production readiness mẫu

Pattern này dùng được trong production nếu FastAPI chỉ giữ API/product boundary,
runtime được load/warmup đúng cách, limit được quản lý global, timeout được đặt ở
mọi layer, và hệ thống có observability + rollback.

Sample local chưa production-ready vì rate limiter là in-memory, fake runtime,
chưa có auth thật, chưa có Redis/API Gateway quota, chưa có dashboard/alert,
và chưa được load test trên traffic + GPU thật.

Bài tập

Mục tiêu

Bạn sẽ triển khai một API serving cho model hoặc RAG pipeline với:

  • GET /health
  • GET /ready
  • GET /models/current
  • POST /query
  • GET /query/stream dùng Server-Sent Events
  • Pydantic request/response validation
  • Timeout, rate limit và concurrency limit
  • Structured logs có trace_id
  • Test script cho non-streaming và streaming
  • README trả lời production readiness

Thời lượng đề xuất:

  • Bản tối thiểu: 60-90 phút.
  • Bản tốt cho portfolio: 0.5-1 ngày.
  • Bản gần production: 2-3 ngày, thêm auth, Redis limiter, Docker, metrics và CI.

0. Acceptance criteria

Hoàn thành bài tập khi bạn có:

  • POST /query reject request sai schema bằng 422.
  • POST /query trả answer, citations, trace_id, latency_ms, model_version, finish_reason.
  • GET /query/stream trả event meta, nhiều event token, và event done.
  • Timeout trả error contract có code MODEL_TIMEOUT hoặc STREAM_TIMEOUT.
  • Rate limit trả 429 RATE_LIMITED.
  • Concurrency limit trả 503 CONCURRENCY_LIMIT hoặc SSE error.
  • Log có trace_id, tenant_id, input_chars, latency_ms, model_version.
  • Có script test streaming bằng Python hoặc curl -N.
  • README trả lời: "Dùng được trong production không? Nếu có thì cần điều kiện gì?"

1. Chọn runtime

Chọn một trong ba mức:

MứcRuntimeKhi nào dùng
Local fake runtimeHàm async giả lập modelHọc API boundary nhanh
RAG runtime từ Day 40Gọi retrieval + generation pipelineMuốn nối với mini-project RAG
LLM runtime thậtGọi vLLM/TGI/managed LLM qua HTTPMuốn demo self-host hoặc provider thật

Khuyến nghị cho lần đầu: dùng fake runtime trước. Sau khi contract ổn, thay runtime bằng RAG hoặc LLM thật.

2. Scaffold project

Tạo folder:

day-42-serving-lab/
  app/
    main.py
    schemas.py
    runtime.py
    limits.py
  scripts/
    smoke_test.py
    stream_client.py
  pyproject.toml
  README.md

Dependencies tối thiểu:

[project]
requires-python = ">=3.11"
dependencies = [
  "fastapi",
  "uvicorn[standard]",
  "httpx",
  "pydantic",
]

Nếu dùng config từ env, thêm:

"pydantic-settings"

Chạy local:

uvicorn app.main:app --reload --port 8000

3. Implement schema

Tạo app/schemas.py:

from typing import Literal

from pydantic import BaseModel, ConfigDict, Field


class Citation(BaseModel):
    source_id: str
    document_id: str
    chunk_id: str
    title: str
    page_start: int | None = None
    page_end: int | None = None


class QueryRequest(BaseModel):
    model_config = ConfigDict(extra="forbid")

    question: str = Field(..., min_length=1, max_length=2000)
    tenant_id: str = Field("demo", min_length=1, max_length=64)
    top_k: int = Field(6, ge=1, le=20)
    max_output_tokens: int = Field(512, ge=16, le=2048)
    include_trace: bool = False


class QueryResponse(BaseModel):
    answer: str
    citations: list[Citation]
    trace_id: str
    latency_ms: dict[str, int]
    model_version: str
    finish_reason: Literal["stop", "timeout", "error"]

Test nhanh validation:

curl -s -X POST http://localhost:8000/query \
  -H 'content-type: application/json' \
  -d '{"question":"","unknown_field":true}' | jq

Kỳ vọng: FastAPI trả 422.

4. Implement fake runtime

Tạo app/runtime.py:

import asyncio
import time
from collections.abc import AsyncIterator

from app.schemas import Citation, QueryRequest, QueryResponse


class Runtime:
    def __init__(self, model_version: str) -> None:
        self.model_version = model_version
        self.loaded = False

    async def startup(self) -> None:
        await asyncio.sleep(0.05)
        self.loaded = True

    async def shutdown(self) -> None:
        self.loaded = False

    async def query(self, payload: QueryRequest, trace_id: str) -> QueryResponse:
        started = time.perf_counter()
        await asyncio.sleep(0.15)
        return QueryResponse(
            answer="Câu trả lời mẫu có citation [S1].",
            citations=[
                Citation(
                    source_id="S1",
                    document_id="demo-doc",
                    chunk_id="demo-doc:v1:00001",
                    title="Demo Document",
                )
            ],
            trace_id=trace_id,
            latency_ms={"generation": int((time.perf_counter() - started) * 1000)},
            model_version=self.model_version,
            finish_reason="stop",
        )

    async def stream(self, payload: QueryRequest, trace_id: str) -> AsyncIterator[str]:
        tokens = ["Câu ", "trả ", "lời ", "đang ", "được ", "stream ", "qua ", "SSE."]
        for token in tokens:
            await asyncio.sleep(0.08)
            yield token

Nâng cấp sau:

  • Với RAG Day 40: query() gọi retrieval, rerank, generator và citation validator.
  • Với vLLM/TGI: query() gọi HTTP endpoint, stream() parse upstream stream rồi map về SSE event contract của bạn.
  • Với Triton: query() gọi gRPC/HTTP client và map tensor output về response schema.

5. Implement rate/concurrency limit

Tạo app/limits.py:

import asyncio
import time
from collections import defaultdict, deque


class InMemoryRateLimiter:
    def __init__(self, max_requests: int, window_s: int) -> None:
        self.max_requests = max_requests
        self.window_s = window_s
        self._hits: dict[str, deque[float]] = defaultdict(deque)
        self._lock = asyncio.Lock()

    async def allow(self, key: str) -> bool:
        now = time.monotonic()
        async with self._lock:
            bucket = self._hits[key]
            while bucket and now - bucket[0] > self.window_s:
                bucket.popleft()
            if len(bucket) >= self.max_requests:
                return False
            bucket.append(now)
            return True

Ghi rõ trong README:

In-memory limiter chỉ dùng cho local. Production cần Redis/API Gateway/quota service
để limit có hiệu lực trên nhiều process và nhiều replicas.

6. Implement FastAPI app

Tạo app/main.py. Bạn có thể dùng code trong lession.md làm baseline, hoặc tự implement với các yêu cầu:

  • lifespan load runtime một lần.
  • Middleware tạo trace_id và trả header x-trace-id.
  • Exception handler trả error contract.
  • /health không gọi dependency nặng.
  • /ready kiểm tra runtime loaded.
  • /query bọc runtime call bằng timeout.
  • /query/stream trả StreamingResponse với text/event-stream.
  • Streaming generator release semaphore trong finally.

Checklist code:

  • Không gọi Runtime() trong từng request.
  • Không dùng bare except rồi nuốt lỗi.
  • Không trả stack trace ra client.
  • Không để request chờ semaphore vô hạn.
  • Không log raw prompt nếu data có thể chứa PII.

7. Test bằng curl

Health:

curl -s http://localhost:8000/health | jq
curl -s http://localhost:8000/ready | jq
curl -s http://localhost:8000/models/current | jq

Non-streaming:

curl -s -X POST http://localhost:8000/query \
  -H 'content-type: application/json' \
  -H 'x-api-key: dev-key' \
  -d '{
    "question": "Day 42 học gì?",
    "tenant_id": "demo",
    "top_k": 6,
    "max_output_tokens": 256
  }' | jq

Streaming:

curl -N 'http://localhost:8000/query/stream?tenant_id=demo&question=Day%2042%20h%E1%BB%8Dc%20g%C3%AC%3F&max_output_tokens=128'

Kỳ vọng:

event: meta
data: {"trace_id":"...","model_version":"..."}

event: token
data: {"text":"Câu "}

event: done
data: {"trace_id":"...","finish_reason":"stop","latency_ms":{"total":...}}

8. Viết smoke test

Tạo scripts/smoke_test.py:

import asyncio

import httpx


BASE_URL = "http://localhost:8000"


async def main() -> None:
    async with httpx.AsyncClient(timeout=30) as client:
        health = await client.get(f"{BASE_URL}/health")
        health.raise_for_status()

        response = await client.post(
            f"{BASE_URL}/query",
            json={
                "question": "Day 42 học gì?",
                "tenant_id": "demo",
                "top_k": 6,
                "max_output_tokens": 128,
            },
        )
        response.raise_for_status()
        body = response.json()
        assert body["trace_id"]
        assert body["model_version"]
        assert body["finish_reason"] == "stop"
        assert "total" in body["latency_ms"]
        print("smoke test ok", body["trace_id"])


if __name__ == "__main__":
    asyncio.run(main())

Chạy:

python scripts/smoke_test.py

9. Viết streaming client test

Tạo scripts/stream_client.py:

import asyncio

import httpx


BASE_URL = "http://localhost:8000"


async def main() -> None:
    params = {
        "question": "Day 42 học gì?",
        "tenant_id": "demo",
        "max_output_tokens": 128,
    }
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", f"{BASE_URL}/query/stream", params=params) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                if line:
                    print(line)


if __name__ == "__main__":
    asyncio.run(main())

Chạy:

python scripts/stream_client.py

Acceptance:

  • Nhìn thấy event: meta.
  • Nhìn thấy nhiều event: token.
  • Nhìn thấy event: done.
  • Nếu giảm timeout rất thấp, nhìn thấy event: error.

10. Test timeout

Sửa fake runtime tạm thời:

async def query(self, payload, trace_id):
    await asyncio.sleep(999)
    return QueryResponse(
        answer="timeout simulation",
        citations=[],
        trace_id=trace_id,
        latency_ms={"total": 999000},
        model_version="fake-rag-v1",
        finish_reason="timeout",
    )

Giảm config:

request_timeout_s = 1.0
stream_timeout_s = 1.0

Gọi:

curl -s -X POST http://localhost:8000/query \
  -H 'content-type: application/json' \
  -d '{"question":"timeout test","tenant_id":"demo"}' | jq

Kỳ vọng:

{
  "error": {
    "code": "MODEL_TIMEOUT",
    "message": "Model runtime timed out. Please retry.",
    "trace_id": "...",
    "retryable": true
  }
}

Sau test, revert phần sleep giả lập trong lab code của bạn.

11. Test rate limit

Đặt rate_limit_requests = 2, rate_limit_window_s = 60.

Gọi 3 lần liên tiếp:

for i in 1 2 3; do
  curl -s -o /dev/null -w "%{http_code}\n" \
    -X POST http://localhost:8000/query \
    -H 'content-type: application/json' \
    -H 'x-api-key: same-client' \
    -d '{"question":"rate test","tenant_id":"demo"}'
done

Kỳ vọng:

200
200
429

12. Test concurrency limit

Đặt:

max_concurrent_requests = 1
queue_timeout_s = 0.05

Tạo nhiều request song song:

import asyncio

import httpx


async def call(i: int) -> None:
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.post(
            "http://localhost:8000/query",
            json={"question": f"concurrency test {i}", "tenant_id": "demo"},
        )
        print(i, response.status_code, response.text[:120])


async def main() -> None:
    await asyncio.gather(*(call(i) for i in range(10)))


asyncio.run(main())

Kỳ vọng: một số request thành công, một số request trả 503 CONCURRENCY_LIMIT.

13. Benchmark batching trade-off

Nếu bạn dùng vLLM/TGI/Triton hoặc runtime có batching, chạy ít nhất 3 cấu hình:

ConfigMục tiêu
Batch nhỏ, wait thấpBaseline latency
Batch vừaCân bằng latency/throughput
Batch lớn, wait caoThroughput tối đa

Ghi lại:

MetricBatch nhỏBatch vừaBatch lớn
request/sec
tokens/sec
p50 latency
p95 latency
p99 latency
time-to-first-token
timeout rate
GPU memory peak

Viết kết luận:

Với interactive chat, chọn config ... vì p95 và time-to-first-token tốt hơn.
Với offline batch, chọn config ... vì throughput cao hơn và SLA latency không chặt.

14. Thay fake runtime bằng runtime thật

Option A: RAG pipeline từ Day 40

Map flow:

QueryRequest
  -> normalize question
  -> retrieve with tenant/ACL filter
  -> rerank
  -> build context
  -> generate answer
  -> validate citation
  -> QueryResponse

Yêu cầu:

  • tenant_id phải đi vào retriever filter.
  • top_k phải có upper bound.
  • Response phải có citation thật.
  • Trace latency phải tách retrieval, rerank, generation.

Option B: vLLM/TGI hoặc managed LLM

Map flow:

QueryRequest
  -> prompt builder
  -> HTTP client with timeout
  -> parse completion or stream chunks
  -> QueryResponse/SSE events

Yêu cầu:

  • HTTP client có timeout.
  • Không retry vô hạn.
  • Map upstream timeout thành MODEL_TIMEOUT.
  • Map upstream rate limit thành UPSTREAM_RATE_LIMITED hoặc RATE_LIMITED.
  • Không leak API key/provider error raw ra client.

Option C: Triton/BentoML/TorchServe

Map flow:

QueryRequest
  -> preprocess
  -> model server request
  -> postprocess
  -> QueryResponse

Yêu cầu:

  • Version model rõ.
  • Preprocess/postprocess có test.
  • Timeout và batch behavior được đo.

15. README production readiness

README phải có một section:

## Production Readiness

### Dùng được trong production không?

Pattern API này có thể dùng trong production, nhưng bản lab hiện tại chưa đủ
production-ready.

### Điều kiện cần để production-ready

- Thay in-memory rate limiter bằng Redis/API Gateway/quota service.
- Thêm authentication và tenant authorization thật.
- Không log raw prompt/response hoặc phải redact PII.
- Chạy load test để sizing timeout, concurrency và batching.
- Thêm metrics, dashboard và alert cho latency, timeout, error rate, queue depth.
- Nếu self-host LLM, đặt FastAPI trước vLLM/TGI/Triton thay vì tự batch trong FastAPI.
- Thêm canary deploy, rollback model version và contract test trong CI.

16. Câu hỏi review

Trả lời các câu sau sau khi hoàn thành lab:

  1. Contract của /query có field nào giúp debug production?
  2. Vì sao /health/ready không nên giống nhau?
  3. Vì sao in-memory rate limiter sai khi chạy nhiều replicas?
  4. Concurrency limit bảo vệ điều gì trong LLM serving?
  5. Batching cải thiện metric nào và có thể làm xấu metric nào?
  6. Khi nào nên chọn FastAPI-only?
  7. Khi nào nên chọn FastAPI + vLLM/TGI?
  8. Khi nào Triton hợp lý hơn FastAPI-only?
  9. SSE có lợi gì so với non-streaming HTTP cho chat UI?
  10. Bản lab của bạn còn thiếu gì để production-ready?

17. Rubric tự chấm

MứcTiêu chí
Đạt/query, /query/stream, validation, timeout và README production readiness
KháCó rate/concurrency limit, structured logs, smoke test và stream client
TốtCó runtime thật, trace latency theo stage, benchmark p95/TTFT và error contract đầy đủ
Gần productionCó auth, Redis/API Gateway limiter, metrics/dashboard, Docker, CI contract test và rollout/rollback plan