Published on

Day 4: Python ML Stack

Authors

Mục tiêu

Sau bài này, bạn cần làm được các việc sau:

  • Dùng NumPy để hiểu ndarray, shape, dtype, broadcasting và vectorization ở mức đủ để debug ML code.
  • Dùng Pandas cho EDA, data cleaning, groupby, merge, missing value analysis và kiểm tra schema dữ liệu.
  • Dùng scikit-learn PipelineColumnTransformer để đóng gói preprocessing cùng model, tránh train-serving skew.
  • Train và so sánh Logistic Regression, Random Forest, Gradient Boosting trên Titanic-style dataset.
  • Lưu pipeline artifact kèm metadata, rồi viết inference function có input validation.

TL;DR

Python ML stack phổ biến gồm NumPy cho numerical compute, Pandas cho data wrangling, scikit-learn cho training pipeline, Matplotlib/Seaborn cho visualization và joblib cho artifact nhỏ-vừa. Với Senior SE, hãy xem ML pipeline như một service build artifact: input có schema, transform có version, model có metrics, artifact có metadata và inference có contract rõ ràng. Notebook rất tốt cho exploration, nhưng production cần logic repeatable trong script/package, test được và có monitoring.

1. Bức tranh tổng thể

Một ML workflow tabular cơ bản thường đi qua các bước:

raw data
  -> data loading
  -> schema check
  -> EDA
  -> train/test split
  -> preprocessing
  -> model training
  -> evaluation
  -> artifact packaging
  -> batch/API inference
  -> monitoring

Map về tư duy software engineering:

ML stackSE analogyĐiều cần kiểm soát
NumPy arrayBinary buffer/vector payloadShape, dtype, memory layout
Pandas DataFrameIn-memory table/batch ETLColumn schema, null, join cardinality
scikit-learn transformerPure function có fit/transformFit chỉ trên train data
scikit-learn estimatorBusiness logic được học từ dataParams, random seed, metrics
Pipeline artifactDeployable binaryVersion, compatibility, rollback
NotebookSpike/experiment documentKhông phải source of truth duy nhất

Best solution trong Phase 1: dùng Pandas + scikit-learn Pipeline cho tabular ML nhỏ-vừa, vì API đơn giản, ecosystem mạnh, dễ chuyển từ notebook sang script. Chưa cần Spark, MLflow hay feature store trừ khi dữ liệu lớn, team đông hoặc cần governance nghiêm ngặt.

2. NumPy: nền tảng của numerical compute

ndarray là mảng N chiều đồng nhất kiểu dữ liệu. Trong ML, dữ liệu thường có shape:

X: (n_samples, n_features)
y: (n_samples,)

Ví dụ:

import numpy as np

X = np.array(
    [
        [22, 7.25],
        [38, 71.28],
        [26, 7.92],
    ],
    dtype=np.float64,
)
w = np.array([0.03, 0.01])

scores = X @ w
print(X.shape)      # (3, 2)
print(X.dtype)      # float64
print(scores)       # vector score cho 3 rows

Điểm cần nhớ:

  • shape là contract. Nếu training dùng 8 features mà inference đưa 7 features, kết quả phải fail sớm.
  • dtype ảnh hưởng memory và tốc độ. float64 chính xác hơn nhưng tốn memory hơn float32.
  • Vectorization đẩy compute xuống native code, thường nhanh hơn Python loop rất nhiều.
  • Broadcasting tiện nhưng dễ tạo bug nếu shape không rõ ràng.
  • Slicing có thể tạo view chia sẻ memory; copy/view không rõ có thể gây mutation ngoài ý muốn.

Ví dụ broadcasting:

X = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
mean = X.mean(axis=0)
std = X.std(axis=0)
scaled = (X - mean) / std

Ở production, bạn hiếm khi tự scale bằng NumPy nếu dùng scikit-learn. Nhưng hiểu cơ chế này giúp debug StandardScaler, feature matrix và lỗi shape.

3. Pandas: data wrangling như batch ETL

Pandas DataFrame là bảng có index, column name và dtype. Với ML tabular, Pandas phù hợp cho:

  • Inspect schema.
  • Thống kê missing values.
  • Tạo feature đơn giản.
  • Join dữ liệu nguồn.
  • EDA trước khi đóng gói preprocessing vào pipeline.

Các thao tác cốt lõi:

import pandas as pd

df = pd.DataFrame(
    {
        "pclass": [3, 1, 3],
        "sex": ["male", "female", "female"],
        "age": [22.0, 38.0, None],
        "fare": [7.25, 71.28, 7.92],
        "survived": [0, 1, 1],
    }
)

selected = df[["pclass", "sex", "age"]]
adults = df[df["age"].fillna(0) >= 18]
survival_by_class = df.groupby("pclass")["survived"].agg(["count", "mean"])
missing_ratio = df.isna().mean().sort_values(ascending=False)

