- Published on
Day 42: Model Serving Với FastAPI, SSE Và Production Boundary
- Authors

- Name
- Trần Mạnh Thắng
- @TranManhThang96
1. Mục tiêu bài học
Sau Day 42, bạn cần build được một serving layer cho model hoặc RAG pipeline có các đặc điểm sau:
- Có API contract rõ ràng, không chỉ là một hàm
predict()trả string. - Có request validation, response schema, error schema và
trace_id. - Có
/healthcho process health và/readycho model/runtime readiness. - Có endpoint non-streaming cho automation và endpoint streaming SSE cho chat UI.
- Có timeout, input limit, output limit, rate limit và concurrency limit.
- Biết khi nào FastAPI là đủ, khi nào nên đặt FastAPI trước vLLM, TGI, Triton hoặc service chuyên dụng khác.
- Trả lời được: "Dùng được trong production không? Nếu có thì cần điều kiện gì?"
Trong bài này, "model" có thể là:
- Classical ML model: classifier, ranker, regressor.
- Deep learning model: PyTorch/TensorFlow/ONNX.
- LLM local hoặc managed provider.
- RAG pipeline từ Day 40: retrieve, rerank, build context, generate, validate citation.
2. Mental model: training artifact khác serving contract
Training tạo artifact:
dataset + code + params -> model artifact + metrics + registry version
Serving biến artifact thành product boundary:
client
-> API gateway
-> auth/rate limit
-> request validation
-> timeout/concurrency control
-> model runtime hoặc RAG pipeline
-> response/stream
-> structured logs + metrics + trace
Một model có accuracy tốt vẫn có thể fail production nếu serving layer thiếu boundary:
| Vấn đề | Hậu quả |
|---|---|
| Không validate input | Prompt quá dài, payload sai schema, request gây OOM hoặc chi phí tăng |
| Không timeout | Request treo, worker bị giữ, queue tăng và toàn service chậm |
| Không concurrency limit | GPU hết memory do quá nhiều request đồng thời |
| Không rate limit | Một tenant hoặc một client ăn hết quota |
| Không trace id | Không debug được request lỗi |
| Không version response | Không biết câu trả lời đến từ model/pipeline nào |
| Không streaming | Chat UI có time-to-first-token kém, user tưởng service chết |
3. Serving contract tối thiểu
Endpoint đề xuất cho model hoặc RAG pipeline:
| Endpoint | Mục đích | Ghi chú production |
|---|---|---|
GET /health | Process còn sống không | Không gọi dependency nặng, dùng cho liveness probe |
GET /ready | Service sẵn sàng nhận traffic không | Kiểm tra model loaded, vector DB/model server reachable |
GET /models/current | Trả model/runtime/pipeline version | Bắt buộc để debug rollback và A/B test |
POST /query | Non-streaming inference | Dùng cho batch job, automation, test, eval |
GET /query/stream hoặc POST /query/stream | Streaming token/event | Dùng cho chat UI, SSE một chiều server-to-client |
Request contract nên giới hạn rõ:
{
"question": "Nhân viên full-time có bao nhiêu ngày nghỉ phép năm?",
"tenant_id": "demo",
"top_k": 6,
"max_output_tokens": 512,
"include_trace": false
}
Response contract nên trả đủ dữ liệu để user dùng được và engineer debug được:
{
"answer": "Nhân viên full-time có 12 ngày nghỉ phép năm [S1].",
"citations": [
{
"source_id": "S1",
"document_id": "doc_hr_2026",
"chunk_id": "demo:doc_hr_2026:v1:00003",
"title": "HR Policy 2026",
"page_start": 3,
"page_end": 3
}
],
"trace_id": "tr_20260510_001",
"latency_ms": {
"retrieval": 42,
"rerank": 126,
"generation": 870,
"total": 1041
},
"model_version": "rag-api-v1",
"finish_reason": "stop"
}
Error contract không trả stack trace ra client:
{
"error": {
"code": "MODEL_TIMEOUT",
"message": "Model runtime timed out. Please retry.",
"trace_id": "tr_20260510_001",
"retryable": true
}
}
4. FastAPI service gần production
Ví dụ dưới đây dùng fake runtime để bài học chạy được local. Khi đưa vào project thật, thay RagRuntime bằng adapter gọi RAG pipeline, vLLM/TGI OpenAI-compatible endpoint, Triton gRPC/HTTP, BentoML service hoặc managed LLM API.
Điểm production-style trong code:
- Load runtime một lần trong
lifespan, không load model trong từng request. - Pydantic schema dùng
extra="forbid"để reject field lạ. - Mọi response có
trace_id,model_version,finish_reason. - Có timeout cho non-streaming và streaming.
- Có rate limit và concurrency limit ở gateway.
- SSE event có
token,done,error. - Streaming generator kiểm tra client disconnect.
- Log có latency, tenant, input length và error code.
from __future__ import annotations
import asyncio
import json
import logging
import time
import uuid
from collections import defaultdict, deque
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, Literal
from fastapi import FastAPI, HTTPException, Query, Request, status
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, ConfigDict, Field
logger = logging.getLogger("model-serving")
logging.basicConfig(level=logging.INFO)
@dataclass(frozen=True)
class Settings:
service_name: str = "day-42-model-serving"
model_version: str = "rag-api-v1"
request_timeout_s: float = 20.0
stream_timeout_s: float = 60.0
queue_timeout_s: float = 0.2
max_concurrent_requests: int = 8
rate_limit_window_s: int = 60
rate_limit_requests: int = 60
settings = Settings()
class Citation(BaseModel):
source_id: str
document_id: str
chunk_id: str
title: str
page_start: int | None = None
page_end: int | None = None
class QueryRequest(BaseModel):
model_config = ConfigDict(extra="forbid")
question: str = Field(..., min_length=1, max_length=2000)
tenant_id: str = Field("demo", min_length=1, max_length=64)
top_k: int = Field(6, ge=1, le=20)
max_output_tokens: int = Field(512, ge=16, le=2048)
include_trace: bool = False
class QueryResponse(BaseModel):
answer: str
citations: list[Citation]
trace_id: str
latency_ms: dict[str, int]
model_version: str
finish_reason: Literal["stop", "timeout", "error"]
class ErrorBody(BaseModel):
code: str
message: str
trace_id: str
retryable: bool
class ErrorResponse(BaseModel):
error: ErrorBody
class InMemoryRateLimiter:
"""Process-local limiter for local lab. Use Redis/API Gateway for production."""
def __init__(self, max_requests: int, window_s: int) -> None:
self.max_requests = max_requests
self.window_s = window_s
self._hits: dict[str, deque[float]] = defaultdict(deque)
self._lock = asyncio.Lock()
async def allow(self, key: str) -> bool:
now = time.monotonic()
async with self._lock:
bucket = self._hits[key]
while bucket and now - bucket[0] > self.window_s:
bucket.popleft()
if len(bucket) >= self.max_requests:
return False
bucket.append(now)
return True
class RagRuntime:
def __init__(self, model_version: str) -> None:
self.model_version = model_version
self.loaded = False
async def startup(self) -> None:
# Replace this with model load, warmup request, DB connection, or client init.
await asyncio.sleep(0.05)
self.loaded = True
async def shutdown(self) -> None:
self.loaded = False
async def query(self, payload: QueryRequest, trace_id: str) -> QueryResponse:
started = time.perf_counter()
await asyncio.sleep(0.08)
answer = (
"Đây là câu trả lời mẫu từ runtime. "
"Trong project thật, thay phần này bằng RAG hoặc model server [S1]."
)
generation_ms = int((time.perf_counter() - started) * 1000)
return QueryResponse(
answer=answer,
citations=[
Citation(
source_id="S1",
document_id="demo-doc",
chunk_id="demo-doc:v1:00001",
title="Demo Document",
)
],
trace_id=trace_id,
latency_ms={"generation": generation_ms},
model_version=self.model_version,
finish_reason="stop",
)
async def stream(self, payload: QueryRequest, trace_id: str) -> AsyncIterator[str]:
tokens = [
"Đây ",
"là ",
"streaming ",
"response ",
"mẫu ",
"qua ",
"SSE.",
]
for token in tokens[: payload.max_output_tokens]:
await asyncio.sleep(0.05)
yield token
def sse(event: str, data: dict[str, Any]) -> str:
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
def client_key(request: Request, tenant_id: str) -> str:
api_key = request.headers.get("x-api-key")
forwarded_for = request.headers.get("x-forwarded-for")
host = forwarded_for or (request.client.host if request.client else "unknown")
identity = api_key or host
return f"{tenant_id}:{identity}"
def error_detail(code: str, message: str, trace_id: str, retryable: bool) -> dict[str, Any]:
return {
"error": {
"code": code,
"message": message,
"trace_id": trace_id,
"retryable": retryable,
}
}
async def acquire_slot(app: FastAPI, trace_id: str) -> None:
try:
await asyncio.wait_for(app.state.semaphore.acquire(), timeout=settings.queue_timeout_s)
except asyncio.TimeoutError as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=error_detail(
code="CONCURRENCY_LIMIT",
message="Too many concurrent requests. Please retry later.",
trace_id=trace_id,
retryable=True,
),
) from exc
@asynccontextmanager
async def lifespan(app: FastAPI):
runtime = RagRuntime(settings.model_version)
await runtime.startup()
app.state.runtime = runtime
app.state.semaphore = asyncio.Semaphore(settings.max_concurrent_requests)
app.state.rate_limiter = InMemoryRateLimiter(
max_requests=settings.rate_limit_requests,
window_s=settings.rate_limit_window_s,
)
yield
await runtime.shutdown()
app = FastAPI(title=settings.service_name, version="1.0.0", lifespan=lifespan)
@app.middleware("http")
async def add_trace_id(request: Request, call_next):
request.state.trace_id = request.headers.get("x-trace-id", f"tr_{uuid.uuid4().hex}")
response = await call_next(request)
response.headers["x-trace-id"] = request.state.trace_id
return response
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
trace_id = getattr(request.state, "trace_id", f"tr_{uuid.uuid4().hex}")
if isinstance(exc.detail, dict) and "error" in exc.detail:
body = exc.detail
else:
body = error_detail(
code="HTTP_ERROR",
message=str(exc.detail),
trace_id=trace_id,
retryable=False,
)
return JSONResponse(status_code=exc.status_code, content=body)
@app.exception_handler(Exception)
async def unexpected_exception_handler(request: Request, exc: Exception):
trace_id = getattr(request.state, "trace_id", f"tr_{uuid.uuid4().hex}")
logger.exception("unhandled_error", extra={"trace_id": trace_id})
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=error_detail(
code="INTERNAL_ERROR",
message="Unexpected model serving error.",
trace_id=trace_id,
retryable=False,
),
)
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok", "service": settings.service_name}
@app.get("/ready")
async def ready(request: Request) -> dict[str, str]:
runtime: RagRuntime = request.app.state.runtime
if not runtime.loaded:
raise HTTPException(status_code=503, detail="runtime is not ready")
return {"status": "ready", "model_version": runtime.model_version}
@app.get("/models/current")
async def current_model(request: Request) -> dict[str, str]:
runtime: RagRuntime = request.app.state.runtime
return {"model_version": runtime.model_version, "runtime": "RagRuntime"}
@app.post(
"/query",
response_model=QueryResponse,
responses={429: {"model": ErrorResponse}, 503: {"model": ErrorResponse}, 504: {"model": ErrorResponse}},
)
async def query(payload: QueryRequest, request: Request) -> QueryResponse:
trace_id = request.state.trace_id
rate_key = client_key(request, payload.tenant_id)
if not await request.app.state.rate_limiter.allow(rate_key):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=error_detail(
code="RATE_LIMITED",
message="Rate limit exceeded. Please retry later.",
trace_id=trace_id,
retryable=True,
),
)
await acquire_slot(request.app, trace_id)
started = time.perf_counter()
try:
async with asyncio.timeout(settings.request_timeout_s):
response = await request.app.state.runtime.query(payload, trace_id)
response.latency_ms["total"] = int((time.perf_counter() - started) * 1000)
logger.info(
"query_ok",
extra={
"trace_id": trace_id,
"tenant_id": payload.tenant_id,
"input_chars": len(payload.question),
"latency_ms": response.latency_ms["total"],
"model_version": response.model_version,
},
)
return response
except asyncio.TimeoutError as exc:
logger.warning("query_timeout", extra={"trace_id": trace_id, "tenant_id": payload.tenant_id})
raise HTTPException(
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
detail=error_detail(
code="MODEL_TIMEOUT",
message="Model runtime timed out. Please retry.",
trace_id=trace_id,
retryable=True,
),
) from exc
finally:
request.app.state.semaphore.release()
@app.get("/query/stream")
async def query_stream(
request: Request,
question: str = Query(..., min_length=1, max_length=2000),
tenant_id: str = Query("demo", min_length=1, max_length=64),
top_k: int = Query(6, ge=1, le=20),
max_output_tokens: int = Query(512, ge=16, le=2048),
) -> StreamingResponse:
trace_id = request.state.trace_id
payload = QueryRequest(
question=question,
tenant_id=tenant_id,
top_k=top_k,
max_output_tokens=max_output_tokens,
)
rate_key = client_key(request, payload.tenant_id)
if not await request.app.state.rate_limiter.allow(rate_key):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=error_detail(
code="RATE_LIMITED",
message="Rate limit exceeded. Please retry later.",
trace_id=trace_id,
retryable=True,
),
)
async def events() -> AsyncIterator[str]:
slot_acquired = False
started = time.perf_counter()
try:
await acquire_slot(request.app, trace_id)
slot_acquired = True
yield sse("meta", {"trace_id": trace_id, "model_version": settings.model_version})
async with asyncio.timeout(settings.stream_timeout_s):
async for token in request.app.state.runtime.stream(payload, trace_id):
if await request.is_disconnected():
logger.info("stream_client_disconnected", extra={"trace_id": trace_id})
return
yield sse("token", {"text": token})
total_ms = int((time.perf_counter() - started) * 1000)
yield sse(
"done",
{
"trace_id": trace_id,
"finish_reason": "stop",
"latency_ms": {"total": total_ms},
},
)
except HTTPException as exc:
body = exc.detail if isinstance(exc.detail, dict) else error_detail(
"STREAM_ERROR",
str(exc.detail),
trace_id,
retryable=True,
)
yield sse("error", body["error"])
except asyncio.TimeoutError:
yield sse(
"error",
error_detail(
"STREAM_TIMEOUT",
"Streaming response timed out.",
trace_id,
retryable=True,
)["error"],
)
except Exception:
logger.exception("stream_failed", extra={"trace_id": trace_id})
yield sse(
"error",
error_detail(
"INTERNAL_ERROR",
"Unexpected model serving error.",
trace_id,
retryable=False,
)["error"],
)
finally:
if slot_acquired:
request.app.state.semaphore.release()
return StreamingResponse(
events(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
Ghi chú quan trọng:
GET /query/streamdễ dùng với browserEventSource, nhưng query nằm trên URL. Với dữ liệu nhạy cảm, dùngPOST /query/streamvà stream bằngfetch().- In-memory rate limiter chỉ đúng cho local hoặc một process. Production nên dùng Redis, API Gateway, Envoy, Kong, NGINX, Cloudflare hoặc service quota tập trung.
- Nếu dùng nhiều Uvicorn workers, mỗi worker có limiter và semaphore riêng. Với GPU local, nhiều worker có thể load nhiều bản model và gây OOM.
asyncio.timeout()cần Python 3.11+. Nếu project dùng Python cũ hơn, thay bằngasyncio.wait_for().- Nếu model runtime là sync CPU-bound, không gọi trực tiếp trong async endpoint. Đưa vào worker thread/process hoặc tách thành model server riêng.
5. Streaming SSE contract
SSE phù hợp khi server chỉ cần đẩy token/event xuống client:
client sends query
server yields token events
client renders partial answer
server yields done or error event
Event contract đề xuất:
event: meta
data: {"trace_id":"tr_123","model_version":"rag-api-v1"}
event: token
data: {"text":"Nhân viên "}
event: citation
data: {"source_id":"S1","document_id":"doc_hr_2026","chunk_id":"..."}
event: done
data: {"trace_id":"tr_123","finish_reason":"stop","latency_ms":{"total":1420}}
event: error
data: {"code":"STREAM_TIMEOUT","message":"Streaming response timed out.","trace_id":"tr_123","retryable":true}
SSE trade-off:
| Lựa chọn | Nên dùng khi | Không nên dùng khi |
|---|---|---|
| SSE | Chat completion, token stream, progress event một chiều | Cần client và server gửi event hai chiều liên tục |
| WebSocket | Collaboration, voice, realtime bidirectional, tool session stateful | Chỉ stream token một chiều |
| Non-streaming HTTP | Batch job, eval runner, automation, output ngắn | Chat UI cần phản hồi sớm |
Best default cho RAG chat: POST /query cho automation/eval và SSE endpoint cho UI.
6. Timeout, rate limit và concurrency limit
Các limit nên được thiết kế theo nhiều lớp:
| Layer | Control | Ví dụ |
|---|---|---|
| Input | question.max_length, max file size, max context docs | Reject trước khi gọi model |
| Output | max_output_tokens, stop sequence | Giảm latency và cost |
| Queue | queue_timeout_s | Không để request đợi vô hạn khi service bận |
| Runtime | request_timeout_s, upstream HTTP timeout | Fail clean thay vì treo worker |
| Tenant | requests/minute, tokens/minute | Bảo vệ quota và budget |
| GPU | max_concurrent_requests, KV cache budget | Tránh OOM |
| Network | reverse proxy timeout, idle timeout | Tránh stream bị cắt bất ngờ |
Rate limit trả 429 RATE_LIMITED. Concurrency limit thường trả 503 CONCURRENCY_LIMIT nếu request không lấy được slot trong queue timeout. Model timeout trả 504 MODEL_TIMEOUT.
Production nên tách rõ:
- Rate limit theo tenant, user, API key và token budget, không chỉ theo IP.
- Concurrency limit theo model/pool/GPU, không chỉ toàn service.
- Timeout theo stage: retrieval timeout, rerank timeout, generation timeout, total timeout.
7. Batching trade-off
Batching gom nhiều request để dùng GPU hiệu quả hơn. Với LLM, các runtime như vLLM/TGI thường dùng continuous batching để thêm request mới vào batch đang chạy.
| Lợi ích | Chi phí |
|---|---|
| Tăng tokens/second và GPU utilization | Tăng queue delay nếu max_wait_ms quá cao |
| Giảm overhead mỗi request | p95/p99 latency có thể xấu hơn |
| Hữu ích cho batch/eval/offline workload | Time-to-first-token của chat có thể chậm |
| Tận dụng tốt GPU cho model lớn | Cần tuning tránh OOM do batch quá lớn |
Metric cần đo trước khi bật batching mạnh:
- p50/p95/p99 latency.
- Time-to-first-token với streaming.
- Tokens/second tổng và tokens/second mỗi request.
- GPU utilization, memory usage, KV cache usage.
- Error rate, timeout rate, OOM rate.
Best rule:
Interactive chat: ưu tiên p95 latency và time-to-first-token.
Batch/offline inference: ưu tiên throughput và cost/request.
Nếu workload là RAG chat nhiều user, nên dùng FastAPI làm gateway và để vLLM/TGI xử lý continuous batching phía model server.
8. So sánh serving tools
| Tool | Phù hợp nhất | Điểm mạnh | Trade-off | Khi chọn |
|---|---|---|---|---|
| FastAPI | API gateway, RAG orchestration, business logic, model nhỏ | Python-native, schema rõ, dễ debug, dễ tích hợp auth/logging | Không tự giải quyết GPU scheduling, continuous batching hoặc model optimization | Default cho capstone, RAG app, wrapper trước model server |
| BentoML | Đóng gói Python model service, batchable inference, model registry đơn giản | Developer experience tốt cho Python ML, build image/service nhanh | Với LLM throughput cao vẫn cần runtime chuyên dụng phía sau | Classical ML, embedding/reranker service, team muốn packaging chuẩn |
| TorchServe | Serve PyTorch model bằng handler | Hợp nếu org đã chuẩn hóa TorchServe và model là PyTorch | Ít linh hoạt cho LLM/RAG custom, cần viết handler và ops riêng | PyTorch classifier/detector/ranker đã ổn định |
| Triton Inference Server | GPU inference throughput cao, multi-framework, dynamic batching | Mạnh cho TensorRT/ONNX/PyTorch backend, gRPC/HTTP, model repository | Ops phức tạp, cần hiểu GPU profiling và model format | High-throughput CV/NLP model, multi-model GPU serving |
| vLLM | LLM serving throughput cao | OpenAI-compatible API, continuous batching, memory-efficient KV cache | Phụ thuộc model architecture được hỗ trợ, cần GPU memory planning | Self-host LLM chat/completion, cần throughput tốt |
| TGI | Hugging Face text generation serving | Streaming, tensor parallel, hợp HF ecosystem | Tuning vẫn cần GPU/ops skill, API khác nhau theo phiên bản | Self-host HF model, muốn runtime chuyên cho generation |
Best solution theo context:
| Context | Giải pháp nên chọn |
|---|---|
| Mini-project hoặc RAG product mới | FastAPI gateway + RAG services rõ boundary |
| LLM self-host có traffic thật | FastAPI gateway + vLLM hoặc TGI phía sau |
| Model CV/NLP cần GPU throughput | FastAPI gateway + Triton |
| Classical ML hoặc embedding service Python-first | BentoML hoặc FastAPI tùy team |
| Org đã có PyTorch serving platform | TorchServe nếu vận hành đã mature |
| Cần ship nhanh, data không quá nhạy | FastAPI gateway + managed model API |
9. Best practices
- Load model/runtime một lần ở startup, warm up trước khi
/readytrảready. - Tách
/healthvà/ready. Liveness không nên fail vì vector DB chậm tạm thời. - Dùng Pydantic để validate request và response. Reject field lạ nếu contract cần chặt.
- Luôn trả
trace_id,model_versionhoặcpipeline_version. - Log structured JSON với stage latency, input length, output tokens, tenant và error code.
- Không log raw prompt/response nếu có PII, hoặc phải redact trước khi lưu.
- Đặt timeout ở gateway và upstream client. Không chỉ dựa vào proxy timeout.
- Có concurrency limit để bảo vệ GPU/KV cache/provider quota.
- Streaming phải handle client disconnect và vẫn release resource.
- Với RAG, response phải có citation và citation validator trước khi trả client.
- Với self-host LLM, dùng runtime chuyên dụng cho batching thay vì tự batch trong FastAPI.
- Với multi-tenant, rate limit theo tenant/user/API key và có token budget.
10. Dùng được trong production không?
Câu trả lời thực tế: pattern trong bài dùng được trong production, nhưng sample code chỉ là baseline học tập. Có thể đưa vào production khi thỏa các điều kiện sau:
- Runtime thật đã được tách rõ: model/RAG pipeline có version, health check, warmup và rollback plan.
- Rate limit chuyển từ in-memory sang Redis/API Gateway hoặc quota service tập trung.
- Có auth thật, tenant isolation và policy không log PII.
- Timeout được đặt ở gateway, upstream client, reverse proxy và model server.
- Concurrency limit được sizing bằng load test trên CPU/GPU thật.
- Với LLM self-host, FastAPI chỉ làm gateway, còn batching và KV cache do vLLM/TGI/Triton hoặc runtime chuyên dụng xử lý.
- Có metrics cho request count, error rate, timeout rate, p50/p95/p99 latency, tokens/sec, cost và GPU memory.
- Có CI test cho schema, error contract, streaming event format và regression test cho RAG output.
- Có deployment strategy: canary, rollback, model version pinning và dashboard.
Không nên gọi là production-ready nếu:
- Model load trong mỗi request.
- Không có timeout/concurrency limit.
- Không có trace id hoặc model version trong response.
- Dùng in-memory limiter với nhiều replicas nhưng tưởng là global quota.
- SSE stream không release resource khi client disconnect.
- Chưa load test p95 latency và OOM behavior.
11. Kết luận
FastAPI là lựa chọn tốt cho serving contract, orchestration và product boundary. Nhưng với GPU throughput hoặc LLM traffic nghiêm túc, FastAPI nên đứng trước model server chuyên dụng. Production serving không chỉ là "API chạy được", mà là một hệ thống có contract, limit, observability, versioning và rollback.
Tài liệu
1. Serving architecture template
Template kiến trúc cho model hoặc RAG pipeline:
Client/UI/Batch Job
-> API Gateway or FastAPI
-> Authentication
-> Tenant/User context
-> Request validation
-> Rate limit
-> Concurrency limit
-> Timeout wrapper
-> Model/RAG runtime adapter
-> Response contract or SSE stream
-> Structured logs, metrics, traces
Runtime adapter
-> Local model, BentoML, TorchServe, Triton, vLLM, TGI, or managed provider
Boundary quan trọng:
| Boundary | Trách nhiệm |
|---|---|
| API contract | Input/output schema, error schema, event schema |
| Gateway | Auth, validation, limit, timeout, trace id, response mapping |
| Runtime adapter | Gọi model server/provider, retry có kiểm soát, mapping lỗi |
| Model server | Load model, optimize inference, batching, GPU memory |
| Observability | Logs, metrics, traces, alert, dashboard |
2. Project structure mẫu
model-serving-api/
app/
main.py
api/
health.py
query.py
streaming.py
models.py
core/
config.py
errors.py
logging.py
limits.py
security.py
schemas/
query.py
errors.py
events.py
services/
runtime.py
rag_runtime.py
llm_client.py
metrics.py
tests/
test_contract.py
test_validation.py
test_streaming.py
test_limits.py
scripts/
smoke_test.py
stream_client.py
load_test.py
pyproject.toml
Dockerfile
.env.example
README.md
Với project nhỏ, có thể gom file lại. Nhưng vẫn nên giữ rõ 4 nhóm: api, schemas, services, core.
3. API contract template
GET /health
Mục đích: liveness. Không gọi dependency nặng.
Response:
{
"status": "ok",
"service": "model-serving-api",
"version": "1.0.0"
}
GET /ready
Mục đích: readiness. Có thể kiểm tra model loaded, runtime client, vector DB hoặc model server.
Response:
{
"status": "ready",
"model_version": "rag-api-v1",
"dependencies": {
"runtime": "ok",
"vector_db": "ok"
}
}
Nếu chưa sẵn sàng:
{
"error": {
"code": "NOT_READY",
"message": "Runtime is not ready.",
"trace_id": "tr_123",
"retryable": true
}
}
POST /query
Request:
{
"question": "Nhân viên full-time có bao nhiêu ngày nghỉ phép năm?",
"tenant_id": "demo",
"top_k": 6,
"max_output_tokens": 512,
"include_trace": false
}
Validation rules:
| Field | Rule | Lý do |
|---|---|---|
question | Required, 1..2000 chars | Chặn prompt rỗng hoặc quá dài |
tenant_id | Required/default, 1..64 chars | Rate limit và data isolation |
top_k | 1..20 | Chặn context quá lớn |
max_output_tokens | 16..2048 | Kiểm soát latency và cost |
include_trace | Boolean | Trace chi tiết chỉ nên bật khi cần |
Response:
{
"answer": "Nhân viên full-time có 12 ngày nghỉ phép năm [S1].",
"citations": [
{
"source_id": "S1",
"document_id": "doc_hr_2026",
"chunk_id": "demo:doc_hr_2026:v1:00003",
"title": "HR Policy 2026",
"page_start": 3,
"page_end": 3
}
],
"trace_id": "tr_123",
"latency_ms": {
"retrieval": 40,
"rerank": 130,
"generation": 900,
"total": 1070
},
"model_version": "rag-api-v1",
"finish_reason": "stop"
}
GET /query/stream
Query params:
question=Nhân viên full-time có bao nhiêu ngày nghỉ phép năm?
tenant_id=demo
top_k=6
max_output_tokens=512
Headers:
Accept: text/event-stream
Cache-Control: no-cache
Event format:
event: meta
data: {"trace_id":"tr_123","model_version":"rag-api-v1"}
event: token
data: {"text":"Nhân viên "}
event: citation
data: {"source_id":"S1","document_id":"doc_hr_2026","chunk_id":"demo:doc_hr_2026:v1:00003"}
event: done
data: {"trace_id":"tr_123","finish_reason":"stop","latency_ms":{"total":1070}}
event: error
data: {"code":"STREAM_TIMEOUT","message":"Streaming response timed out.","trace_id":"tr_123","retryable":true}
Production note:
- Dùng
GETnếu cần browserEventSourceđơn giản. - Dùng
POSTstreaming vớifetch()nếu query nhạy cảm hoặc payload lớn. - Disable buffering ở reverse proxy cho SSE, ví dụ
X-Accel-Buffering: novới NGINX. - Kiểm tra idle timeout của load balancer, CDN và ingress.
GET /models/current
Response:
{
"model_version": "rag-api-v1",
"runtime": "vLLM",
"model_name": "meta-llama/Llama-3.1-8B-Instruct",
"prompt_version": "answer-prompt-v3",
"deployed_at": "2026-05-10T10:00:00Z"
}
4. Error code catalog
| HTTP | Code | Retryable | Khi nào |
|---|---|---|---|
| 400 | BAD_REQUEST | No | Payload sai logic nhưng qua schema |
| 401 | UNAUTHORIZED | No | Thiếu hoặc sai auth |
| 403 | FORBIDDEN | No | User không có quyền tenant/model |
| 422 | VALIDATION_ERROR | No | FastAPI/Pydantic reject schema |
| 429 | RATE_LIMITED | Yes | Vượt request/token quota |
| 503 | NOT_READY | Yes | Runtime chưa load xong |
| 503 | CONCURRENCY_LIMIT | Yes | Hết slot inference |
| 504 | MODEL_TIMEOUT | Yes | Runtime hoặc upstream quá timeout |
| 500 | INTERNAL_ERROR | Maybe | Lỗi bất ngờ, cần trace |
Rule:
- Client chỉ thấy message an toàn.
- Server log giữ exception type, stack trace và upstream error.
- Mọi error phải có
trace_id.
5. Config template
.env.example:
APP_ENV=local
SERVICE_NAME=model-serving-api
MODEL_VERSION=rag-api-v1
REQUEST_TIMEOUT_S=20
STREAM_TIMEOUT_S=60
QUEUE_TIMEOUT_S=0.2
MAX_CONCURRENT_REQUESTS=8
RATE_LIMIT_WINDOW_S=60
RATE_LIMIT_REQUESTS=60
RATE_LIMIT_TOKENS=120000
RUNTIME_KIND=fake
RUNTIME_BASE_URL=http://vllm:8000/v1
RUNTIME_API_KEY=change-me
LOG_LEVEL=INFO
REDACT_PROMPTS=true
Timeout sizing gợi ý:
| Workload | Request timeout | Stream timeout | Queue timeout |
|---|---|---|---|
| Classifier CPU nhỏ | 1-3s | N/A | 50-100ms |
| Embedding batch nhỏ | 5-15s | N/A | 100-300ms |
| RAG non-streaming | 15-30s | N/A | 100-500ms |
| LLM chat streaming | 10-30s first response, 60-180s total | 60-180s | 100-500ms |
Không copy số này vào production một cách máy móc. Phải đo trên model, GPU, context length và traffic thật.
6. Structured log fields
Log một request thành công:
{
"event": "query_ok",
"trace_id": "tr_123",
"tenant_id": "demo",
"user_id": "u_456",
"route": "/query",
"input_chars": 68,
"prompt_tokens": 2100,
"completion_tokens": 180,
"latency_ms": {
"retrieval": 40,
"rerank": 130,
"generation": 900,
"total": 1070
},
"model_version": "rag-api-v1",
"finish_reason": "stop"
}
Log một request lỗi:
{
"event": "query_failed",
"trace_id": "tr_123",
"tenant_id": "demo",
"route": "/query",
"error_code": "MODEL_TIMEOUT",
"retryable": true,
"latency_ms": {
"total": 20010
},
"model_version": "rag-api-v1"
}
Không nên log raw prompt/response mặc định. Nếu cần debug, bật sampling thấp, redact PII và giới hạn retention.
7. Tool decision matrix
| Context | Best default | Vì sao | Alternative |
|---|---|---|---|
| RAG app hoặc product API | FastAPI gateway | Cần business logic, auth, schema, trace, citation | BentoML nếu muốn package pipeline như model service |
| LLM self-host nhiều traffic | FastAPI + vLLM/TGI | Gateway giữ product contract, runtime giữ batching/KV cache | Managed LLM API nếu muốn giảm ops |
| CV/NLP model cần throughput GPU | FastAPI + Triton | Triton mạnh về dynamic batching và model repository | BentoML nếu throughput vừa phải |
| Classical ML model Python | FastAPI hoặc BentoML | Dễ vận hành, dễ test | TorchServe nếu org chuẩn PyTorch |
| PyTorch model đã có handler chuẩn | TorchServe | Hợp nếu platform đã có sẵn | Triton hoặc BentoML |
| Offline/batch inference | BentoML/Triton/job runner | Throughput quan trọng hơn p95 user latency | FastAPI nếu vẫn cần online API |
Short rule:
FastAPI = product/API boundary.
vLLM/TGI/Triton = high-throughput runtime boundary.
BentoML = Python model packaging boundary.
TorchServe = PyTorch serving boundary khi org đã chuẩn hóa.
8. Batching checklist
Trước khi bật batching:
- Biết workload là interactive, batch, hay mixed.
- Có baseline p50/p95/p99 latency khi batch off hoặc batch nhỏ.
- Có metric time-to-first-token cho streaming.
- Có metric tokens/sec và GPU utilization.
- Có limit cho
max_batch_size,max_wait_ms,max_num_batched_tokens. - Có timeout nếu request đứng trong queue quá lâu.
- Có test với prompt dài và output dài.
- Có alert cho OOM, timeout rate và queue depth.
Decision:
| Nếu mục tiêu là | Tối ưu |
|---|---|
| Chat UI phản hồi nhanh | max_wait_ms thấp, concurrency vừa phải, stream sớm |
| Batch eval qua đêm | Batch lớn hơn, queue lâu hơn được |
| Cost thấp | Throughput/GPU utilization cao, chấp nhận latency |
| SLA p95 chặt | Batch nhỏ hơn, scale replicas hoặc dùng model nhỏ hơn |
9. Security checklist
- Auth bắt buộc cho endpoint inference.
- Tenant/user context không lấy hoàn toàn từ body nếu client có thể giả mạo.
- Rate limit theo tenant/user/API key.
- Token budget theo tenant để tránh cost spike.
- Không trả stack trace, provider error raw hoặc prompt nội bộ ra client.
- Redact prompt/response trong logs.
- Nếu RAG multi-tenant, filter ACL ở retriever, không giao cho prompt.
- Validate content type và payload size.
- Có allowlist model nếu client được chọn model.
- Có audit log cho model version, prompt version và deployment.
10. Production readiness checklist
Contract
- OpenAPI schema rõ cho request/response/error.
- Có
trace_idtrong header và response body. - Có
model_versionhoặcpipeline_version. - Có
finish_reason. - SSE event có
meta,token,done,error.
Reliability
-
/healthvà/readytách riêng. - Startup load/warmup model một lần.
- Timeout ở gateway, upstream client và proxy.
- Concurrency limit đã sizing bằng load test.
- Rate limit global, không chỉ process-local.
- Graceful shutdown không cắt request đang xử lý quá thô.
Performance
- Đo p50/p95/p99 latency.
- Đo time-to-first-token.
- Đo tokens/sec và request/sec.
- Đo CPU/GPU memory.
- Đã test prompt dài, output dài và burst traffic.
- Batching được tuning theo SLA.
Observability
- Structured logs có trace id, tenant, route, latency, model version.
- Metrics có request count, error rate, timeout rate, latency histogram.
- Dashboard có queue depth/concurrency/GPU memory nếu self-host.
- Alert cho timeout spike, OOM, readiness fail và error rate.
Release
- Canary hoặc blue/green deploy.
- Rollback model/prompt/runtime version.
- Smoke test sau deploy.
- Contract test trong CI.
- Eval regression cho RAG/model quality.
11. Incident runbook
Timeout tăng mạnh
- Kiểm tra p95/p99 latency theo stage: retrieval, rerank, generation, upstream.
- Kiểm tra queue depth và concurrency.
- Kiểm tra prompt length và output tokens có tăng không.
- Tạm giảm
max_output_tokenshoặctop_knếu cần bảo vệ service. - Scale runtime hoặc rollback model nếu latency tăng sau deploy.
GPU OOM
- Dừng rollout hoặc giảm traffic.
- Giảm
max_concurrent_requests, batch size hoặc max context/output tokens. - Kiểm tra model replicas có bị nhân đôi do nhiều worker không.
- Kiểm tra KV cache usage và prompt length distribution.
- Chạy lại load test trước khi mở traffic.
SSE stream bị cắt
- Kiểm tra reverse proxy/CDN/ingress idle timeout.
- Kiểm tra buffering có bị bật không.
- Kiểm tra server có gửi heartbeat hoặc token đều không.
- Kiểm tra client disconnect rate.
- Log
finish_reasonđể phân biệtdone,timeout,client_disconnected.
Model version mismatch
- So sánh
/models/currentvới deployment manifest. - Kiểm tra registry version và image tag.
- Kiểm tra cache hoặc replica cũ chưa drain.
- Rollback hoặc restart replica lệch version.
- Gắn test vào CI/CD để chặn mismatch lần sau.
12. Câu trả lời production readiness mẫu
Pattern này dùng được trong production nếu FastAPI chỉ giữ API/product boundary,
runtime được load/warmup đúng cách, limit được quản lý global, timeout được đặt ở
mọi layer, và hệ thống có observability + rollback.
Sample local chưa production-ready vì rate limiter là in-memory, fake runtime,
chưa có auth thật, chưa có Redis/API Gateway quota, chưa có dashboard/alert,
và chưa được load test trên traffic + GPU thật.
Bài tập
Mục tiêu
Bạn sẽ triển khai một API serving cho model hoặc RAG pipeline với:
GET /healthGET /readyGET /models/currentPOST /queryGET /query/streamdùng Server-Sent Events- Pydantic request/response validation
- Timeout, rate limit và concurrency limit
- Structured logs có
trace_id - Test script cho non-streaming và streaming
- README trả lời production readiness
Thời lượng đề xuất:
- Bản tối thiểu: 60-90 phút.
- Bản tốt cho portfolio: 0.5-1 ngày.
- Bản gần production: 2-3 ngày, thêm auth, Redis limiter, Docker, metrics và CI.
0. Acceptance criteria
Hoàn thành bài tập khi bạn có:
-
POST /queryreject request sai schema bằng422. -
POST /querytrảanswer,citations,trace_id,latency_ms,model_version,finish_reason. -
GET /query/streamtrả eventmeta, nhiều eventtoken, và eventdone. - Timeout trả error contract có code
MODEL_TIMEOUThoặcSTREAM_TIMEOUT. - Rate limit trả
429 RATE_LIMITED. - Concurrency limit trả
503 CONCURRENCY_LIMIThoặc SSEerror. - Log có
trace_id,tenant_id,input_chars,latency_ms,model_version. - Có script test streaming bằng Python hoặc
curl -N. - README trả lời: "Dùng được trong production không? Nếu có thì cần điều kiện gì?"
1. Chọn runtime
Chọn một trong ba mức:
| Mức | Runtime | Khi nào dùng |
|---|---|---|
| Local fake runtime | Hàm async giả lập model | Học API boundary nhanh |
| RAG runtime từ Day 40 | Gọi retrieval + generation pipeline | Muốn nối với mini-project RAG |
| LLM runtime thật | Gọi vLLM/TGI/managed LLM qua HTTP | Muốn demo self-host hoặc provider thật |
Khuyến nghị cho lần đầu: dùng fake runtime trước. Sau khi contract ổn, thay runtime bằng RAG hoặc LLM thật.
2. Scaffold project
Tạo folder:
day-42-serving-lab/
app/
main.py
schemas.py
runtime.py
limits.py
scripts/
smoke_test.py
stream_client.py
pyproject.toml
README.md
Dependencies tối thiểu:
[project]
requires-python = ">=3.11"
dependencies = [
"fastapi",
"uvicorn[standard]",
"httpx",
"pydantic",
]
Nếu dùng config từ env, thêm:
"pydantic-settings"
Chạy local:
uvicorn app.main:app --reload --port 8000
3. Implement schema
Tạo app/schemas.py:
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field
class Citation(BaseModel):
source_id: str
document_id: str
chunk_id: str
title: str
page_start: int | None = None
page_end: int | None = None
class QueryRequest(BaseModel):
model_config = ConfigDict(extra="forbid")
question: str = Field(..., min_length=1, max_length=2000)
tenant_id: str = Field("demo", min_length=1, max_length=64)
top_k: int = Field(6, ge=1, le=20)
max_output_tokens: int = Field(512, ge=16, le=2048)
include_trace: bool = False
class QueryResponse(BaseModel):
answer: str
citations: list[Citation]
trace_id: str
latency_ms: dict[str, int]
model_version: str
finish_reason: Literal["stop", "timeout", "error"]
Test nhanh validation:
curl -s -X POST http://localhost:8000/query \
-H 'content-type: application/json' \
-d '{"question":"","unknown_field":true}' | jq
Kỳ vọng: FastAPI trả 422.
4. Implement fake runtime
Tạo app/runtime.py:
import asyncio
import time
from collections.abc import AsyncIterator
from app.schemas import Citation, QueryRequest, QueryResponse
class Runtime:
def __init__(self, model_version: str) -> None:
self.model_version = model_version
self.loaded = False
async def startup(self) -> None:
await asyncio.sleep(0.05)
self.loaded = True
async def shutdown(self) -> None:
self.loaded = False
async def query(self, payload: QueryRequest, trace_id: str) -> QueryResponse:
started = time.perf_counter()
await asyncio.sleep(0.15)
return QueryResponse(
answer="Câu trả lời mẫu có citation [S1].",
citations=[
Citation(
source_id="S1",
document_id="demo-doc",
chunk_id="demo-doc:v1:00001",
title="Demo Document",
)
],
trace_id=trace_id,
latency_ms={"generation": int((time.perf_counter() - started) * 1000)},
model_version=self.model_version,
finish_reason="stop",
)
async def stream(self, payload: QueryRequest, trace_id: str) -> AsyncIterator[str]:
tokens = ["Câu ", "trả ", "lời ", "đang ", "được ", "stream ", "qua ", "SSE."]
for token in tokens:
await asyncio.sleep(0.08)
yield token
Nâng cấp sau:
- Với RAG Day 40:
query()gọi retrieval, rerank, generator và citation validator. - Với vLLM/TGI:
query()gọi HTTP endpoint,stream()parse upstream stream rồi map về SSE event contract của bạn. - Với Triton:
query()gọi gRPC/HTTP client và map tensor output về response schema.
5. Implement rate/concurrency limit
Tạo app/limits.py:
import asyncio
import time
from collections import defaultdict, deque
class InMemoryRateLimiter:
def __init__(self, max_requests: int, window_s: int) -> None:
self.max_requests = max_requests
self.window_s = window_s
self._hits: dict[str, deque[float]] = defaultdict(deque)
self._lock = asyncio.Lock()
async def allow(self, key: str) -> bool:
now = time.monotonic()
async with self._lock:
bucket = self._hits[key]
while bucket and now - bucket[0] > self.window_s:
bucket.popleft()
if len(bucket) >= self.max_requests:
return False
bucket.append(now)
return True
Ghi rõ trong README:
In-memory limiter chỉ dùng cho local. Production cần Redis/API Gateway/quota service
để limit có hiệu lực trên nhiều process và nhiều replicas.
6. Implement FastAPI app
Tạo app/main.py. Bạn có thể dùng code trong lession.md làm baseline, hoặc tự implement với các yêu cầu:
lifespanload runtime một lần.- Middleware tạo
trace_idvà trả headerx-trace-id. - Exception handler trả error contract.
/healthkhông gọi dependency nặng./readykiểm tra runtime loaded./querybọc runtime call bằng timeout./query/streamtrảStreamingResponsevớitext/event-stream.- Streaming generator release semaphore trong
finally.
Checklist code:
- Không gọi
Runtime()trong từng request. - Không dùng bare
exceptrồi nuốt lỗi. - Không trả stack trace ra client.
- Không để request chờ semaphore vô hạn.
- Không log raw prompt nếu data có thể chứa PII.
7. Test bằng curl
Health:
curl -s http://localhost:8000/health | jq
curl -s http://localhost:8000/ready | jq
curl -s http://localhost:8000/models/current | jq
Non-streaming:
curl -s -X POST http://localhost:8000/query \
-H 'content-type: application/json' \
-H 'x-api-key: dev-key' \
-d '{
"question": "Day 42 học gì?",
"tenant_id": "demo",
"top_k": 6,
"max_output_tokens": 256
}' | jq
Streaming:
curl -N 'http://localhost:8000/query/stream?tenant_id=demo&question=Day%2042%20h%E1%BB%8Dc%20g%C3%AC%3F&max_output_tokens=128'
Kỳ vọng:
event: meta
data: {"trace_id":"...","model_version":"..."}
event: token
data: {"text":"Câu "}
event: done
data: {"trace_id":"...","finish_reason":"stop","latency_ms":{"total":...}}
8. Viết smoke test
Tạo scripts/smoke_test.py:
import asyncio
import httpx
BASE_URL = "http://localhost:8000"
async def main() -> None:
async with httpx.AsyncClient(timeout=30) as client:
health = await client.get(f"{BASE_URL}/health")
health.raise_for_status()
response = await client.post(
f"{BASE_URL}/query",
json={
"question": "Day 42 học gì?",
"tenant_id": "demo",
"top_k": 6,
"max_output_tokens": 128,
},
)
response.raise_for_status()
body = response.json()
assert body["trace_id"]
assert body["model_version"]
assert body["finish_reason"] == "stop"
assert "total" in body["latency_ms"]
print("smoke test ok", body["trace_id"])
if __name__ == "__main__":
asyncio.run(main())
Chạy:
python scripts/smoke_test.py
9. Viết streaming client test
Tạo scripts/stream_client.py:
import asyncio
import httpx
BASE_URL = "http://localhost:8000"
async def main() -> None:
params = {
"question": "Day 42 học gì?",
"tenant_id": "demo",
"max_output_tokens": 128,
}
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream("GET", f"{BASE_URL}/query/stream", params=params) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line:
print(line)
if __name__ == "__main__":
asyncio.run(main())
Chạy:
python scripts/stream_client.py
Acceptance:
- Nhìn thấy
event: meta. - Nhìn thấy nhiều
event: token. - Nhìn thấy
event: done. - Nếu giảm timeout rất thấp, nhìn thấy
event: error.
10. Test timeout
Sửa fake runtime tạm thời:
async def query(self, payload, trace_id):
await asyncio.sleep(999)
return QueryResponse(
answer="timeout simulation",
citations=[],
trace_id=trace_id,
latency_ms={"total": 999000},
model_version="fake-rag-v1",
finish_reason="timeout",
)
Giảm config:
request_timeout_s = 1.0
stream_timeout_s = 1.0
Gọi:
curl -s -X POST http://localhost:8000/query \
-H 'content-type: application/json' \
-d '{"question":"timeout test","tenant_id":"demo"}' | jq
Kỳ vọng:
{
"error": {
"code": "MODEL_TIMEOUT",
"message": "Model runtime timed out. Please retry.",
"trace_id": "...",
"retryable": true
}
}
Sau test, revert phần sleep giả lập trong lab code của bạn.
11. Test rate limit
Đặt rate_limit_requests = 2, rate_limit_window_s = 60.
Gọi 3 lần liên tiếp:
for i in 1 2 3; do
curl -s -o /dev/null -w "%{http_code}\n" \
-X POST http://localhost:8000/query \
-H 'content-type: application/json' \
-H 'x-api-key: same-client' \
-d '{"question":"rate test","tenant_id":"demo"}'
done
Kỳ vọng:
200
200
429
12. Test concurrency limit
Đặt:
max_concurrent_requests = 1
queue_timeout_s = 0.05
Tạo nhiều request song song:
import asyncio
import httpx
async def call(i: int) -> None:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.post(
"http://localhost:8000/query",
json={"question": f"concurrency test {i}", "tenant_id": "demo"},
)
print(i, response.status_code, response.text[:120])
async def main() -> None:
await asyncio.gather(*(call(i) for i in range(10)))
asyncio.run(main())
Kỳ vọng: một số request thành công, một số request trả 503 CONCURRENCY_LIMIT.
13. Benchmark batching trade-off
Nếu bạn dùng vLLM/TGI/Triton hoặc runtime có batching, chạy ít nhất 3 cấu hình:
| Config | Mục tiêu |
|---|---|
| Batch nhỏ, wait thấp | Baseline latency |
| Batch vừa | Cân bằng latency/throughput |
| Batch lớn, wait cao | Throughput tối đa |
Ghi lại:
| Metric | Batch nhỏ | Batch vừa | Batch lớn |
|---|---|---|---|
| request/sec | |||
| tokens/sec | |||
| p50 latency | |||
| p95 latency | |||
| p99 latency | |||
| time-to-first-token | |||
| timeout rate | |||
| GPU memory peak |
Viết kết luận:
Với interactive chat, chọn config ... vì p95 và time-to-first-token tốt hơn.
Với offline batch, chọn config ... vì throughput cao hơn và SLA latency không chặt.
14. Thay fake runtime bằng runtime thật
Option A: RAG pipeline từ Day 40
Map flow:
QueryRequest
-> normalize question
-> retrieve with tenant/ACL filter
-> rerank
-> build context
-> generate answer
-> validate citation
-> QueryResponse
Yêu cầu:
tenant_idphải đi vào retriever filter.top_kphải có upper bound.- Response phải có citation thật.
- Trace latency phải tách retrieval, rerank, generation.
Option B: vLLM/TGI hoặc managed LLM
Map flow:
QueryRequest
-> prompt builder
-> HTTP client with timeout
-> parse completion or stream chunks
-> QueryResponse/SSE events
Yêu cầu:
- HTTP client có timeout.
- Không retry vô hạn.
- Map upstream timeout thành
MODEL_TIMEOUT. - Map upstream rate limit thành
UPSTREAM_RATE_LIMITEDhoặcRATE_LIMITED. - Không leak API key/provider error raw ra client.
Option C: Triton/BentoML/TorchServe
Map flow:
QueryRequest
-> preprocess
-> model server request
-> postprocess
-> QueryResponse
Yêu cầu:
- Version model rõ.
- Preprocess/postprocess có test.
- Timeout và batch behavior được đo.
15. README production readiness
README phải có một section:
## Production Readiness
### Dùng được trong production không?
Pattern API này có thể dùng trong production, nhưng bản lab hiện tại chưa đủ
production-ready.
### Điều kiện cần để production-ready
- Thay in-memory rate limiter bằng Redis/API Gateway/quota service.
- Thêm authentication và tenant authorization thật.
- Không log raw prompt/response hoặc phải redact PII.
- Chạy load test để sizing timeout, concurrency và batching.
- Thêm metrics, dashboard và alert cho latency, timeout, error rate, queue depth.
- Nếu self-host LLM, đặt FastAPI trước vLLM/TGI/Triton thay vì tự batch trong FastAPI.
- Thêm canary deploy, rollback model version và contract test trong CI.
16. Câu hỏi review
Trả lời các câu sau sau khi hoàn thành lab:
- Contract của
/querycó field nào giúp debug production? - Vì sao
/healthvà/readykhông nên giống nhau? - Vì sao in-memory rate limiter sai khi chạy nhiều replicas?
- Concurrency limit bảo vệ điều gì trong LLM serving?
- Batching cải thiện metric nào và có thể làm xấu metric nào?
- Khi nào nên chọn FastAPI-only?
- Khi nào nên chọn FastAPI + vLLM/TGI?
- Khi nào Triton hợp lý hơn FastAPI-only?
- SSE có lợi gì so với non-streaming HTTP cho chat UI?
- Bản lab của bạn còn thiếu gì để production-ready?
17. Rubric tự chấm
| Mức | Tiêu chí |
|---|---|
| Đạt | Có /query, /query/stream, validation, timeout và README production readiness |
| Khá | Có rate/concurrency limit, structured logs, smoke test và stream client |
| Tốt | Có runtime thật, trace latency theo stage, benchmark p95/TTFT và error contract đầy đủ |
| Gần production | Có auth, Redis/API Gateway limiter, metrics/dashboard, Docker, CI contract test và rollout/rollback plan |