- Published on
Day 4: Python ML Stack
- Authors

- Name
- Trần Mạnh Thắng
- @TranManhThang96
Mục tiêu
Sau bài này, bạn cần làm được các việc sau:
- Dùng NumPy để hiểu
ndarray,shape,dtype, broadcasting và vectorization ở mức đủ để debug ML code. - Dùng Pandas cho EDA, data cleaning,
groupby,merge, missing value analysis và kiểm tra schema dữ liệu. - Dùng scikit-learn
PipelinevàColumnTransformerđể đóng gói preprocessing cùng model, tránh train-serving skew. - Train và so sánh Logistic Regression, Random Forest, Gradient Boosting trên Titanic-style dataset.
- Lưu pipeline artifact kèm metadata, rồi viết inference function có input validation.
TL;DR
Python ML stack phổ biến gồm NumPy cho numerical compute, Pandas cho data wrangling, scikit-learn cho training pipeline, Matplotlib/Seaborn cho visualization và joblib cho artifact nhỏ-vừa. Với Senior SE, hãy xem ML pipeline như một service build artifact: input có schema, transform có version, model có metrics, artifact có metadata và inference có contract rõ ràng. Notebook rất tốt cho exploration, nhưng production cần logic repeatable trong script/package, test được và có monitoring.
1. Bức tranh tổng thể
Một ML workflow tabular cơ bản thường đi qua các bước:
raw data
-> data loading
-> schema check
-> EDA
-> train/test split
-> preprocessing
-> model training
-> evaluation
-> artifact packaging
-> batch/API inference
-> monitoring
Map về tư duy software engineering:
| ML stack | SE analogy | Điều cần kiểm soát |
|---|---|---|
| NumPy array | Binary buffer/vector payload | Shape, dtype, memory layout |
| Pandas DataFrame | In-memory table/batch ETL | Column schema, null, join cardinality |
| scikit-learn transformer | Pure function có fit/transform | Fit chỉ trên train data |
| scikit-learn estimator | Business logic được học từ data | Params, random seed, metrics |
| Pipeline artifact | Deployable binary | Version, compatibility, rollback |
| Notebook | Spike/experiment document | Không phải source of truth duy nhất |
Best solution trong Phase 1: dùng Pandas + scikit-learn Pipeline cho tabular ML nhỏ-vừa, vì API đơn giản, ecosystem mạnh, dễ chuyển từ notebook sang script. Chưa cần Spark, MLflow hay feature store trừ khi dữ liệu lớn, team đông hoặc cần governance nghiêm ngặt.
2. NumPy: nền tảng của numerical compute
ndarray là mảng N chiều đồng nhất kiểu dữ liệu. Trong ML, dữ liệu thường có shape:
X: (n_samples, n_features)
y: (n_samples,)
Ví dụ:
import numpy as np
X = np.array(
[
[22, 7.25],
[38, 71.28],
[26, 7.92],
],
dtype=np.float64,
)
w = np.array([0.03, 0.01])
scores = X @ w
print(X.shape) # (3, 2)
print(X.dtype) # float64
print(scores) # vector score cho 3 rows
Điểm cần nhớ:
shapelà contract. Nếu training dùng 8 features mà inference đưa 7 features, kết quả phải fail sớm.dtypeảnh hưởng memory và tốc độ.float64chính xác hơn nhưng tốn memory hơnfloat32.- Vectorization đẩy compute xuống native code, thường nhanh hơn Python loop rất nhiều.
- Broadcasting tiện nhưng dễ tạo bug nếu shape không rõ ràng.
- Slicing có thể tạo view chia sẻ memory; copy/view không rõ có thể gây mutation ngoài ý muốn.
Ví dụ broadcasting:
X = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
mean = X.mean(axis=0)
std = X.std(axis=0)
scaled = (X - mean) / std
Ở production, bạn hiếm khi tự scale bằng NumPy nếu dùng scikit-learn. Nhưng hiểu cơ chế này giúp debug StandardScaler, feature matrix và lỗi shape.
3. Pandas: data wrangling như batch ETL
Pandas DataFrame là bảng có index, column name và dtype. Với ML tabular, Pandas phù hợp cho:
- Inspect schema.
- Thống kê missing values.
- Tạo feature đơn giản.
- Join dữ liệu nguồn.
- EDA trước khi đóng gói preprocessing vào pipeline.
Các thao tác cốt lõi:
import pandas as pd
df = pd.DataFrame(
{
"pclass": [3, 1, 3],
"sex": ["male", "female", "female"],
"age": [22.0, 38.0, None],
"fare": [7.25, 71.28, 7.92],
"survived": [0, 1, 1],
}
)
selected = df[["pclass", "sex", "age"]]
adults = df[df["age"].fillna(0) >= 18]
survival_by_class = df.groupby("pclass")["survived"].agg(["count", "mean"])
missing_ratio = df.isna().mean().sort_values(ascending=False)
Map Pandas sang SQL:
| Pandas | SQL |
|---|---|
df[cols] | SELECT cols |
df[df["age"] >= 18] | WHERE age >= 18 |
groupby().agg() | GROUP BY |
merge() | JOIN |
sort_values() | ORDER BY |
drop_duplicates() | DISTINCT |
Production concern:
- Pandas load data vào RAM. Nếu dataset lớn hơn memory, cân nhắc DuckDB, Polars, database query, Spark hoặc chunk processing.
objectdtype tốn memory và dễ lẫn kiểu. Với dữ liệu category, kiểm tra unique count và null ratio.- Join có thể nhân bản dòng nếu key không unique. Luôn kiểm tra row count trước/sau
merge. - Không hard-code EDA transformation vào notebook rồi quên đưa vào training pipeline.
4. scikit-learn mental model
scikit-learn xoay quanh 3 interface chính:
transformer.fit(X_train)
X_train_transformed = transformer.transform(X_train)
X_test_transformed = transformer.transform(X_test)
model.fit(X_train_transformed, y_train)
y_pred = model.predict(X_test_transformed)
Pipeline gộp các bước lại:
raw DataFrame
-> impute missing values
-> scale numerical columns
-> encode categorical columns
-> classifier
Điểm cực kỳ quan trọng: các bước có học tham số từ dữ liệu như imputer, scaler, encoder phải fit trên train set, sau đó chỉ transform validation/test/production data. Nếu fit trên toàn bộ dataset trước khi split, bạn đã leak thông tin từ test set vào training.
5. ColumnTransformer cho dữ liệu mixed type
Dữ liệu business thường có numerical và categorical columns. Mỗi nhóm cần preprocessing khác nhau:
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
numeric_features = ["age", "sibsp", "parch", "fare"]
categorical_features = ["pclass", "sex", "embarked"]
numeric_pipeline = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler()),
]
)
categorical_pipeline = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="most_frequent")),
("encoder", OneHotEncoder(handle_unknown="ignore")),
]
)
preprocessor = ColumnTransformer(
transformers=[
("num", numeric_pipeline, numeric_features),
("cat", categorical_pipeline, categorical_features),
]
)
Vì sao handle_unknown="ignore" quan trọng? Production luôn có category mới: một cảng mới, plan mới, campaign mới, country code mới. Nếu encoder fail toàn bộ request vì category mới, service sẽ brittle. ignore giúp request vẫn chạy, nhưng bạn cần monitor category drift vì quality có thể giảm.
6. Notebook-to-production workflow
Notebook tốt cho:
- EDA nhanh.
- Plot distribution.
- So sánh hypothesis.
- Ghi chú reasoning.
- Demo kết quả.
Notebook không nên là production source duy nhất vì:
- Cell execution order dễ sai.
- Config và seed không rõ.
- Khó test tự động.
- Khó review diff.
- Artifact tạo ra không có metadata.
Workflow đề xuất:
01_explore.ipynb
-> ghi insight, plot, assumption
src/data.py
-> load data, validate schema, split
src/features.py
-> build preprocessing pipeline
src/train.py
-> train, evaluate, save artifact
src/predict.py
-> load artifact, validate request, predict
tests/
-> schema test, prediction smoke test
Trong repo học này, bạn chưa cần tạo package đầy đủ ngay. Nhưng exercise sẽ mô phỏng các thành phần quan trọng bằng một script gần production.
7. Chọn model baseline
Với Titanic-style tabular classification, ba model hợp lý để so sánh:
| Model | Khi nên dùng | Điểm mạnh | Hạn chế |
|---|---|---|---|
| Logistic Regression | Baseline đầu tiên | Nhanh, dễ giải thích, ít overfit | Cần feature engineering cho quan hệ phi tuyến |
| Random Forest | Baseline tree mạnh | Bắt nonlinear pattern, ít preprocessing numerical hơn | Artifact lớn hơn, latency cao hơn linear model |
| HistGradientBoosting | Tabular mạnh, training nhanh | Hiệu quả với numeric pattern | Categorical vẫn cần encode, tuning cần cẩn thận |
Best solution theo context:
- Học nền tảng/need explainability: bắt đầu với Logistic Regression.
- Tabular data nhỏ-vừa và muốn quality tốt nhanh: thử Random Forest hoặc Gradient Boosting.
- Production latency rất thấp hoặc cần explainability cao: ưu tiên linear model hoặc tree nhỏ.
- Dataset lớn, feature cardinality cao: đo memory của one-hot; cân nhắc hashing, target encoding có kiểm soát hoặc model khác.
Trade-offs
| Lựa chọn | Nên dùng khi | Không nên dùng khi | Guidance cụ thể |
|---|---|---|---|
| Pandas | EDA, training batch nhỏ-vừa, dữ liệu fit RAM | Dataset quá RAM, realtime low-latency path | Dùng mặc định trong Day 4, nhưng không đưa Pandas-heavy transform vào hot path nếu latency nghiêm ngặt |
| NumPy vectorization | Compute lặp trên array lớn | Logic branch phức tạp per row | Ưu tiên vectorize feature computation thay vì Python loop |
| scikit-learn Pipeline | Preprocessing + model tabular | Deep learning training loop phức tạp | Mặc định dùng để tránh train-serving skew |
| OneHotEncoder | Categorical low/medium cardinality | Hàng chục nghìn category | Monitor feature explosion; dùng sparse output khi phù hợp |
| Random split | Data IID, không phụ thuộc thời gian | Time-series, user lifecycle, event stream | Với production có thời gian, dùng time-based split |
| joblib artifact | Internal artifact từ training trusted | Artifact không rõ nguồn | Không load artifact từ untrusted source |
Best practices từ industry
- Split data trước khi fit preprocessing để tránh leakage.
- Luôn lưu preprocessing và model trong cùng một
Pipeline. - Lưu metadata cạnh artifact: feature list, target, metrics, dataset source, package version, random seed.
- Validate schema ở cả training và inference. Fail rõ ràng khi thiếu cột hoặc sai kiểu nghiêm trọng.
- Log metrics không chỉ accuracy. Với classification, luôn xem precision, recall, F1 và ROC-AUC nếu có probability.
- Pin dependency version cho artifact quan trọng vì pickle/joblib phụ thuộc Python và package version.
- Không dùng notebook làm cron job production; chuyển sang script/package có CLI, config và test.
Performance considerations
- Pandas giữ DataFrame trong RAM;
objectstring column có overhead lớn. Dùngdf.info(memory_usage="deep")để ước lượng tốt hơn. - One-hot encoding có thể làm feature matrix phình mạnh. Nếu 1 triệu rows và 50.000 category, one-hot dense là không khả thi.
- Sparse matrix tiết kiệm memory cho one-hot/text, nhưng không phải model nào cũng xử lý sparse tối ưu như nhau.
n_jobs=-1tăng throughput training/prediction với một số estimator, nhưng có thể tranh CPU với service khác.- Batch prediction thường nhanh hơn single-row prediction vì giảm overhead Python.
- Scaling numerical features rất quan trọng với Logistic Regression/SVM/KNN, ít quan trọng hơn với tree-based models.
Production concerns
Data quality
- Missing value ratio tăng đột biến có thể là upstream pipeline bug.
- Category mới tăng nhanh là signal drift.
- Numeric range vượt xa training distribution cần alert.
- Duplicate key sau join có thể làm label distribution sai.
Reliability
- Training script phải deterministic ở mức hợp lý: set
random_state, lưu split strategy, lưu config. - Artifact cần versioning và rollback.
- Inference phải trả lỗi rõ ràng cho invalid payload, không silently reorder/missing feature.
Security
pickle/joblibcó thể thực thi code khi load object độc hại. Chỉ load artifact do pipeline build đáng tin cậy tạo ra.- Không log raw PII trong prediction logs. Với Titanic exercise không có PII thật, nhưng production customer data thường có.
Observability
Tối thiểu cần log:
- Model version.
- Input schema version.
- Prediction latency.
- Missing/unknown category rate.
- Prediction distribution.
- Business outcome khi label về sau có sẵn.
Dùng được trong production không? Nếu có thì cần điều kiện gì?
Có, stack Pandas + scikit-learn dùng được trong production cho nhiều bài toán tabular batch hoặc low/medium throughput API, nếu thỏa các điều kiện sau:
- Dataset và feature transformation fit với memory/latency budget.
- Training và inference dùng chung
Pipelineartifact. - Có schema validation, dependency pinning, artifact versioning và rollback.
- Artifact chỉ được load từ nguồn trusted.
- Có monitoring cho data drift, missing values, prediction distribution, latency và model quality.
- Có quy trình retraining/evaluation trước khi promote model mới.
Không nên dùng nguyên xi nếu workload là streaming real-time cực lớn, feature join phức tạp cần feature store, model deep learning, hoặc latency SLA rất thấp mà Pandas overhead không chấp nhận được. Khi đó cần thiết kế lại serving path bằng feature service, compiled transform, online store hoặc framework serving chuyên biệt.
Hands-on trong 60-90 phút
Làm bài trong exercise.md. Output tối thiểu:
- Train được 3 models trên Titanic hoặc fallback dataset.
- Có bảng metrics.
- Lưu được
model.joblibvàmetadata.json. - Load lại artifact và predict thử một payload.
- Trả lời câu hỏi production readiness ở cuối exercise.
Tự kiểm tra
- Vì sao
fit_transformpreprocessing trên toàn bộ dataset trước khi split là data leakage? Pipelinekhác gì so với việc gọi từng bước thủ công trong notebook?- Khi nào Pandas không còn phù hợp cho training hoặc serving?
OneHotEncoder(handle_unknown="ignore")giải quyết vấn đề gì và không giải quyết vấn đề gì?- Vì sao cần lưu feature list và package version trong artifact metadata?
- Accuracy cao có đủ để chọn model không? Vì sao?
- Risk bảo mật chính của joblib/pickle artifact là gì?
Checklist hoàn thành hôm nay
- Hiểu
ndarray,shape,dtype, broadcasting và vectorization. - Dùng được Pandas select/filter/groupby/merge/missing value analysis.
- Hiểu
Estimator,Transformer,Pipeline,ColumnTransformer. - Tách được numerical và categorical preprocessing.
- Train được ít nhất 3 model bằng cùng preprocessing contract.
- Lưu và load pipeline artifact.
- Viết inference function có schema validation.
- Trả lời được câu hỏi production readiness.
Tài liệu tham khảo
- NumPy documentation:
ndarray, broadcasting, matrix multiplication, views vs copies. - Pandas documentation: DataFrame selection, boolean indexing, groupby aggregation,
info(memory_usage=...). - scikit-learn documentation:
Pipeline,ColumnTransformer,OneHotEncoder,fetch_openml, model persistence. - Keywords:
train-serving skew,data leakage,model artifact metadata,categorical drift.
Tài liệu
File này là reference nhanh để tra trong lúc làm exercise. Bài học chính nằm ở lession.md, bài thực hành nằm ở exercise.md.
1. NumPy reference
ndarray
ndarray là container N chiều, có shape, dtype, ndim, size.
import numpy as np
a = np.arange(15).reshape(3, 5)
print(a.shape) # (3, 5)
print(a.ndim) # 2
print(a.dtype.name) # int64 hoặc int32 tùy platform
print(a.size) # 15
Trong ML:
X_train.shape = (số dòng train, số feature)
y_train.shape = (số dòng train,)
Nếu shape sai, hãy debug ngay tại boundary của function. Đừng để shape sai đi sâu vào model.
Matrix multiplication
A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])
v = np.array([1, 2])
print(A @ B) # matrix multiplication
print(A @ v) # matrix-vector multiplication
print(A * B) # element-wise multiplication
Rule thực tế:
- Dùng
@cho linear algebra. - Dùng
*cho element-wise operation. - Luôn kiểm tra shape khi kết quả bất ngờ.
Broadcasting
X = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
mean = X.mean(axis=0)
scaled = X - mean
mean có shape (2,) được broadcast qua 3 rows. Broadcasting rất mạnh nhưng cần viết code rõ ràng, tránh “may mắn chạy được”.
View vs copy
arr = np.array([[1, 2, 3], [4, 5, 6]])
view = arr[0, :]
print(np.shares_memory(arr, view)) # True trong trường hợp này
Production concern: mutation trên view có thể làm thay đổi array gốc. Nếu cần tách độc lập, dùng .copy().
2. Pandas reference
Inspect DataFrame
print(df.head())
print(df.info(memory_usage="deep"))
print(df.describe(include="all"))
print(df.isna().mean().sort_values(ascending=False))
Checklist khi nhận dataset mới:
- Có đủ columns expected không?
- Target có null không?
- Dtype có hợp lý không?
- Missing ratio column nào cao bất thường?
- Label distribution có quá lệch không?
- Có duplicate row/key không?
Select/filter/groupby
cols = ["pclass", "sex", "age", "fare"]
X = df[cols]
adults = df[df["age"].fillna(0) >= 18]
summary = (
df.groupby("pclass")["survived"]
.agg(["count", "mean"])
.sort_values("mean", ascending=False)
)
Merge
features = passengers.merge(tickets, on="ticket_id", how="left", validate="many_to_one")
Nên dùng validate khi biết relationship:
| Relationship | validate |
|---|---|
| Mỗi bên unique key | one_to_one |
| Left nhiều, right unique | many_to_one |
| Left unique, right nhiều | one_to_many |
| Cả hai có duplicate | many_to_many |
Nếu join làm row count tăng ngoài dự kiến, model có thể học từ dữ liệu bị duplicate sai.
Missing values
Trong EDA, bạn có thể inspect/fill tạm:
df["age_preview"] = df["age"].fillna(df["age"].median())
Trong training production-style, nên đưa imputation vào scikit-learn Pipeline, không fill thủ công trên toàn dataset trước split.
3. scikit-learn reference
Estimator API
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
y_score = model.predict_proba(X_test)[:, 1]
Transformer API
transformer.fit(X_train)
X_train_t = transformer.transform(X_train)
X_test_t = transformer.transform(X_test)
Không gọi fit trên test data.
Pipeline
from sklearn.pipeline import Pipeline
pipeline = Pipeline(
steps=[
("preprocessor", preprocessor),
("model", model),
]
)
pipeline.fit(X_train, y_train)
pipeline.predict(X_test)
Pipeline giúp:
- Đóng gói preprocessing và model.
- Tránh quên bước transform ở inference.
- Giảm train-serving skew.
- Dễ save/load artifact.
ColumnTransformer
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
numeric_pipeline = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler()),
]
)
categorical_pipeline = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="most_frequent")),
("encoder", OneHotEncoder(handle_unknown="ignore")),
]
)
preprocessor = ColumnTransformer(
transformers=[
("num", numeric_pipeline, numeric_features),
("cat", categorical_pipeline, categorical_features),
]
)
OneHotEncoder(handle_unknown="ignore")
Ý nghĩa:
- Khi inference gặp category chưa thấy trong training, encoder không throw error.
- Các cột one-hot tương ứng category đã biết sẽ là 0.
Trade-off:
- Tăng robustness của API.
- Nhưng category mới không có signal riêng, quality có thể giảm.
- Cần monitor unknown/category drift.
train_test_split
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
X,
y,
test_size=0.2,
random_state=42,
stratify=y,
)
stratify=y giữ tỷ lệ class gần giống giữa train/test cho classification. Không dùng random split cho time-series hoặc dữ liệu có leakage theo thời gian.
Metrics
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"precision": precision_score(y_test, y_pred, zero_division=0),
"recall": recall_score(y_test, y_pred, zero_division=0),
"f1": f1_score(y_test, y_pred, zero_division=0),
"roc_auc": roc_auc_score(y_test, y_score),
}
Guidance:
- Accuracy dễ hiểu nhưng nguy hiểm khi class imbalance.
- Precision quan trọng khi false positive đắt.
- Recall quan trọng khi false negative đắt.
- F1 cân bằng precision/recall.
- ROC-AUC hữu ích khi cần so sánh ranking/probability quality.
4. Model artifact với joblib
import joblib
joblib.dump(pipeline, "model.joblib")
loaded = joblib.load("model.joblib")
Nên lưu thêm metadata:
{
"model_name": "logistic_regression",
"schema_version": "day4-titanic-v1",
"numeric_features": ["age", "sibsp", "parch", "fare", "family_size", "is_alone"],
"categorical_features": ["pclass", "sex", "embarked"],
"target": "survived",
"metrics": {
"roc_auc": 0.84,
"f1": 0.76
},
"random_state": 42
}
Security rule: chỉ load joblib/pickle artifact từ nguồn trusted. Không nhận file model do user upload rồi joblib.load trực tiếp.
5. Production readiness checklist
- Có schema validation cho input.
- Có split strategy phù hợp business.
- Preprocessing và model nằm chung pipeline.
- Có baseline và ít nhất một model so sánh.
- Có metrics phù hợp business cost.
- Có artifact metadata.
- Có dependency version hoặc lockfile.
- Có smoke test load artifact và predict.
- Có monitoring plan cho missing values, category drift, prediction drift và latency.
- Có rollback plan khi model mới tệ hơn.
6. Context7 docs đã tham khảo
/numpy/numpy:ndarray,shape,dtype, matrix multiplication, views/copies./websites/pandas_pydata: DataFrame boolean indexing, groupby aggregate,info(), missing value handling./websites/scikit-learn_stable:Pipeline,ColumnTransformer,OneHotEncoder(handle_unknown="ignore"), examples for preprocessing pipelines.
Bài tập
Mục tiêu exercise
Bạn sẽ viết một training script gần production cho bài toán binary classification:
Input passenger features -> predict survived probability
Yêu cầu:
- Load Titanic từ OpenML nếu có network/cache.
- Có fallback synthetic dataset để bài vẫn chạy offline.
- Validate schema trước training và trước inference.
- Dùng
ColumnTransformer+Pipeline. - Train ít nhất 3 models.
- So sánh metrics và latency.
- Lưu artifact
model.joblibvàmetadata.json. - Load lại artifact và predict một request mẫu.
1. Chuẩn bị môi trường
python3 -m venv .venv
source .venv/bin/activate
pip install numpy pandas scikit-learn joblib
Nếu dùng uv:
uv venv
source .venv/bin/activate
uv pip install numpy pandas scikit-learn joblib
2. Tạo file train_titanic_pipeline.py
Bạn có thể đặt file này ở workspace tạm hoặc trong project riêng của bạn. Nội dung:
from __future__ import annotations
import json
import platform
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import joblib
import numpy as np
import pandas as pd
import sklearn
from sklearn.compose import ColumnTransformer
from sklearn.datasets import fetch_openml, make_classification
from sklearn.ensemble import HistGradientBoostingClassifier, RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
RANDOM_STATE = 42
ARTIFACT_DIR = Path("artifacts/day4_titanic")
MODEL_PATH = ARTIFACT_DIR / "model.joblib"
METADATA_PATH = ARTIFACT_DIR / "metadata.json"
RAW_NUMERIC_FEATURES = ["age", "sibsp", "parch", "fare"]
DERIVED_NUMERIC_FEATURES = ["family_size", "is_alone"]
NUMERIC_FEATURES = RAW_NUMERIC_FEATURES + DERIVED_NUMERIC_FEATURES
CATEGORICAL_FEATURES = ["pclass", "sex", "embarked"]
FEATURES = NUMERIC_FEATURES + CATEGORICAL_FEATURES
TARGET = "survived"
@dataclass(frozen=True)
class SchemaIssue:
field: str
message: str
class SchemaValidationError(ValueError):
def __init__(self, issues: list[SchemaIssue]) -> None:
self.issues = issues
detail = "; ".join(f"{issue.field}: {issue.message}" for issue in issues)
super().__init__(detail)
def load_titanic_or_fallback() -> tuple[pd.DataFrame, str]:
"""Load Titanic from OpenML; fallback keeps the exercise runnable offline."""
try:
titanic = fetch_openml("titanic", version=1, as_frame=True, parser="auto")
frame = titanic.frame
df = frame[["pclass", "sex", "age", "sibsp", "parch", "fare", "embarked", "survived"]].copy()
df["survived"] = df["survived"].astype(int)
return df, "openml_titanic_v1"
except Exception as exc:
print(f"OpenML unavailable, using synthetic fallback. Reason: {exc}")
X, y = make_classification(
n_samples=1309,
n_features=6,
n_informative=4,
n_redundant=1,
random_state=RANDOM_STATE,
)
df = pd.DataFrame(X, columns=["age_raw", "fare_raw", "sibsp_raw", "parch_raw", "class_raw", "sex_raw"])
df["age"] = np.clip((df["age_raw"] * 12 + 32).round(1), 0, 80)
df["fare"] = np.clip((df["fare_raw"] * 20 + 35).round(2), 0, 300)
df["sibsp"] = np.clip(np.abs(df["sibsp_raw"]).round().astype(int), 0, 5)
df["parch"] = np.clip(np.abs(df["parch_raw"]).round().astype(int), 0, 5)
df["pclass"] = pd.cut(df["class_raw"], bins=3, labels=["1", "2", "3"]).astype(str)
df["sex"] = np.where(df["sex_raw"] > 0, "male", "female")
df["embarked"] = np.select(
[df["fare"] < 20, df["fare"] < 80],
["S", "C"],
default="Q",
)
df["survived"] = y.astype(int)
return df[["pclass", "sex", "age", "sibsp", "parch", "fare", "embarked", "survived"]], "synthetic_fallback"
def add_features(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df["family_size"] = df["sibsp"].fillna(0) + df["parch"].fillna(0) + 1
df["is_alone"] = (df["family_size"] == 1).astype(int)
df["pclass"] = df["pclass"].astype("string")
df["sex"] = df["sex"].astype("string")
df["embarked"] = df["embarked"].astype("string")
return df
def validate_training_frame(df: pd.DataFrame) -> None:
required = set(RAW_NUMERIC_FEATURES + CATEGORICAL_FEATURES + [TARGET])
issues: list[SchemaIssue] = []
missing = sorted(required - set(df.columns))
for column in missing:
issues.append(SchemaIssue(column, "missing required column"))
if TARGET in df.columns:
labels = set(pd.Series(df[TARGET]).dropna().astype(int).unique().tolist())
if not labels.issubset({0, 1}):
issues.append(SchemaIssue(TARGET, f"expected binary labels 0/1, got {sorted(labels)}"))
for column in RAW_NUMERIC_FEATURES:
if column in df.columns and not pd.api.types.is_numeric_dtype(df[column]):
issues.append(SchemaIssue(column, f"expected numeric dtype, got {df[column].dtype}"))
if issues:
raise SchemaValidationError(issues)
def validate_inference_payload(payload: dict[str, Any]) -> pd.DataFrame:
issues: list[SchemaIssue] = []
required = set(RAW_NUMERIC_FEATURES + CATEGORICAL_FEATURES)
for column in sorted(required - set(payload)):
issues.append(SchemaIssue(column, "missing required field"))
row: dict[str, Any] = {}
for column in RAW_NUMERIC_FEATURES:
value = payload.get(column)
if value is None:
row[column] = np.nan
continue
try:
row[column] = float(value)
except (TypeError, ValueError):
issues.append(SchemaIssue(column, f"expected numeric value, got {value!r}"))
for column in CATEGORICAL_FEATURES:
value = payload.get(column)
if value is None or str(value).strip() == "":
row[column] = pd.NA
else:
row[column] = str(value)
if issues:
raise SchemaValidationError(issues)
return add_features(pd.DataFrame([row]))[FEATURES]
def build_preprocessor() -> ColumnTransformer:
numeric_pipeline = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler()),
]
)
categorical_pipeline = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="most_frequent")),
("encoder", OneHotEncoder(handle_unknown="ignore")),
]
)
return ColumnTransformer(
transformers=[
("num", numeric_pipeline, NUMERIC_FEATURES),
("cat", categorical_pipeline, CATEGORICAL_FEATURES),
]
)
def build_model_specs() -> list[tuple[str, Any]]:
return [
(
"logistic_regression",
LogisticRegression(max_iter=1000, random_state=RANDOM_STATE),
),
(
"random_forest",
RandomForestClassifier(
n_estimators=250,
max_depth=7,
min_samples_leaf=3,
random_state=RANDOM_STATE,
n_jobs=-1,
),
),
(
"hist_gradient_boosting",
HistGradientBoostingClassifier(
max_iter=200,
learning_rate=0.05,
random_state=RANDOM_STATE,
),
),
]
def evaluate_pipeline(
name: str,
pipeline: Pipeline,
X_train: pd.DataFrame,
X_test: pd.DataFrame,
y_train: pd.Series,
y_test: pd.Series,
) -> tuple[dict[str, float | str], Pipeline]:
train_started = time.perf_counter()
pipeline.fit(X_train, y_train)
train_ms = (time.perf_counter() - train_started) * 1000
predict_started = time.perf_counter()
y_pred = pipeline.predict(X_test)
predict_ms = (time.perf_counter() - predict_started) * 1000
if hasattr(pipeline, "predict_proba"):
y_score = pipeline.predict_proba(X_test)[:, 1]
else:
y_score = y_pred
metrics: dict[str, float | str] = {
"model": name,
"accuracy": accuracy_score(y_test, y_pred),
"precision": precision_score(y_test, y_pred, zero_division=0),
"recall": recall_score(y_test, y_pred, zero_division=0),
"f1": f1_score(y_test, y_pred, zero_division=0),
"roc_auc": roc_auc_score(y_test, y_score),
"train_ms": train_ms,
"predict_ms_per_row": predict_ms / len(X_test),
}
return metrics, pipeline
def train() -> None:
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)
raw_df, dataset_source = load_titanic_or_fallback()
validate_training_frame(raw_df)
df = add_features(raw_df)
print("Dataset source:", dataset_source)
print("Shape:", df.shape)
print("Missing ratio:")
print((df[FEATURES + [TARGET]].isna().mean() * 100).sort_values(ascending=False))
print("Target distribution:")
print(df[TARGET].value_counts(normalize=True).sort_index())
X = df[FEATURES]
y = df[TARGET].astype(int)
X_train, X_test, y_train, y_test = train_test_split(
X,
y,
test_size=0.2,
random_state=RANDOM_STATE,
stratify=y,
)
results: list[dict[str, float | str]] = []
trained: dict[str, Pipeline] = {}
for name, model in build_model_specs():
pipeline = Pipeline(
steps=[
("preprocessor", build_preprocessor()),
("model", model),
]
)
metrics, fitted = evaluate_pipeline(name, pipeline, X_train, X_test, y_train, y_test)
results.append(metrics)
trained[name] = fitted
summary = pd.DataFrame(results).sort_values(["roc_auc", "f1"], ascending=False)
print(summary.to_string(index=False))
best_name = str(summary.iloc[0]["model"])
best_pipeline = trained[best_name]
joblib.dump(best_pipeline, MODEL_PATH)
metadata = {
"schema_version": "day4-titanic-v1",
"dataset_source": dataset_source,
"best_model": best_name,
"target": TARGET,
"raw_numeric_features": RAW_NUMERIC_FEATURES,
"numeric_features": NUMERIC_FEATURES,
"categorical_features": CATEGORICAL_FEATURES,
"features": FEATURES,
"random_state": RANDOM_STATE,
"test_size": 0.2,
"metrics": summary.to_dict(orient="records"),
"runtime": {
"python": platform.python_version(),
"platform": platform.platform(),
"numpy": np.__version__,
"pandas": pd.__version__,
"scikit_learn": sklearn.__version__,
},
"security_note": "Only load this joblib artifact from trusted storage.",
}
METADATA_PATH.write_text(json.dumps(metadata, indent=2), encoding="utf-8")
print(f"Saved model: {MODEL_PATH}")
print(f"Saved metadata: {METADATA_PATH}")
def predict_survival(payload: dict[str, Any]) -> dict[str, Any]:
if not MODEL_PATH.exists():
raise FileNotFoundError(f"Model artifact not found: {MODEL_PATH}. Run train() first.")
pipeline: Pipeline = joblib.load(MODEL_PATH)
X = validate_inference_payload(payload)
probability = float(pipeline.predict_proba(X)[0, 1])
prediction = int(probability >= 0.5)
return {
"prediction": prediction,
"survived_probability": probability,
"threshold": 0.5,
"schema_version": "day4-titanic-v1",
}
if __name__ == "__main__":
train()
sample_payload = {
"pclass": "3",
"sex": "female",
"age": 29,
"sibsp": 0,
"parch": 0,
"fare": 7.9,
"embarked": "S",
}
print("Sample prediction:")
print(json.dumps(predict_survival(sample_payload), indent=2))
3. Chạy script
python3 train_titanic_pipeline.py
Kết quả kỳ vọng:
- In dataset source:
openml_titanic_v1hoặcsynthetic_fallback. - In missing ratio và target distribution.
- In bảng metrics của 3 models.
- Tạo
artifacts/day4_titanic/model.joblib. - Tạo
artifacts/day4_titanic/metadata.json. - In sample prediction dạng JSON.
4. Review kết quả
Mở metadata.json và kiểm tra:
- Model nào được chọn?
roc_auc,f1,precision,recallcủa model tốt nhất là bao nhiêu?predict_ms_per_rowcó khác nhiều giữa các model không?dataset_sourcelà OpenML hay fallback?- Feature list có đủ raw và derived features không?
Không chọn model chỉ vì accuracy cao. Với bài toán sinh tồn Titanic, đây là exercise học stack; trong business thật, bạn phải định nghĩa false positive/false negative cost.
5. Bài tập mở rộng
Bài 1: Thêm schema range check
Trong validate_inference_payload, thêm rule:
agenằm trong[0, 120]nếu không null.farekhông âm.sibsp,parchkhông âm và là số gần integer.
Trade-off: strict validation giúp bắt input xấu sớm, nhưng nếu quá strict có thể reject dữ liệu hợp lệ ngoài distribution cũ. Với production, nên phân biệt hard validation và drift alert.
Bài 2: Thêm threshold tuning
Thay vì hard-code threshold 0.5, thử các threshold từ 0.2 đến 0.8, chọn threshold theo mục tiêu:
- Ưu tiên recall nếu bỏ sót positive rất đắt.
- Ưu tiên precision nếu false alarm rất đắt.
- Ưu tiên F1 nếu muốn cân bằng.
Gợi ý:
for threshold in np.arange(0.2, 0.85, 0.05):
y_pred = (y_score >= threshold).astype(int)
print(threshold, precision_score(y_test, y_pred), recall_score(y_test, y_pred), f1_score(y_test, y_pred))
Bài 3: Viết smoke test
Tạo test đơn giản:
def test_predict_survival_smoke():
payload = {
"pclass": "1",
"sex": "female",
"age": 38,
"sibsp": 1,
"parch": 0,
"fare": 71.28,
"embarked": "C",
}
result = predict_survival(payload)
assert 0 <= result["survived_probability"] <= 1
assert result["prediction"] in {0, 1}
Bài 4: Tách thành package nhỏ
Nếu muốn gần production hơn, tách file:
day4_project/
pyproject.toml
src/day4_titanic/
data.py
features.py
train.py
predict.py
schema.py
tests/
test_predict.py
Đây là bước chuyển từ notebook/script sang maintainable codebase.
6. Câu hỏi bắt buộc: dùng được trong production không?
Trả lời mẫu:
Có thể dùng pattern này trong production cho tabular classification nhỏ-vừa, nhưng không bê nguyên script demo vào production. Điều kiện cần:
- Dữ liệu training đại diện cho production traffic.
- Split strategy phản ánh cách model sẽ gặp dữ liệu thật, đặc biệt nếu có yếu tố thời gian.
- Pipeline artifact được build trong CI/training job có kiểm soát.
- Có schema validation ở API/batch boundary.
- Có artifact registry hoặc storage versioned.
- Có monitoring drift và model quality.
- Có rollback khi metrics production giảm.
- Có security policy: không load
joblibtừ nguồn untrusted.
Nếu inference throughput rất cao, cần benchmark kỹ Pandas single-row overhead. Có thể cần batch inference, feature service tối ưu hơn, hoặc chuyển một phần transform ra service/language phù hợp.
7. Checklist nộp bài
- Script chạy được không cần chỉnh tay.
- Có fallback khi không tải được OpenML.
- Có schema validation.
- Có
Pipelinechứa preprocessing và model. - Có ít nhất 3 models.
- Có metrics table.
- Có
model.joblib. - Có
metadata.json. - Có sample prediction.
- Có câu trả lời production readiness.