Map Pandas sang SQL:

PandasSQL
df[cols]SELECT cols
df[df["age"] >= 18]WHERE age >= 18
groupby().agg()GROUP BY
merge()JOIN
sort_values()ORDER BY
drop_duplicates()DISTINCT

Production concern:

  • Pandas load data vào RAM. Nếu dataset lớn hơn memory, cân nhắc DuckDB, Polars, database query, Spark hoặc chunk processing.
  • object dtype tốn memory và dễ lẫn kiểu. Với dữ liệu category, kiểm tra unique count và null ratio.
  • Join có thể nhân bản dòng nếu key không unique. Luôn kiểm tra row count trước/sau merge.
  • Không hard-code EDA transformation vào notebook rồi quên đưa vào training pipeline.

4. scikit-learn mental model

scikit-learn xoay quanh 3 interface chính:

transformer.fit(X_train)
X_train_transformed = transformer.transform(X_train)
X_test_transformed = transformer.transform(X_test)

model.fit(X_train_transformed, y_train)
y_pred = model.predict(X_test_transformed)

Pipeline gộp các bước lại:

raw DataFrame
  -> impute missing values
  -> scale numerical columns
  -> encode categorical columns
  -> classifier

Điểm cực kỳ quan trọng: các bước có học tham số từ dữ liệu như imputer, scaler, encoder phải fit trên train set, sau đó chỉ transform validation/test/production data. Nếu fit trên toàn bộ dataset trước khi split, bạn đã leak thông tin từ test set vào training.

5. ColumnTransformer cho dữ liệu mixed type

Dữ liệu business thường có numerical và categorical columns. Mỗi nhóm cần preprocessing khác nhau:

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

numeric_features = ["age", "sibsp", "parch", "fare"]
categorical_features = ["pclass", "sex", "embarked"]

numeric_pipeline = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]
)

categorical_pipeline = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("encoder", OneHotEncoder(handle_unknown="ignore")),
    ]
)

preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_pipeline, numeric_features),
        ("cat", categorical_pipeline, categorical_features),
    ]
)

Vì sao handle_unknown="ignore" quan trọng? Production luôn có category mới: một cảng mới, plan mới, campaign mới, country code mới. Nếu encoder fail toàn bộ request vì category mới, service sẽ brittle. ignore giúp request vẫn chạy, nhưng bạn cần monitor category drift vì quality có thể giảm.

6. Notebook-to-production workflow

Notebook tốt cho:

  • EDA nhanh.
  • Plot distribution.
  • So sánh hypothesis.
  • Ghi chú reasoning.
  • Demo kết quả.

Notebook không nên là production source duy nhất vì:

  • Cell execution order dễ sai.
  • Config và seed không rõ.
  • Khó test tự động.
  • Khó review diff.
  • Artifact tạo ra không có metadata.

Workflow đề xuất:

01_explore.ipynb
  -> ghi insight, plot, assumption

src/data.py
  -> load data, validate schema, split

src/features.py
  -> build preprocessing pipeline

src/train.py
  -> train, evaluate, save artifact

src/predict.py
  -> load artifact, validate request, predict

tests/
  -> schema test, prediction smoke test

Trong repo học này, bạn chưa cần tạo package đầy đủ ngay. Nhưng exercise sẽ mô phỏng các thành phần quan trọng bằng một script gần production.

7. Chọn model baseline

Với Titanic-style tabular classification, ba model hợp lý để so sánh:

ModelKhi nên dùngĐiểm mạnhHạn chế
Logistic RegressionBaseline đầu tiênNhanh, dễ giải thích, ít overfitCần feature engineering cho quan hệ phi tuyến
Random ForestBaseline tree mạnhBắt nonlinear pattern, ít preprocessing numerical hơnArtifact lớn hơn, latency cao hơn linear model
HistGradientBoostingTabular mạnh, training nhanhHiệu quả với numeric patternCategorical vẫn cần encode, tuning cần cẩn thận

Best solution theo context:

  • Học nền tảng/need explainability: bắt đầu với Logistic Regression.
  • Tabular data nhỏ-vừa và muốn quality tốt nhanh: thử Random Forest hoặc Gradient Boosting.
  • Production latency rất thấp hoặc cần explainability cao: ưu tiên linear model hoặc tree nhỏ.
  • Dataset lớn, feature cardinality cao: đo memory của one-hot; cân nhắc hashing, target encoding có kiểm soát hoặc model khác.

Trade-offs

Lựa chọnNên dùng khiKhông nên dùng khiGuidance cụ thể
PandasEDA, training batch nhỏ-vừa, dữ liệu fit RAMDataset quá RAM, realtime low-latency pathDùng mặc định trong Day 4, nhưng không đưa Pandas-heavy transform vào hot path nếu latency nghiêm ngặt
NumPy vectorizationCompute lặp trên array lớnLogic branch phức tạp per rowƯu tiên vectorize feature computation thay vì Python loop
scikit-learn PipelinePreprocessing + model tabularDeep learning training loop phức tạpMặc định dùng để tránh train-serving skew
OneHotEncoderCategorical low/medium cardinalityHàng chục nghìn categoryMonitor feature explosion; dùng sparse output khi phù hợp
Random splitData IID, không phụ thuộc thời gianTime-series, user lifecycle, event streamVới production có thời gian, dùng time-based split
joblib artifactInternal artifact từ training trustedArtifact không rõ nguồnKhông load artifact từ untrusted source

Best practices từ industry

  1. Split data trước khi fit preprocessing để tránh leakage.
  2. Luôn lưu preprocessing và model trong cùng một Pipeline.
  3. Lưu metadata cạnh artifact: feature list, target, metrics, dataset source, package version, random seed.
  4. Validate schema ở cả training và inference. Fail rõ ràng khi thiếu cột hoặc sai kiểu nghiêm trọng.
  5. Log metrics không chỉ accuracy. Với classification, luôn xem precision, recall, F1 và ROC-AUC nếu có probability.
  6. Pin dependency version cho artifact quan trọng vì pickle/joblib phụ thuộc Python và package version.
  7. Không dùng notebook làm cron job production; chuyển sang script/package có CLI, config và test.

Performance considerations

  • Pandas giữ DataFrame trong RAM; object string column có overhead lớn. Dùng df.info(memory_usage="deep") để ước lượng tốt hơn.
  • One-hot encoding có thể làm feature matrix phình mạnh. Nếu 1 triệu rows và 50.000 category, one-hot dense là không khả thi.
  • Sparse matrix tiết kiệm memory cho one-hot/text, nhưng không phải model nào cũng xử lý sparse tối ưu như nhau.
  • n_jobs=-1 tăng throughput training/prediction với một số estimator, nhưng có thể tranh CPU với service khác.
  • Batch prediction thường nhanh hơn single-row prediction vì giảm overhead Python.
  • Scaling numerical features rất quan trọng với Logistic Regression/SVM/KNN, ít quan trọng hơn với tree-based models.

Production concerns

Data quality

  • Missing value ratio tăng đột biến có thể là upstream pipeline bug.
  • Category mới tăng nhanh là signal drift.
  • Numeric range vượt xa training distribution cần alert.
  • Duplicate key sau join có thể làm label distribution sai.

Reliability

  • Training script phải deterministic ở mức hợp lý: set random_state, lưu split strategy, lưu config.
  • Artifact cần versioning và rollback.
  • Inference phải trả lỗi rõ ràng cho invalid payload, không silently reorder/missing feature.

Security

  • pickle/joblib có thể thực thi code khi load object độc hại. Chỉ load artifact do pipeline build đáng tin cậy tạo ra.
  • Không log raw PII trong prediction logs. Với Titanic exercise không có PII thật, nhưng production customer data thường có.

Observability

Tối thiểu cần log:

  • Model version.
  • Input schema version.
  • Prediction latency.
  • Missing/unknown category rate.
  • Prediction distribution.
  • Business outcome khi label về sau có sẵn.

Dùng được trong production không? Nếu có thì cần điều kiện gì?

Có, stack Pandas + scikit-learn dùng được trong production cho nhiều bài toán tabular batch hoặc low/medium throughput API, nếu thỏa các điều kiện sau:

  • Dataset và feature transformation fit với memory/latency budget.
  • Training và inference dùng chung Pipeline artifact.
  • Có schema validation, dependency pinning, artifact versioning và rollback.
  • Artifact chỉ được load từ nguồn trusted.
  • Có monitoring cho data drift, missing values, prediction distribution, latency và model quality.
  • Có quy trình retraining/evaluation trước khi promote model mới.

Không nên dùng nguyên xi nếu workload là streaming real-time cực lớn, feature join phức tạp cần feature store, model deep learning, hoặc latency SLA rất thấp mà Pandas overhead không chấp nhận được. Khi đó cần thiết kế lại serving path bằng feature service, compiled transform, online store hoặc framework serving chuyên biệt.

Hands-on trong 60-90 phút

Làm bài trong exercise.md. Output tối thiểu:

  • Train được 3 models trên Titanic hoặc fallback dataset.
  • Có bảng metrics.
  • Lưu được model.joblibmetadata.json.
  • Load lại artifact và predict thử một payload.
  • Trả lời câu hỏi production readiness ở cuối exercise.

Tự kiểm tra

  1. Vì sao fit_transform preprocessing trên toàn bộ dataset trước khi split là data leakage?
  2. Pipeline khác gì so với việc gọi từng bước thủ công trong notebook?
  3. Khi nào Pandas không còn phù hợp cho training hoặc serving?
  4. OneHotEncoder(handle_unknown="ignore") giải quyết vấn đề gì và không giải quyết vấn đề gì?
  5. Vì sao cần lưu feature list và package version trong artifact metadata?
  6. Accuracy cao có đủ để chọn model không? Vì sao?
  7. Risk bảo mật chính của joblib/pickle artifact là gì?

Checklist hoàn thành hôm nay

  • Hiểu ndarray, shape, dtype, broadcasting và vectorization.
  • Dùng được Pandas select/filter/groupby/merge/missing value analysis.
  • Hiểu Estimator, Transformer, Pipeline, ColumnTransformer.
  • Tách được numerical và categorical preprocessing.
  • Train được ít nhất 3 model bằng cùng preprocessing contract.
  • Lưu và load pipeline artifact.
  • Viết inference function có schema validation.
  • Trả lời được câu hỏi production readiness.

Tài liệu tham khảo

  • NumPy documentation: ndarray, broadcasting, matrix multiplication, views vs copies.
  • Pandas documentation: DataFrame selection, boolean indexing, groupby aggregation, info(memory_usage=...).
  • scikit-learn documentation: Pipeline, ColumnTransformer, OneHotEncoder, fetch_openml, model persistence.
  • Keywords: train-serving skew, data leakage, model artifact metadata, categorical drift.

Tài liệu

File này là reference nhanh để tra trong lúc làm exercise. Bài học chính nằm ở lession.md, bài thực hành nằm ở exercise.md.

1. NumPy reference

ndarray

ndarray là container N chiều, có shape, dtype, ndim, size.

import numpy as np

a = np.arange(15).reshape(3, 5)
print(a.shape)      # (3, 5)
print(a.ndim)       # 2
print(a.dtype.name) # int64 hoặc int32 tùy platform
print(a.size)       # 15

Trong ML:

X_train.shape = (số dòng train, số feature)
y_train.shape = (số dòng train,)

Nếu shape sai, hãy debug ngay tại boundary của function. Đừng để shape sai đi sâu vào model.

Matrix multiplication

A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])
v = np.array([1, 2])

print(A @ B)    # matrix multiplication
print(A @ v)    # matrix-vector multiplication
print(A * B)    # element-wise multiplication

Rule thực tế:

  • Dùng @ cho linear algebra.
  • Dùng * cho element-wise operation.
  • Luôn kiểm tra shape khi kết quả bất ngờ.

Broadcasting

X = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
mean = X.mean(axis=0)
scaled = X - mean

mean có shape (2,) được broadcast qua 3 rows. Broadcasting rất mạnh nhưng cần viết code rõ ràng, tránh “may mắn chạy được”.

View vs copy

arr = np.array([[1, 2, 3], [4, 5, 6]])
view = arr[0, :]
print(np.shares_memory(arr, view))  # True trong trường hợp này

Production concern: mutation trên view có thể làm thay đổi array gốc. Nếu cần tách độc lập, dùng .copy().

2. Pandas reference

Inspect DataFrame

print(df.head())
print(df.info(memory_usage="deep"))
print(df.describe(include="all"))
print(df.isna().mean().sort_values(ascending=False))

Checklist khi nhận dataset mới:

  • Có đủ columns expected không?
  • Target có null không?
  • Dtype có hợp lý không?
  • Missing ratio column nào cao bất thường?
  • Label distribution có quá lệch không?
  • Có duplicate row/key không?

Select/filter/groupby

cols = ["pclass", "sex", "age", "fare"]
X = df[cols]

adults = df[df["age"].fillna(0) >= 18]

summary = (
    df.groupby("pclass")["survived"]
    .agg(["count", "mean"])
    .sort_values("mean", ascending=False)
)

Merge

features = passengers.merge(tickets, on="ticket_id", how="left", validate="many_to_one")

Nên dùng validate khi biết relationship:

Relationshipvalidate
Mỗi bên unique keyone_to_one
Left nhiều, right uniquemany_to_one
Left unique, right nhiềuone_to_many
Cả hai có duplicatemany_to_many

Nếu join làm row count tăng ngoài dự kiến, model có thể học từ dữ liệu bị duplicate sai.

Missing values

Trong EDA, bạn có thể inspect/fill tạm:

df["age_preview"] = df["age"].fillna(df["age"].median())

Trong training production-style, nên đưa imputation vào scikit-learn Pipeline, không fill thủ công trên toàn dataset trước split.

3. scikit-learn reference

Estimator API

model.fit(X_train, y_train)
y_pred = model.predict(X_test)
y_score = model.predict_proba(X_test)[:, 1]

Transformer API

transformer.fit(X_train)
X_train_t = transformer.transform(X_train)
X_test_t = transformer.transform(X_test)

Không gọi fit trên test data.

Pipeline

from sklearn.pipeline import Pipeline

pipeline = Pipeline(
    steps=[
        ("preprocessor", preprocessor),
        ("model", model),
    ]
)

pipeline.fit(X_train, y_train)
pipeline.predict(X_test)

Pipeline giúp:

  • Đóng gói preprocessing và model.
  • Tránh quên bước transform ở inference.
  • Giảm train-serving skew.
  • Dễ save/load artifact.

ColumnTransformer

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

numeric_pipeline = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]
)

categorical_pipeline = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("encoder", OneHotEncoder(handle_unknown="ignore")),
    ]
)

preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_pipeline, numeric_features),
        ("cat", categorical_pipeline, categorical_features),
    ]
)

OneHotEncoder(handle_unknown="ignore")

Ý nghĩa:

  • Khi inference gặp category chưa thấy trong training, encoder không throw error.
  • Các cột one-hot tương ứng category đã biết sẽ là 0.

Trade-off:

  • Tăng robustness của API.
  • Nhưng category mới không có signal riêng, quality có thể giảm.
  • Cần monitor unknown/category drift.

train_test_split

from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

stratify=y giữ tỷ lệ class gần giống giữa train/test cho classification. Không dùng random split cho time-series hoặc dữ liệu có leakage theo thời gian.

Metrics

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

metrics = {
    "accuracy": accuracy_score(y_test, y_pred),
    "precision": precision_score(y_test, y_pred, zero_division=0),
    "recall": recall_score(y_test, y_pred, zero_division=0),
    "f1": f1_score(y_test, y_pred, zero_division=0),
    "roc_auc": roc_auc_score(y_test, y_score),
}

Guidance:

  • Accuracy dễ hiểu nhưng nguy hiểm khi class imbalance.
  • Precision quan trọng khi false positive đắt.
  • Recall quan trọng khi false negative đắt.
  • F1 cân bằng precision/recall.
  • ROC-AUC hữu ích khi cần so sánh ranking/probability quality.

4. Model artifact với joblib

import joblib

joblib.dump(pipeline, "model.joblib")
loaded = joblib.load("model.joblib")

Nên lưu thêm metadata:

{
  "model_name": "logistic_regression",
  "schema_version": "day4-titanic-v1",
  "numeric_features": ["age", "sibsp", "parch", "fare", "family_size", "is_alone"],
  "categorical_features": ["pclass", "sex", "embarked"],
  "target": "survived",
  "metrics": {
    "roc_auc": 0.84,
    "f1": 0.76
  },
  "random_state": 42
}

Security rule: chỉ load joblib/pickle artifact từ nguồn trusted. Không nhận file model do user upload rồi joblib.load trực tiếp.

5. Production readiness checklist

  • Có schema validation cho input.
  • Có split strategy phù hợp business.
  • Preprocessing và model nằm chung pipeline.
  • Có baseline và ít nhất một model so sánh.
  • Có metrics phù hợp business cost.
  • Có artifact metadata.
  • Có dependency version hoặc lockfile.
  • Có smoke test load artifact và predict.
  • Có monitoring plan cho missing values, category drift, prediction drift và latency.
  • Có rollback plan khi model mới tệ hơn.

6. Context7 docs đã tham khảo

  • /numpy/numpy: ndarray, shape, dtype, matrix multiplication, views/copies.
  • /websites/pandas_pydata: DataFrame boolean indexing, groupby aggregate, info(), missing value handling.
  • /websites/scikit-learn_stable: Pipeline, ColumnTransformer, OneHotEncoder(handle_unknown="ignore"), examples for preprocessing pipelines.

Bài tập

Mục tiêu exercise

Bạn sẽ viết một training script gần production cho bài toán binary classification:

Input passenger features -> predict survived probability

Yêu cầu:

  • Load Titanic từ OpenML nếu có network/cache.
  • Có fallback synthetic dataset để bài vẫn chạy offline.
  • Validate schema trước training và trước inference.
  • Dùng ColumnTransformer + Pipeline.
  • Train ít nhất 3 models.
  • So sánh metrics và latency.
  • Lưu artifact model.joblibmetadata.json.
  • Load lại artifact và predict một request mẫu.

1. Chuẩn bị môi trường

python3 -m venv .venv
source .venv/bin/activate
pip install numpy pandas scikit-learn joblib

Nếu dùng uv:

uv venv
source .venv/bin/activate
uv pip install numpy pandas scikit-learn joblib

2. Tạo file train_titanic_pipeline.py

Bạn có thể đặt file này ở workspace tạm hoặc trong project riêng của bạn. Nội dung:

from __future__ import annotations

import json
import platform
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import joblib
import numpy as np
import pandas as pd
import sklearn
from sklearn.compose import ColumnTransformer
from sklearn.datasets import fetch_openml, make_classification
from sklearn.ensemble import HistGradientBoostingClassifier, RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler


RANDOM_STATE = 42
ARTIFACT_DIR = Path("artifacts/day4_titanic")
MODEL_PATH = ARTIFACT_DIR / "model.joblib"
METADATA_PATH = ARTIFACT_DIR / "metadata.json"

RAW_NUMERIC_FEATURES = ["age", "sibsp", "parch", "fare"]
DERIVED_NUMERIC_FEATURES = ["family_size", "is_alone"]
NUMERIC_FEATURES = RAW_NUMERIC_FEATURES + DERIVED_NUMERIC_FEATURES
CATEGORICAL_FEATURES = ["pclass", "sex", "embarked"]
FEATURES = NUMERIC_FEATURES + CATEGORICAL_FEATURES
TARGET = "survived"


@dataclass(frozen=True)
class SchemaIssue:
    field: str
    message: str


class SchemaValidationError(ValueError):
    def __init__(self, issues: list[SchemaIssue]) -> None:
        self.issues = issues
        detail = "; ".join(f"{issue.field}: {issue.message}" for issue in issues)
        super().__init__(detail)


def load_titanic_or_fallback() -> tuple[pd.DataFrame, str]:
    """Load Titanic from OpenML; fallback keeps the exercise runnable offline."""
    try:
        titanic = fetch_openml("titanic", version=1, as_frame=True, parser="auto")
        frame = titanic.frame
        df = frame[["pclass", "sex", "age", "sibsp", "parch", "fare", "embarked", "survived"]].copy()
        df["survived"] = df["survived"].astype(int)
        return df, "openml_titanic_v1"
    except Exception as exc:
        print(f"OpenML unavailable, using synthetic fallback. Reason: {exc}")
        X, y = make_classification(
            n_samples=1309,
            n_features=6,
            n_informative=4,
            n_redundant=1,
            random_state=RANDOM_STATE,
        )
        df = pd.DataFrame(X, columns=["age_raw", "fare_raw", "sibsp_raw", "parch_raw", "class_raw", "sex_raw"])
        df["age"] = np.clip((df["age_raw"] * 12 + 32).round(1), 0, 80)
        df["fare"] = np.clip((df["fare_raw"] * 20 + 35).round(2), 0, 300)
        df["sibsp"] = np.clip(np.abs(df["sibsp_raw"]).round().astype(int), 0, 5)
        df["parch"] = np.clip(np.abs(df["parch_raw"]).round().astype(int), 0, 5)
        df["pclass"] = pd.cut(df["class_raw"], bins=3, labels=["1", "2", "3"]).astype(str)
        df["sex"] = np.where(df["sex_raw"] > 0, "male", "female")
        df["embarked"] = np.select(
            [df["fare"] < 20, df["fare"] < 80],
            ["S", "C"],
            default="Q",
        )
        df["survived"] = y.astype(int)
        return df[["pclass", "sex", "age", "sibsp", "parch", "fare", "embarked", "survived"]], "synthetic_fallback"


def add_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["family_size"] = df["sibsp"].fillna(0) + df["parch"].fillna(0) + 1
    df["is_alone"] = (df["family_size"] == 1).astype(int)
    df["pclass"] = df["pclass"].astype("string")
    df["sex"] = df["sex"].astype("string")
    df["embarked"] = df["embarked"].astype("string")
    return df


def validate_training_frame(df: pd.DataFrame) -> None:
    required = set(RAW_NUMERIC_FEATURES + CATEGORICAL_FEATURES + [TARGET])
    issues: list[SchemaIssue] = []

    missing = sorted(required - set(df.columns))
    for column in missing:
        issues.append(SchemaIssue(column, "missing required column"))

    if TARGET in df.columns:
        labels = set(pd.Series(df[TARGET]).dropna().astype(int).unique().tolist())
        if not labels.issubset({0, 1}):
            issues.append(SchemaIssue(TARGET, f"expected binary labels 0/1, got {sorted(labels)}"))

    for column in RAW_NUMERIC_FEATURES:
        if column in df.columns and not pd.api.types.is_numeric_dtype(df[column]):
            issues.append(SchemaIssue(column, f"expected numeric dtype, got {df[column].dtype}"))

    if issues:
        raise SchemaValidationError(issues)


def validate_inference_payload(payload: dict[str, Any]) -> pd.DataFrame:
    issues: list[SchemaIssue] = []
    required = set(RAW_NUMERIC_FEATURES + CATEGORICAL_FEATURES)

    for column in sorted(required - set(payload)):
        issues.append(SchemaIssue(column, "missing required field"))

    row: dict[str, Any] = {}
    for column in RAW_NUMERIC_FEATURES:
        value = payload.get(column)
        if value is None:
            row[column] = np.nan
            continue
        try:
            row[column] = float(value)
        except (TypeError, ValueError):
            issues.append(SchemaIssue(column, f"expected numeric value, got {value!r}"))

    for column in CATEGORICAL_FEATURES:
        value = payload.get(column)
        if value is None or str(value).strip() == "":
            row[column] = pd.NA
        else:
            row[column] = str(value)

    if issues:
        raise SchemaValidationError(issues)

    return add_features(pd.DataFrame([row]))[FEATURES]


def build_preprocessor() -> ColumnTransformer:
    numeric_pipeline = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )
    categorical_pipeline = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OneHotEncoder(handle_unknown="ignore")),
        ]
    )
    return ColumnTransformer(
        transformers=[
            ("num", numeric_pipeline, NUMERIC_FEATURES),
            ("cat", categorical_pipeline, CATEGORICAL_FEATURES),
        ]
    )


def build_model_specs() -> list[tuple[str, Any]]:
    return [
        (
            "logistic_regression",
            LogisticRegression(max_iter=1000, random_state=RANDOM_STATE),
        ),
        (
            "random_forest",
            RandomForestClassifier(
                n_estimators=250,
                max_depth=7,
                min_samples_leaf=3,
                random_state=RANDOM_STATE,
                n_jobs=-1,
            ),
        ),
        (
            "hist_gradient_boosting",
            HistGradientBoostingClassifier(
                max_iter=200,
                learning_rate=0.05,
                random_state=RANDOM_STATE,
            ),
        ),
    ]


def evaluate_pipeline(
    name: str,
    pipeline: Pipeline,
    X_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_train: pd.Series,
    y_test: pd.Series,
) -> tuple[dict[str, float | str], Pipeline]:
    train_started = time.perf_counter()
    pipeline.fit(X_train, y_train)
    train_ms = (time.perf_counter() - train_started) * 1000

    predict_started = time.perf_counter()
    y_pred = pipeline.predict(X_test)
    predict_ms = (time.perf_counter() - predict_started) * 1000

    if hasattr(pipeline, "predict_proba"):
        y_score = pipeline.predict_proba(X_test)[:, 1]
    else:
        y_score = y_pred

    metrics: dict[str, float | str] = {
        "model": name,
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred, zero_division=0),
        "recall": recall_score(y_test, y_pred, zero_division=0),
        "f1": f1_score(y_test, y_pred, zero_division=0),
        "roc_auc": roc_auc_score(y_test, y_score),
        "train_ms": train_ms,
        "predict_ms_per_row": predict_ms / len(X_test),
    }
    return metrics, pipeline


def train() -> None:
    ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)

    raw_df, dataset_source = load_titanic_or_fallback()
    validate_training_frame(raw_df)
    df = add_features(raw_df)

    print("Dataset source:", dataset_source)
    print("Shape:", df.shape)
    print("Missing ratio:")
    print((df[FEATURES + [TARGET]].isna().mean() * 100).sort_values(ascending=False))
    print("Target distribution:")
    print(df[TARGET].value_counts(normalize=True).sort_index())

    X = df[FEATURES]
    y = df[TARGET].astype(int)

    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=0.2,
        random_state=RANDOM_STATE,
        stratify=y,
    )

    results: list[dict[str, float | str]] = []
    trained: dict[str, Pipeline] = {}

    for name, model in build_model_specs():
        pipeline = Pipeline(
            steps=[
                ("preprocessor", build_preprocessor()),
                ("model", model),
            ]
        )
        metrics, fitted = evaluate_pipeline(name, pipeline, X_train, X_test, y_train, y_test)
        results.append(metrics)
        trained[name] = fitted

    summary = pd.DataFrame(results).sort_values(["roc_auc", "f1"], ascending=False)
    print(summary.to_string(index=False))

    best_name = str(summary.iloc[0]["model"])
    best_pipeline = trained[best_name]
    joblib.dump(best_pipeline, MODEL_PATH)

    metadata = {
        "schema_version": "day4-titanic-v1",
        "dataset_source": dataset_source,
        "best_model": best_name,
        "target": TARGET,
        "raw_numeric_features": RAW_NUMERIC_FEATURES,
        "numeric_features": NUMERIC_FEATURES,
        "categorical_features": CATEGORICAL_FEATURES,
        "features": FEATURES,
        "random_state": RANDOM_STATE,
        "test_size": 0.2,
        "metrics": summary.to_dict(orient="records"),
        "runtime": {
            "python": platform.python_version(),
            "platform": platform.platform(),
            "numpy": np.__version__,
            "pandas": pd.__version__,
            "scikit_learn": sklearn.__version__,
        },
        "security_note": "Only load this joblib artifact from trusted storage.",
    }
    METADATA_PATH.write_text(json.dumps(metadata, indent=2), encoding="utf-8")

    print(f"Saved model: {MODEL_PATH}")
    print(f"Saved metadata: {METADATA_PATH}")


def predict_survival(payload: dict[str, Any]) -> dict[str, Any]:
    if not MODEL_PATH.exists():
        raise FileNotFoundError(f"Model artifact not found: {MODEL_PATH}. Run train() first.")

    pipeline: Pipeline = joblib.load(MODEL_PATH)
    X = validate_inference_payload(payload)
    probability = float(pipeline.predict_proba(X)[0, 1])
    prediction = int(probability >= 0.5)
    return {
        "prediction": prediction,
        "survived_probability": probability,
        "threshold": 0.5,
        "schema_version": "day4-titanic-v1",
    }


if __name__ == "__main__":
    train()
    sample_payload = {
        "pclass": "3",
        "sex": "female",
        "age": 29,
        "sibsp": 0,
        "parch": 0,
        "fare": 7.9,
        "embarked": "S",
    }
    print("Sample prediction:")
    print(json.dumps(predict_survival(sample_payload), indent=2))

3. Chạy script

python3 train_titanic_pipeline.py

Kết quả kỳ vọng:

  • In dataset source: openml_titanic_v1 hoặc synthetic_fallback.
  • In missing ratio và target distribution.
  • In bảng metrics của 3 models.
  • Tạo artifacts/day4_titanic/model.joblib.
  • Tạo artifacts/day4_titanic/metadata.json.
  • In sample prediction dạng JSON.

4. Review kết quả

Mở metadata.json và kiểm tra:

  • Model nào được chọn?
  • roc_auc, f1, precision, recall của model tốt nhất là bao nhiêu?
  • predict_ms_per_row có khác nhiều giữa các model không?
  • dataset_source là OpenML hay fallback?
  • Feature list có đủ raw và derived features không?

Không chọn model chỉ vì accuracy cao. Với bài toán sinh tồn Titanic, đây là exercise học stack; trong business thật, bạn phải định nghĩa false positive/false negative cost.

5. Bài tập mở rộng

Bài 1: Thêm schema range check

Trong validate_inference_payload, thêm rule:

  • age nằm trong [0, 120] nếu không null.
  • fare không âm.
  • sibsp, parch không âm và là số gần integer.

Trade-off: strict validation giúp bắt input xấu sớm, nhưng nếu quá strict có thể reject dữ liệu hợp lệ ngoài distribution cũ. Với production, nên phân biệt hard validation và drift alert.

Bài 2: Thêm threshold tuning

Thay vì hard-code threshold 0.5, thử các threshold từ 0.2 đến 0.8, chọn threshold theo mục tiêu:

  • Ưu tiên recall nếu bỏ sót positive rất đắt.
  • Ưu tiên precision nếu false alarm rất đắt.
  • Ưu tiên F1 nếu muốn cân bằng.

Gợi ý:

for threshold in np.arange(0.2, 0.85, 0.05):
    y_pred = (y_score >= threshold).astype(int)
    print(threshold, precision_score(y_test, y_pred), recall_score(y_test, y_pred), f1_score(y_test, y_pred))

Bài 3: Viết smoke test

Tạo test đơn giản:

def test_predict_survival_smoke():
    payload = {
        "pclass": "1",
        "sex": "female",
        "age": 38,
        "sibsp": 1,
        "parch": 0,
        "fare": 71.28,
        "embarked": "C",
    }
    result = predict_survival(payload)
    assert 0 <= result["survived_probability"] <= 1
    assert result["prediction"] in {0, 1}

Bài 4: Tách thành package nhỏ

Nếu muốn gần production hơn, tách file:

day4_project/
  pyproject.toml
  src/day4_titanic/
    data.py
    features.py
    train.py
    predict.py
    schema.py
  tests/
    test_predict.py

Đây là bước chuyển từ notebook/script sang maintainable codebase.

6. Câu hỏi bắt buộc: dùng được trong production không?

Trả lời mẫu:

Có thể dùng pattern này trong production cho tabular classification nhỏ-vừa, nhưng không bê nguyên script demo vào production. Điều kiện cần:

  • Dữ liệu training đại diện cho production traffic.
  • Split strategy phản ánh cách model sẽ gặp dữ liệu thật, đặc biệt nếu có yếu tố thời gian.
  • Pipeline artifact được build trong CI/training job có kiểm soát.
  • Có schema validation ở API/batch boundary.
  • Có artifact registry hoặc storage versioned.
  • Có monitoring drift và model quality.
  • Có rollback khi metrics production giảm.
  • Có security policy: không load joblib từ nguồn untrusted.

Nếu inference throughput rất cao, cần benchmark kỹ Pandas single-row overhead. Có thể cần batch inference, feature service tối ưu hơn, hoặc chuyển một phần transform ra service/language phù hợp.

7. Checklist nộp bài

  • Script chạy được không cần chỉnh tay.
  • Có fallback khi không tải được OpenML.
  • Có schema validation.
  • Pipeline chứa preprocessing và model.
  • Có ít nhất 3 models.
  • Có metrics table.
  • model.joblib.
  • metadata.json.
  • Có sample prediction.
  • Có câu trả lời production readiness.