Published on

Day 8: Mini-project - Customer Churn ML Pipeline

Authors

Mục Tiêu

Day 8 là mini-project tổng hợp Phase 1. Mục tiêu không phải đạt score cao nhất, mà là xây được một ML pipeline đúng quy trình, có thể kiểm thử, có thể lặp lại và đủ gần production để một Senior Software Engineer hiểu được toàn bộ vòng đời từ dữ liệu đến inference.

Kết thúc bài này, bạn cần làm được:

  • Định nghĩa bài toán customer churn bằng ngôn ngữ business và ML.
  • Dùng Telco Customer Churn hoặc dataset tương đương để thực hiện EDA.
  • Thiết kế feature engineering không gây data leakage.
  • Train và so sánh ít nhất 3 models: Logistic Regression baseline, Random Forest, Gradient Boosting.
  • Đánh giá bằng nhiều metrics: ROC-AUC, PR-AUC, precision, recall, F1, confusion matrix và latency.
  • Tune threshold theo mục tiêu business thay vì dùng mặc định 0.5.
  • Làm error analysis trên false positives, false negatives và từng customer segment.
  • Save model artifact kèm metadata, threshold, schema và metrics.
  • Viết inference function có input contract rõ ràng.
  • Trả lời rõ: "Dùng được trong production không? Nếu có thì cần điều kiện gì?"

TL;DR

Customer churn prediction là bài toán binary classification: dự đoán xác suất một customer sẽ rời bỏ dịch vụ trong một horizon cụ thể, ví dụ 30 ngày hoặc cuối kỳ hợp đồng. Một pipeline tốt cần nhất quán giữa training và serving:

Raw data
-> schema validation
-> EDA
-> train/validation/test split
-> feature engineering trong pipeline
-> preprocessing bằng ColumnTransformer
-> train nhiều models
-> evaluate bằng ranking metrics và threshold metrics
-> tune threshold trên validation set
-> error analysis trên test set
-> save artifact kèm metadata
-> inference function có contract

Best default cho Day 8:

  • Dùng Pipeline để gói feature engineering, preprocessing và model.
  • Dùng ColumnTransformer để xử lý numerical/categorical columns tách biệt.
  • Dùng OneHotEncoder(handle_unknown="ignore") cho category mới ở inference.
  • Dùng Logistic Regression làm baseline bắt buộc.
  • So sánh với Random Forest và Gradient Boosting.
  • Chọn model theo PR-AUC, F1 tại threshold đã tune, latency và explainability.
  • Nếu score cải thiện không đáng kể, ưu tiên baseline đơn giản hơn cho v1.

Dùng Được Trong Production Không?

Có thể dùng làm v1 production nếu thỏa các điều kiện sau:

  • Label churn được định nghĩa rõ: churn là gì, horizon bao lâu, thời điểm tạo label là khi nào.
  • Dataset training đại diện cho traffic thật và có split phù hợp. Nếu dữ liệu có timestamp, ưu tiên time-based split thay vì random split.
  • Feature tại training đều available tại prediction time. Không dùng target proxy hoặc future information.
  • Preprocessing nằm trong artifact, không viết lại thủ công ở service.
  • Có schema validation trước training và inference.
  • Threshold được chọn theo cost FP/FN hoặc mục tiêu business như recall tối thiểu.
  • Artifact lưu model, threshold, feature schema, package versions, training date, metrics và limitation.
  • Có monitoring: input null rate, unknown category rate, prediction distribution, positive rate, latency, drift và business outcome.
  • Có quy trình rollback model và đánh giá định kỳ.

Chưa nên dùng production nếu:

  • Chỉ train trong notebook, không lưu artifact reproducible.
  • Chưa có validation/test set độc lập.
  • Chưa làm error analysis theo segment quan trọng.
  • Chưa biết action sau prediction là gì.
  • Chưa có policy cho PII, logging và retention campaign.
  • Dùng random split trong khi data thật có strong time drift mà chưa kiểm chứng.

1. Problem Framing

Churn prediction không phải câu hỏi "customer có churn không" một cách mơ hồ. Bài toán cần được đóng khung như sau:

Dựa trên thông tin có sẵn tại prediction_time,
dự đoán xác suất customer sẽ churn trong prediction_horizon.

Ví dụ output nên là probability và decision metadata, không chỉ là class:

{
  "customer_id": "CUST_000123",
  "churn_probability": 0.73,
  "risk_tier": "high",
  "decision": "send_retention_offer",
  "model_version": "churn-v1",
  "threshold": 0.45
}

Các câu hỏi business cần chốt trước khi train:

Câu hỏiVì sao quan trọng
Churn horizon là 30 ngày, 60 ngày hay cuối hợp đồng?Horizon khác nhau tạo label khác nhau, model khác nhau
Customer nào nằm trong population scoring?Không nên score customer không thể hành động hoặc không đủ dữ liệu
Action sau prediction là gì?Threshold phụ thuộc vào campaign capacity và cost
False positive tốn gì?Gửi offer cho người không churn gây lãng phí hoặc giảm revenue
False negative tốn gì?Bỏ lỡ customer sắp churn
Có constraint về fairness/segment không?Model có thể over-target một nhóm customer

Best solution theo context Day 8: giả định horizon là "churn trong kỳ tiếp theo", dùng dataset Telco để học quy trình. Khi áp dụng ở công ty thật, phần định nghĩa label và point-in-time data quan trọng hơn việc đổi model.

2. Dataset Và Schema

Dataset gợi ý: Telco Customer Churn. Nếu không có CSV, bạn có thể dùng synthetic dataset có schema tương tự để luyện pipeline, nhưng khi đánh giá portfolio nên dùng dataset thật.

Một số cột thường gặp:

ColumnType gợi ýÝ nghĩaProduction note
customerIDstringID customerKhông dùng làm feature
gendercategoricalGiới tínhCần cân nhắc fairness
SeniorCitizennumeric/binaryKhách hàng cao tuổiCó thể là sensitive-ish feature
PartnercategoricalCó partner hay khôngCategory mới cần xử lý
DependentscategoricalCó người phụ thuộc hay khôngCategory mới cần xử lý
tenurenumericSố tháng sử dụngFeature mạnh, nhưng cần kiểm tra missing/outlier
PhoneServicecategoricalCó phone service hay khôngCategorical
InternetServicecategoricalDSL/Fiber/NoSegment quan trọng
ContractcategoricalMonth-to-month/One year/Two yearFeature rất mạnh
PaperlessBillingcategoricalBilling điện tửCategorical
PaymentMethodcategoricalPhương thức thanh toánCó thể liên quan đến churn
MonthlyChargesnumericPhí hàng thángCần xử lý outlier
TotalChargesnumeric/stringTổng phí đã trảHay bị đọc thành string hoặc blank
ChurntargetYes/No hoặc 1/0Không được xuất hiện trong inference input

Data quality issues thường gặp:

  • TotalCharges có blank string, cần convert bằng pd.to_numeric(errors="coerce").
  • Categorical values có whitespace.
  • Class imbalance: churn thường ít hơn non-churn.
  • Category ở production có thể không có trong training.
  • Một số feature có thể không available tại prediction time trong hệ thống thật.

3. Kiến Trúc Pipeline

Pipeline mục tiêu:

data/telco.csv
  -> load + schema validation
  -> EDA report
  -> split train/validation/test
  -> sklearn Pipeline
       -> feature builder
       -> ColumnTransformer
            -> numeric: median imputation + scaling
            -> categorical: most-frequent imputation + one-hot
       -> classifier
  -> model comparison
  -> threshold tuning trên validation set
  -> final evaluation trên test set
  -> error analysis
  -> joblib artifact
  -> predict_customer_churn(customer: dict) -> dict

Điểm quan trọng: mọi transformer có .fit() chỉ được fit trên training data. Không fit scaler, encoder hoặc imputer trên toàn bộ dataset trước khi split.

4. EDA Tối Thiểu Cần Có

EDA trong bài này không phải vẽ biểu đồ cho đẹp. Mục tiêu là phát hiện rủi ro dữ liệu trước khi train.

Checklist EDA:

  • Shape: số rows, số columns.
  • Target distribution: churn rate.
  • Missing rate theo column.
  • Duplicate customerID.
  • Numeric summary: min, p25, median, p75, max.
  • Cardinality của categorical columns.
  • Churn rate theo segment: Contract, InternetService, PaymentMethod, tenure_group.
  • Outlier rõ ràng: tenure < 0, MonthlyCharges < 0, TotalCharges blank.

Ví dụ interpretation:

  • Nếu churn rate rất thấp, PR-AUC quan trọng hơn ROC-AUC.
  • Nếu Contract=Month-to-month có churn rate cao, đây là segment cần error analysis riêng.
  • Nếu TotalCharges missing chủ yếu ở tenure=0, missing không hẳn là lỗi, có thể mang ý nghĩa nghiệp vụ.

5. Feature Engineering

Feature engineering nên đơn giản, kiểm thử được và không leak dữ liệu. Với Telco, các feature hợp lý:

FeatureCách tạoLợi íchRủi ro
avg_monthly_charge_observedTotalCharges / tenure, fallback MonthlyCharges khi tenure=0Bắt quan hệ giữa tenure và spendCần xử lý chia cho 0
tenure_groupbin tenure thành nhómDễ error analysis và model học non-linear nhẹBinning mất thông tin
monthly_charge_bandbin MonthlyChargesGiúp slice analysisNgưỡng bin có thể không ổn theo thị trường
Raw categorical one-hotContract, InternetService, PaymentMethodBaseline mạnh cho tabularFeature space tăng theo cardinality

Không nên làm trong Day 8:

  • Target encoding nếu chưa có cross-validation đúng cách, vì dễ leakage.
  • Feature dùng event sau thời điểm churn.
  • Dùng customerID làm feature.
  • Tune feature thủ công theo test set.

Best solution: để feature engineering deterministic nằm trong Pipeline hoặc ít nhất trong function được dùng chung cho training và inference. Không copy logic feature engineering sang service bằng tay.

6. Model Choices

ModelVai tròMạnhYếuKhi chọn
Logistic RegressionBaseline bắt buộcNhanh, dễ giải thích, latency thấpKhó bắt interaction phức tạpV1 production khi score đủ tốt
Random ForestCandidate non-linearRobust, ít cần scaling, bắt interactionArtifact lớn hơn, probability có thể chưa calibratedKhi quality tăng rõ và latency chấp nhận
Gradient BoostingCandidate mạnh cho tabularThường tốt trên tabular vừa/nhỏCần tuning, dễ overfit nếu thiếu disciplineKhi validation/test ổn định và monitoring đủ

Trade-off thực tế:

  • Nếu Logistic Regression kém PR-AUC 1-2 điểm nhưng nhanh, dễ debug và đủ đáp ứng business, chọn Logistic Regression cho v1.
  • Nếu Gradient Boosting cải thiện recall ở cùng precision rõ ràng, có thể chọn Gradient Boosting.
  • Nếu Random Forest artifact lớn và p99 latency cao trong API synchronous, dùng batch scoring thay vì realtime.

7. Metrics Và Threshold

Không dùng accuracy làm metric chính cho churn nếu class imbalance. Một model dự đoán tất cả là "No churn" có thể accuracy cao nhưng vô dụng.

Metrics cần report:

MetricDùng để trả lờiLưu ý
ROC-AUCModel rank positive cao hơn negative tốt không?Có thể lạc quan khi positive class nhỏ
PR-AUCTrong các case model cho điểm cao, positive thật nhiều không?Hữu ích cho churn/retention
PrecisionGửi campaign cho người được flag thì bao nhiêu người thật sự churn?Liên quan cost FP
RecallBắt được bao nhiêu churners thật?Liên quan cost FN
F1Cân bằng precision/recallKhông thay thế business cost
Confusion matrixTP/FP/FN/TN cụ thểDễ giải thích với stakeholder
LatencyDự đoán mất bao lâuQuan trọng nếu serve realtime

Threshold tuning:

1. Train model trên train set.
2. Predict probability trên validation set.
3. Quét threshold từ 0.10 đến 0.90.
4. Chọn threshold theo objective, ví dụ recall >= 0.75 rồi maximize precision.
5. Chỉ sau khi chọn xong mới đánh giá test set.

Không tune threshold trên test set. Test set là dữ liệu "chưa đụng tới" để ước lượng performance cuối.

8. Error Analysis

Error analysis là deliverable bắt buộc, không phải phần trang trí.

Cần xem:

  • Top false positives: model rất tự tin customer churn nhưng thực tế không churn.
  • Top false negatives: model bỏ sót customer churn thật.
  • Slice metrics theo Contract, InternetService, PaymentMethod, tenure_group.
  • Segment có recall thấp bất thường.
  • Segment có predicted positive rate lệch xa actual positive rate.

Ví dụ câu hỏi:

  • Model bỏ sót nhiều customer Month-to-month mới đăng ký không?
  • Model over-predict churn cho Fiber optic vì lịch sử training bias không?
  • Threshold chung có làm một segment bị recall quá thấp không?

Nếu phát hiện một segment quan trọng có performance kém, các hướng xử lý:

  • Thu thập thêm dữ liệu cho segment đó.
  • Thêm feature liên quan đến behavior trước churn.
  • Tune threshold theo segment, nhưng cần kiểm soát fairness và complexity.
  • Chạy campaign thí điểm để đo business lift thay vì chỉ nhìn offline metrics.

9. Artifact Và Inference Contract

Model artifact không chỉ là model weights. Artifact cần chứa:

  • Fitted Pipeline.
  • threshold.
  • model_name.
  • schema_version.
  • raw_feature_columns.
  • numeric_featurescategorical_features sau feature engineering.
  • Metrics trên validation/test.
  • Training time, package versions, random seed.
  • Limitation và intended use.

Inference input contract:

{
  "customerID": "CUST_000123",
  "gender": "Female",
  "SeniorCitizen": 0,
  "Partner": "Yes",
  "Dependents": "No",
  "tenure": 12,
  "PhoneService": "Yes",
  "InternetService": "Fiber optic",
  "Contract": "Month-to-month",
  "PaperlessBilling": "Yes",
  "PaymentMethod": "Electronic check",
  "MonthlyCharges": 89.9,
  "TotalCharges": 1020.3
}

Inference output contract:

{
  "customer_id": "CUST_000123",
  "churn_probability": 0.73,
  "will_churn": true,
  "risk_tier": "high",
  "threshold": 0.45,
  "model_name": "gradient_boosting",
  "schema_version": "telco-churn-v1"
}

Production note: với API thật, không nên log raw PII. Log request ID, schema version, missing count, model version, latency và prediction score bucket là đủ trong đa số trường hợp.

10. Performance Và Deployment

Performance cần nhìn theo context:

  • Batch daily scoring: latency từng record ít quan trọng hơn throughput, cost và reproducibility.
  • Realtime API: cần đo p50/p95/p99 latency, artifact load time và memory footprint.
  • Logistic Regression thường nhanh nhất.
  • Random Forest có latency tăng theo số cây và depth.
  • Gradient Boosting có latency phụ thuộc số estimators, thường vẫn ổn với tabular nhỏ.

Best solution cho churn trong nhiều công ty: batch scoring mỗi ngày hoặc mỗi vài giờ, lưu score vào database/CRM để retention team dùng. Realtime API chỉ cần nếu action xảy ra ngay trong user session, ví dụ hiển thị offer tại thời điểm customer vào trang hủy dịch vụ.

11. README Và Model Card Tối Thiểu

README của mini-project cần có:

  • Problem statement.
  • Dataset và target definition.
  • Feature list và feature engineering.
  • Split strategy.
  • Models đã thử.
  • Metrics và threshold objective.
  • Error analysis summary.
  • Production readiness.
  • Cách train và cách inference.
  • Limitations.

Model card tối thiểu:

Model: customer-churn-v1
Intended use: prioritize retention outreach
Not intended use: fully automated denial, pricing discrimination
Training data: Telco Customer Churn or equivalent
Target: churn in defined horizon
Primary metric: PR-AUC + recall at selected threshold
Threshold: chosen on validation set
Known limitations: offline dataset, no live feedback, possible segment bias
Monitoring: input drift, score drift, outcome drift, latency

12. Checklist Hoàn Thành Day 8

  • Có dataset Telco hoặc synthetic fallback tương đương.
  • Có EDA report.
  • Có schema validation cho training và inference.
  • Feature engineering không dùng target hoặc future data.
  • Dùng PipelineColumnTransformer.
  • Dùng OneHotEncoder(handle_unknown="ignore").
  • Train ít nhất 3 models.
  • Report ROC-AUC, PR-AUC, precision, recall, F1, confusion matrix.
  • Tune threshold trên validation set.
  • Đánh giá cuối trên test set.
  • Có error analysis FP/FN và slice metrics.
  • Save artifact bằng joblib.
  • Artifact có metadata.
  • Có inference function trả probability, decision, threshold và risk tier.
  • Có performance measurement.
  • Có README/trade-off/production notes.

Lỗi Hay Gặp

  • Fit encoder/scaler trước split.
  • Dùng test set để chọn threshold.
  • Chỉ report accuracy.
  • Save model nhưng quên save threshold.
  • Training có feature engineering, inference lại không có.
  • Không xử lý category mới ở production.
  • Dùng customerID làm feature.
  • Không kiểm tra TotalCharges blank.
  • Không phân tích false negatives trong segment giá trị cao.
  • Nói "production-ready" khi chưa có monitoring và rollback.

Tự Kiểm Tra

  1. Vì sao cần validation set riêng cho threshold tuning?
  2. Khi nào PR-AUC quan trọng hơn ROC-AUC?
  3. Nếu Gradient Boosting tốt hơn Logistic Regression rất ít, bạn chọn model nào cho v1 và vì sao?
  4. Artifact cần lưu gì ngoài model object?
  5. Nếu inference input có category mới chưa từng thấy, pipeline nên xử lý thế nào?
  6. Vì sao batch scoring thường hợp lý hơn realtime API cho churn?
  7. Điều kiện tối thiểu để gọi pipeline này là production-ready là gì?

Tài liệu

Tài liệu này là phần tra cứu nhanh khi làm mini-project. Mục tiêu là giúp bạn triển khai nhất quán, tránh bỏ sót schema, metrics, artifact metadata và production notes.

1. Project Structure Gợi Ý

Nếu làm thành repository riêng cho portfolio:

customer-churn-ml-pipeline/
  README.md
  requirements.txt
  pyproject.toml
  data/
    telco_customer_churn.csv
  src/
    churn_pipeline.py
  artifacts/
    customer_churn_model.joblib
    metrics_report.json
    error_analysis/
      false_positives.csv
      false_negatives.csv
      slice_metrics.csv
  tests/
    test_schema.py
    test_inference_contract.py

Trong repo bài học này, bạn không cần commit dataset hoặc model artifact thật. Dataset và artifact thường lớn hoặc có rủi ro PII. Hãy hướng dẫn cách tạo lại artifact bằng command train.

2. Dataset Contract

Schema mặc định cho Telco Customer Churn:

ColumnRequiredTrainingInferenceTypeGhi chú
customerIDstringID, không dùng làm feature
gendercategoricalCần fairness review nếu dùng cho decision nhạy cảm
SeniorCitizennumeric/binaryCó thể đọc từ CSV là 0/1 hoặc string
PartnercategoricalYes/No
DependentscategoricalYes/No
tenurenumericSố tháng dùng dịch vụ
PhoneServicecategoricalYes/No
InternetServicecategoricalDSL/Fiber optic/No
ContractcategoricalSegment rất quan trọng
PaperlessBillingcategoricalYes/No
PaymentMethodcategoricalCó whitespace trong một số dataset
MonthlyChargesnumericPhí hàng tháng
TotalChargesnumeric/stringCần convert numeric, blank thành missing
ChurnKhôngtargetYes/No hoặc 1/0

Quy tắc validation tối thiểu:

  • Thiếu required column thì fail fast.
  • Strip whitespace ở column name và categorical value.
  • Convert numeric columns bằng pd.to_numeric(errors="coerce").
  • Target chỉ được là 0/1 sau khi normalize.
  • Duplicate customerID cần được report.
  • Missing rate quá cao ở feature quan trọng cần được cảnh báo.

3. Feature Engineering Contract

Feature raw không dùng trực tiếp một cách tùy tiện. Nên có một function hoặc transformer duy nhất tạo feature cho cả training và inference.

Feature gợi ý:

FeatureNhómNguồnGiải thích
tenurenumericrawSố tháng sử dụng
MonthlyChargesnumericrawSpend hiện tại
TotalChargesnumericrawSpend tích lũy
avg_monthly_charge_observednumericengineeredTotalCharges / tenure, fallback MonthlyCharges
SeniorCitizennumericrawBinary
tenure_groupcategoricalengineered0-6, 7-12, 13-24, 25-48, 49+
monthly_charge_bandcategoricalengineeredLow/medium/high spend
ContractcategoricalrawRất quan trọng cho churn
InternetServicecategoricalrawSegment
PaymentMethodcategoricalrawSegment
Các cột Yes/NocategoricalrawPartner, Dependents, PhoneService, PaperlessBilling

Không dùng:

  • customerID làm feature.
  • Churn hoặc bất kỳ target proxy nào.
  • Feature phát sinh sau khi customer đã churn.
  • Aggregate không có cutoff time.

4. Requirements Template

requirements.txt tối thiểu:

numpy>=1.26
pandas>=2.1
scikit-learn>=1.4,<2.0
joblib>=1.3

Nếu dùng pyproject.toml:

[project]
name = "customer-churn-ml-pipeline"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
  "numpy>=1.26",
  "pandas>=2.1",
  "scikit-learn>=1.4,<2.0",
  "joblib>=1.3",
]

[tool.pytest.ini_options]
pythonpath = ["src"]
testpaths = ["tests"]

Ghi chú API: bài này dùng API hiện tại của scikit-learn stable như Pipeline, ColumnTransformer, OneHotEncoder(handle_unknown="ignore"), LogisticRegression, RandomForestClassifier, GradientBoostingClassifier và các metrics classification.

5. README Template

# Customer Churn ML Pipeline

## Problem
Predict the probability that a customer will churn within the defined prediction horizon so the retention team can prioritize outreach.

## Dataset
Telco Customer Churn or an equivalent customer subscription dataset.

## Target
`Churn`: 1 if customer churned in the target horizon, otherwise 0.

## Approach
- Validate schema and clean numeric/categorical values.
- Run EDA: target distribution, missing values, cardinality, segment churn rate.
- Use sklearn `Pipeline` and `ColumnTransformer`.
- Train Logistic Regression, Random Forest, Gradient Boosting.
- Evaluate ROC-AUC, PR-AUC, precision, recall, F1, confusion matrix.
- Tune threshold on validation set.
- Run error analysis on false positives, false negatives and customer segments.
- Save artifact with model, threshold, schema and metadata.

## Production Readiness
This project is usable as a v1 production pattern only if label definition, point-in-time feature correctness, validation, monitoring, rollback and PII policy are implemented in the target system.

## Run
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python src/churn_pipeline.py --csv data/telco_customer_churn.csv
```

## Inference
```python
from churn_pipeline import predict_customer_churn

result = predict_customer_churn(customer_payload)
```

## Limitations
- Offline dataset may not represent live customers.
- Random split is not enough if production data has time drift.
- Model probability may require calibration before high-stakes automation.
- Campaign lift must be validated with an experiment.

6. Artifact Contract

Artifact nên được save bằng joblib dưới dạng dictionary:

artifact = {
    "model": fitted_pipeline,
    "metadata": {
        "model_name": "gradient_boosting",
        "model_version": "customer-churn-v1",
        "schema_version": "telco-churn-v1",
        "threshold": 0.45,
        "raw_feature_columns": [...],
        "numeric_features": [...],
        "categorical_features": [...],
        "validation_metrics": {...},
        "test_metrics": {...},
        "created_at_utc": "...",
        "python_version": "...",
        "sklearn_version": "...",
        "random_state": 42,
        "training_note": "Validate on time-based split before production."
    }
}

Không chỉ save fitted estimator. Nếu thiếu threshold hoặc schema, service inference sẽ phải hard-code logic bên ngoài artifact, dễ gây train-serving skew.

7. Inference Contract

Request:

{
  "customerID": "CUST_000123",
  "gender": "Female",
  "SeniorCitizen": 0,
  "Partner": "Yes",
  "Dependents": "No",
  "tenure": 12,
  "PhoneService": "Yes",
  "InternetService": "Fiber optic",
  "Contract": "Month-to-month",
  "PaperlessBilling": "Yes",
  "PaymentMethod": "Electronic check",
  "MonthlyCharges": 89.9,
  "TotalCharges": 1020.3
}

Response:

{
  "customer_id": "CUST_000123",
  "churn_probability": 0.7312,
  "will_churn": true,
  "risk_tier": "high",
  "threshold": 0.45,
  "model_name": "gradient_boosting",
  "model_version": "customer-churn-v1",
  "schema_version": "telco-churn-v1"
}

Validation behavior:

  • Missing required field: return validation error, không gọi model.
  • Unknown categorical value: cho phép đi qua OneHotEncoder(handle_unknown="ignore"), đồng thời log unknown category rate ở service.
  • Numeric không parse được: convert thành missing và để imputer xử lý nếu field không bắt buộc strict. Với production nghiêm ngặt, nên reject nếu numeric core field invalid.

8. Metric Decision Matrix

Business contextMetric chínhThreshold objectiveLý do
Retention team có capacity thấpPrecision, PR-AUCMaximize precision với recall tối thiểuKhông muốn lãng phí offer
Churn rất đắtRecall, PR-AUCRecall >= target rồi maximize precisionChấp nhận nhiều FP để giảm FN
Chỉ dùng để rank list gọi điệnPR-AUC, lift@KTop K customersClass threshold ít quan trọng
Realtime offerF1, latency, calibrationBalance precision/recall và p99 latencyCần quyết định ngay
Model monitoringPositive rate, score driftAlert theo distributionOffline metric không đủ

9. Model Selection Matrix

Điều kiệnChọn
Logistic Regression gần bằng model khác, latency/explainability quan trọngLogistic Regression
Non-linear pattern rõ, model size chấp nhậnGradient Boosting
Cần robust baseline tree ensemble, ít tuningRandom Forest
Dataset lớn, nhiều categorical high-cardinalityCân nhắc histogram boosting hoặc specialized tabular model, nhưng ngoài phạm vi Day 8
Probability dùng cho quyết định tiền lớnThêm calibration và business experiment

10. Monitoring Checklist

Training-time:

  • Data snapshot version.
  • Target rate.
  • Missing rate.
  • Cardinality.
  • Metrics by segment.
  • Artifact checksum.

Serving-time:

  • Request count.
  • Validation error rate.
  • Null rate theo field.
  • Unknown category rate.
  • Score distribution.
  • Predicted positive rate.
  • Latency p50/p95/p99.
  • Model version đang serve.

Post-serving:

  • Actual churn rate sau khi label mature.
  • Precision/recall thực tế.
  • Campaign conversion/lift.
  • Segment bias.
  • Drift giữa train và live data.

11. Review Checklist Cho Portfolio

  • README có giải thích business problem, không chỉ code.
  • Code có main() hoặc CLI rõ ràng.
  • Có seed và split strategy.
  • Có validation và inference contract.
  • Có ít nhất 3 models và leaderboard.
  • Có threshold report.
  • Có error analysis file.
  • Có artifact metadata.
  • Có câu trả lời production readiness.
  • Không commit data nhạy cảm hoặc artifact lớn không cần thiết.

12. Tài Liệu Tham Khảo

  • scikit-learn stable documentation: Pipeline, ColumnTransformer, preprocessing, classification metrics, ensemble classifiers.
  • Telco Customer Churn dataset.
  • Google Machine Learning Crash Course: Classification.
  • Chip Huyen, Designing Machine Learning Systems.
  • Hidden Technical Debt in Machine Learning Systems.

Bài tập

Mục tiêu của bài tập là viết một script training gần production, không chỉ train model trong notebook. Bạn có thể dùng Telco Customer Churn CSV hoặc để script tạo synthetic dataset có schema tương tự.

1. Yêu Cầu Hoàn Thành

Bạn cần nộp được:

  • README.md mô tả problem, dataset, metrics, threshold, trade-off và production readiness.
  • requirements.txt hoặc pyproject.toml.
  • Script train pipeline.
  • Metrics report.
  • Error analysis report.
  • Model artifact .joblib.
  • Inference function nhận dict và trả dict.

Trong repo bài học, bạn có thể làm trong folder riêng của mình. Không cần commit dataset thật hoặc artifact lớn.

2. Cài Đặt

python -m venv .venv
source .venv/bin/activate
pip install numpy pandas scikit-learn joblib

Nếu dùng requirements.txt:

numpy>=1.26
pandas>=2.1
scikit-learn>=1.4,<2.0
joblib>=1.3

3. Code Mẫu Hoàn Chỉnh

Tạo file src/churn_pipeline.py trong project riêng của bạn. Code dưới đây cố ý viết theo style dễ tách module, có schema validation, pipeline, threshold tuning, artifact metadata và inference contract.

from __future__ import annotations

import argparse
import json
import platform
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import joblib
import numpy as np
import pandas as pd
import sklearn
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    average_precision_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler


RANDOM_STATE = 42
TARGET_COL = "Churn"
ID_COL = "customerID"
SCHEMA_VERSION = "telco-churn-v1"
MODEL_VERSION = "customer-churn-v1"

RAW_FEATURE_COLUMNS = [
    "customerID",
    "gender",
    "SeniorCitizen",
    "Partner",
    "Dependents",
    "tenure",
    "PhoneService",
    "InternetService",
    "Contract",
    "PaperlessBilling",
    "PaymentMethod",
    "MonthlyCharges",
    "TotalCharges",
]

REQUIRED_TRAINING_COLUMNS = RAW_FEATURE_COLUMNS + [TARGET_COL]

NUMERIC_FEATURES = [
    "SeniorCitizen",
    "tenure",
    "MonthlyCharges",
    "TotalCharges",
    "avg_monthly_charge_observed",
]

CATEGORICAL_FEATURES = [
    "gender",
    "Partner",
    "Dependents",
    "PhoneService",
    "InternetService",
    "Contract",
    "PaperlessBilling",
    "PaymentMethod",
    "tenure_group",
    "monthly_charge_band",
]


@dataclass
class ThresholdResult:
    threshold: float
    precision: float
    recall: float
    f1: float
    predicted_positive_rate: float
    tp: int
    fp: int
    fn: int
    tn: int


def json_default(value: Any) -> Any:
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, Path):
        return str(value)
    return str(value)


def write_json(path: Path, payload: dict[str, Any]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False, default=json_default),
        encoding="utf-8",
    )


def normalize_string_values(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [str(column).strip() for column in df.columns]
    for column in df.columns:
        if pd.api.types.is_object_dtype(df[column]) or pd.api.types.is_string_dtype(df[column]):
            df[column] = df[column].map(lambda value: value.strip() if isinstance(value, str) else value)
            df[column] = df[column].replace("", np.nan)
    return df


def validate_columns(df: pd.DataFrame, required_columns: list[str]) -> None:
    missing = sorted(set(required_columns) - set(df.columns))
    if missing:
        raise ValueError(f"Missing required columns: {missing}")


def normalize_telco_frame(df: pd.DataFrame, require_target: bool) -> pd.DataFrame:
    df = normalize_string_values(df)
    required_columns = REQUIRED_TRAINING_COLUMNS if require_target else RAW_FEATURE_COLUMNS
    validate_columns(df, required_columns)

    df = df.copy()
    for column in ["SeniorCitizen", "tenure", "MonthlyCharges", "TotalCharges"]:
        df[column] = pd.to_numeric(df[column], errors="coerce")

    if require_target:
        if not pd.api.types.is_numeric_dtype(df[TARGET_COL]):
            df[TARGET_COL] = df[TARGET_COL].map({"Yes": 1, "No": 0, "yes": 1, "no": 0})
        df[TARGET_COL] = pd.to_numeric(df[TARGET_COL], errors="coerce")
        if df[TARGET_COL].isna().any():
            raise ValueError("Target column contains values other than Yes/No or 1/0.")
        unique_targets = set(df[TARGET_COL].astype(int).unique())
        if not unique_targets.issubset({0, 1}):
            raise ValueError(f"Target must be binary 0/1, got {sorted(unique_targets)}.")
        df[TARGET_COL] = df[TARGET_COL].astype(int)

    return df


def add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    tenure = df["tenure"].clip(lower=0)
    safe_tenure = tenure.replace(0, np.nan)
    avg_charge = df["TotalCharges"] / safe_tenure
    df["avg_monthly_charge_observed"] = avg_charge.replace([np.inf, -np.inf], np.nan)
    df["avg_monthly_charge_observed"] = df["avg_monthly_charge_observed"].fillna(df["MonthlyCharges"])

    df["tenure_group"] = pd.cut(
        tenure,
        bins=[-0.1, 6, 12, 24, 48, np.inf],
        labels=["0-6", "7-12", "13-24", "25-48", "49+"],
    ).astype("object")

    df["monthly_charge_band"] = pd.cut(
        df["MonthlyCharges"],
        bins=[-np.inf, 35, 70, np.inf],
        labels=["low", "medium", "high"],
    ).astype("object")

    return df[NUMERIC_FEATURES + CATEGORICAL_FEATURES]


class TelcoFeatureBuilder(BaseEstimator, TransformerMixin):
    def fit(self, X: pd.DataFrame, y: pd.Series | None = None) -> "TelcoFeatureBuilder":
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        normalized = normalize_telco_frame(X, require_target=False)
        return add_engineered_features(normalized)


def generate_synthetic_telco(n_rows: int = 7000) -> pd.DataFrame:
    rng = np.random.default_rng(RANDOM_STATE)

    tenure = rng.integers(0, 73, size=n_rows)
    contract = rng.choice(["Month-to-month", "One year", "Two year"], size=n_rows, p=[0.55, 0.25, 0.20])
    internet = rng.choice(["DSL", "Fiber optic", "No"], size=n_rows, p=[0.35, 0.50, 0.15])
    payment = rng.choice(
        ["Electronic check", "Mailed check", "Bank transfer", "Credit card"],
        size=n_rows,
        p=[0.45, 0.15, 0.20, 0.20],
    )
    monthly_base = np.where(internet == "Fiber optic", 82, np.where(internet == "DSL", 58, 28))
    monthly_charges = np.clip(rng.normal(monthly_base, 12), 18, 120).round(2)
    total_charges = (monthly_charges * np.maximum(tenure, 1) * rng.normal(1.0, 0.08, size=n_rows)).round(2)
    total_charges = total_charges.astype(object)
    total_charges[tenure == 0] = " "

    paperless = rng.choice(["Yes", "No"], size=n_rows, p=[0.6, 0.4])
    senior = rng.choice([0, 1], size=n_rows, p=[0.84, 0.16])
    partner = rng.choice(["Yes", "No"], size=n_rows, p=[0.48, 0.52])
    dependents = rng.choice(["Yes", "No"], size=n_rows, p=[0.30, 0.70])
    phone = rng.choice(["Yes", "No"], size=n_rows, p=[0.90, 0.10])
    gender = rng.choice(["Female", "Male"], size=n_rows)

    logit = (
        -1.7
        + 1.15 * (contract == "Month-to-month")
        + 0.55 * (internet == "Fiber optic")
        + 0.35 * (payment == "Electronic check")
        + 0.25 * (paperless == "Yes")
        + 0.30 * senior
        - 0.035 * tenure
        + 0.25 * (monthly_charges > 85)
        - 0.20 * (partner == "Yes")
        - 0.25 * (dependents == "Yes")
    )
    probability = 1 / (1 + np.exp(-logit))
    churn = rng.binomial(1, probability)

    return pd.DataFrame(
        {
            "customerID": [f"CUST_{i:06d}" for i in range(n_rows)],
            "gender": gender,
            "SeniorCitizen": senior,
            "Partner": partner,
            "Dependents": dependents,
            "tenure": tenure,
            "PhoneService": phone,
            "InternetService": internet,
            "Contract": contract,
            "PaperlessBilling": paperless,
            "PaymentMethod": payment,
            "MonthlyCharges": monthly_charges,
            "TotalCharges": total_charges,
            "Churn": np.where(churn == 1, "Yes", "No"),
        }
    )


def load_dataset(csv_path: Path | None, synthetic_rows: int) -> pd.DataFrame:
    if csv_path is not None:
        if not csv_path.exists():
            raise FileNotFoundError(f"CSV not found: {csv_path}")
        return pd.read_csv(csv_path)
    return generate_synthetic_telco(synthetic_rows)


def run_eda(df: pd.DataFrame, output_dir: Path) -> dict[str, Any]:
    output_dir.mkdir(parents=True, exist_ok=True)
    report: dict[str, Any] = {
        "rows": int(len(df)),
        "columns": int(df.shape[1]),
        "target_rate": float(df[TARGET_COL].mean()),
        "duplicate_customer_id": int(df[ID_COL].duplicated().sum()) if ID_COL in df.columns else None,
        "missing_rate": df.isna().mean().sort_values(ascending=False).to_dict(),
        "numeric_summary": df[["tenure", "MonthlyCharges", "TotalCharges"]].describe().to_dict(),
    }

    segment_cols = ["Contract", "InternetService", "PaymentMethod", "PaperlessBilling"]
    segment_report = {}
    for column in segment_cols:
        segment_report[column] = (
            df.groupby(column, dropna=False)[TARGET_COL]
            .agg(["count", "mean"])
            .sort_values("mean", ascending=False)
            .reset_index()
            .to_dict(orient="records")
        )
    report["segment_churn_rate"] = segment_report

    write_json(output_dir / "eda_report.json", report)
    return report


def build_preprocessor() -> ColumnTransformer:
    numeric_pipeline = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )
    categorical_pipeline = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
        ]
    )
    return ColumnTransformer(
        transformers=[
            ("num", numeric_pipeline, NUMERIC_FEATURES),
            ("cat", categorical_pipeline, CATEGORICAL_FEATURES),
        ],
        remainder="drop",
    )


def make_pipeline_for(model: Any) -> Pipeline:
    return Pipeline(
        steps=[
            ("features", TelcoFeatureBuilder()),
            ("preprocess", build_preprocessor()),
            ("model", model),
        ]
    )


def build_models() -> dict[str, Pipeline]:
    return {
        "logistic_regression": make_pipeline_for(
            LogisticRegression(
                max_iter=2000,
                class_weight="balanced",
                random_state=RANDOM_STATE,
            )
        ),
        "random_forest": make_pipeline_for(
            RandomForestClassifier(
                n_estimators=300,
                max_depth=9,
                min_samples_leaf=20,
                class_weight="balanced",
                random_state=RANDOM_STATE,
                n_jobs=-1,
            )
        ),
        "gradient_boosting": make_pipeline_for(
            GradientBoostingClassifier(
                n_estimators=180,
                learning_rate=0.05,
                max_depth=3,
                random_state=RANDOM_STATE,
            )
        ),
    }


def metrics_at_threshold(y_true: pd.Series, proba: np.ndarray, threshold: float) -> ThresholdResult:
    y_pred = (proba >= threshold).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    return ThresholdResult(
        threshold=float(threshold),
        precision=float(precision_score(y_true, y_pred, zero_division=0)),
        recall=float(recall_score(y_true, y_pred, zero_division=0)),
        f1=float(f1_score(y_true, y_pred, zero_division=0)),
        predicted_positive_rate=float(y_pred.mean()),
        tp=int(tp),
        fp=int(fp),
        fn=int(fn),
        tn=int(tn),
    )


def tune_threshold(
    y_true: pd.Series,
    proba: np.ndarray,
    min_recall: float,
) -> tuple[ThresholdResult, pd.DataFrame]:
    rows = [asdict(metrics_at_threshold(y_true, proba, threshold)) for threshold in np.arange(0.10, 0.91, 0.02)]
    threshold_report = pd.DataFrame(rows)
    candidates = threshold_report[threshold_report["recall"] >= min_recall]
    if candidates.empty:
        best = threshold_report.sort_values(["f1", "precision"], ascending=False).iloc[0]
    else:
        best = candidates.sort_values(["precision", "f1"], ascending=False).iloc[0]
    return ThresholdResult(**best.to_dict()), threshold_report


def evaluate_predictions(y_true: pd.Series, proba: np.ndarray, threshold: float) -> dict[str, Any]:
    threshold_metrics = metrics_at_threshold(y_true, proba, threshold)
    y_pred = (proba >= threshold).astype(int)
    return {
        "roc_auc": float(roc_auc_score(y_true, proba)),
        "pr_auc": float(average_precision_score(y_true, proba)),
        "threshold_metrics": asdict(threshold_metrics),
        "classification_report": classification_report(y_true, y_pred, output_dict=True, zero_division=0),
    }


def measure_prediction_latency(model: Pipeline, X: pd.DataFrame, repeats: int = 10) -> dict[str, float]:
    sample = X.head(min(len(X), 512))
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        model.predict_proba(sample)[:, 1]
        durations.append((time.perf_counter() - start) * 1000)
    per_row = [duration / len(sample) for duration in durations]
    return {
        "batch_size": float(len(sample)),
        "p50_ms_per_row": float(np.percentile(per_row, 50)),
        "p95_ms_per_row": float(np.percentile(per_row, 95)),
    }


def run_error_analysis(
    model: Pipeline,
    X_test: pd.DataFrame,
    y_test: pd.Series,
    threshold: float,
    output_dir: Path,
    min_slice_size: int = 30,
) -> dict[str, Any]:
    output_dir.mkdir(parents=True, exist_ok=True)
    proba = model.predict_proba(X_test)[:, 1]
    y_pred = (proba >= threshold).astype(int)

    scored = X_test.copy()
    scored["y_true"] = y_test.to_numpy()
    scored["proba"] = proba
    scored["y_pred"] = y_pred
    scored["error_type"] = np.select(
        [
            (scored["y_true"] == 0) & (scored["y_pred"] == 1),
            (scored["y_true"] == 1) & (scored["y_pred"] == 0),
        ],
        ["false_positive", "false_negative"],
        default="correct",
    )

    false_positives = scored[scored["error_type"] == "false_positive"].sort_values("proba", ascending=False)
    false_negatives = scored[scored["error_type"] == "false_negative"].sort_values("proba", ascending=True)
    false_positives.head(50).to_csv(output_dir / "false_positives.csv", index=False)
    false_negatives.head(50).to_csv(output_dir / "false_negatives.csv", index=False)

    feature_view = TelcoFeatureBuilder().transform(X_test)
    feature_view["y_true"] = y_test.to_numpy()
    feature_view["y_pred"] = y_pred

    slice_rows = []
    for column in ["Contract", "InternetService", "PaymentMethod", "tenure_group", "monthly_charge_band"]:
        for value, group in feature_view.groupby(column, dropna=False):
            if len(group) < min_slice_size:
                continue
            slice_rows.append(
                {
                    "slice_column": column,
                    "slice_value": str(value),
                    "count": int(len(group)),
                    "actual_positive_rate": float(group["y_true"].mean()),
                    "predicted_positive_rate": float(group["y_pred"].mean()),
                    "precision": float(precision_score(group["y_true"], group["y_pred"], zero_division=0)),
                    "recall": float(recall_score(group["y_true"], group["y_pred"], zero_division=0)),
                    "f1": float(f1_score(group["y_true"], group["y_pred"], zero_division=0)),
                }
            )

    slice_metrics = pd.DataFrame(slice_rows).sort_values(["f1", "count"], ascending=[True, False])
    slice_metrics.to_csv(output_dir / "slice_metrics.csv", index=False)

    return {
        "false_positive_count": int(len(false_positives)),
        "false_negative_count": int(len(false_negatives)),
        "worst_slices": slice_metrics.head(10).to_dict(orient="records"),
    }


def train_pipeline(
    csv_path: Path | None,
    artifact_path: Path,
    report_dir: Path,
    synthetic_rows: int,
    min_recall: float,
) -> dict[str, Any]:
    raw_df = load_dataset(csv_path, synthetic_rows)
    df = normalize_telco_frame(raw_df, require_target=True)
    run_eda(df, report_dir)

    X = df[RAW_FEATURE_COLUMNS]
    y = df[TARGET_COL]

    X_train_full, X_test, y_train_full, y_test = train_test_split(
        X,
        y,
        test_size=0.20,
        stratify=y,
        random_state=RANDOM_STATE,
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_full,
        y_train_full,
        test_size=0.25,
        stratify=y_train_full,
        random_state=RANDOM_STATE,
    )

    model_reports = []
    trained_models = {}
    threshold_reports = {}

    for model_name, model in build_models().items():
        print(f"Training {model_name}...")
        model.fit(X_train, y_train)
        trained_models[model_name] = model

        val_proba = model.predict_proba(X_val)[:, 1]
        best_threshold, threshold_report = tune_threshold(y_val, val_proba, min_recall=min_recall)
        threshold_reports[model_name] = threshold_report
        threshold_report.to_csv(report_dir / f"threshold_report_{model_name}.csv", index=False)

        val_metrics = evaluate_predictions(y_val, val_proba, best_threshold.threshold)
        latency = measure_prediction_latency(model, X_val)
        model_reports.append(
            {
                "model_name": model_name,
                "threshold": best_threshold.threshold,
                "validation_metrics": val_metrics,
                "latency": latency,
            }
        )

    leaderboard = pd.DataFrame(
        [
            {
                "model_name": report["model_name"],
                "threshold": report["threshold"],
                "pr_auc": report["validation_metrics"]["pr_auc"],
                "roc_auc": report["validation_metrics"]["roc_auc"],
                "precision": report["validation_metrics"]["threshold_metrics"]["precision"],
                "recall": report["validation_metrics"]["threshold_metrics"]["recall"],
                "f1": report["validation_metrics"]["threshold_metrics"]["f1"],
                "p95_ms_per_row": report["latency"]["p95_ms_per_row"],
            }
            for report in model_reports
        ]
    ).sort_values(["pr_auc", "f1"], ascending=False)

    report_dir.mkdir(parents=True, exist_ok=True)
    leaderboard.to_csv(report_dir / "leaderboard.csv", index=False)

    best_model_name = str(leaderboard.iloc[0]["model_name"])
    best_threshold = float(leaderboard.iloc[0]["threshold"])
    best_model = trained_models[best_model_name]
    test_proba = best_model.predict_proba(X_test)[:, 1]
    test_metrics = evaluate_predictions(y_test, test_proba, best_threshold)
    latency = measure_prediction_latency(best_model, X_test)
    error_report = run_error_analysis(best_model, X_test, y_test, best_threshold, report_dir / "error_analysis")

    metadata = {
        "model_name": best_model_name,
        "model_version": MODEL_VERSION,
        "schema_version": SCHEMA_VERSION,
        "threshold": best_threshold,
        "raw_feature_columns": RAW_FEATURE_COLUMNS,
        "numeric_features": NUMERIC_FEATURES,
        "categorical_features": CATEGORICAL_FEATURES,
        "validation_leaderboard": leaderboard.to_dict(orient="records"),
        "test_metrics": test_metrics,
        "latency": latency,
        "error_analysis": error_report,
        "created_at_utc": datetime.now(timezone.utc).isoformat(),
        "python_version": platform.python_version(),
        "sklearn_version": sklearn.__version__,
        "random_state": RANDOM_STATE,
        "training_rows": int(len(X_train)),
        "validation_rows": int(len(X_val)),
        "test_rows": int(len(X_test)),
        "training_note": "Validate with a time-based split and live monitoring before production deployment.",
    }

    artifact = {
        "model": best_model,
        "metadata": metadata,
    }
    artifact_path.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(artifact, artifact_path)

    write_json(report_dir / "metrics_report.json", metadata)
    print(f"Saved artifact: {artifact_path}")
    print(leaderboard)
    return metadata


def risk_tier(probability: float) -> str:
    if probability >= 0.70:
        return "high"
    if probability >= 0.40:
        return "medium"
    return "low"


def predict_customer_churn(customer: dict[str, Any], artifact_path: Path) -> dict[str, Any]:
    artifact = joblib.load(artifact_path)
    model: Pipeline = artifact["model"]
    metadata: dict[str, Any] = artifact["metadata"]
    threshold = float(metadata["threshold"])

    input_df = pd.DataFrame([customer])
    normalized = normalize_telco_frame(input_df, require_target=False)
    probability = float(model.predict_proba(normalized[RAW_FEATURE_COLUMNS])[:, 1][0])

    return {
        "customer_id": str(customer.get(ID_COL, "")),
        "churn_probability": round(probability, 4),
        "will_churn": bool(probability >= threshold),
        "risk_tier": risk_tier(probability),
        "threshold": threshold,
        "model_name": metadata["model_name"],
        "model_version": metadata["model_version"],
        "schema_version": metadata["schema_version"],
    }


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Train and evaluate a customer churn ML pipeline.")
    parser.add_argument("--csv", type=Path, default=None, help="Path to Telco Customer Churn CSV.")
    parser.add_argument("--artifact", type=Path, default=Path("artifacts/customer_churn_model.joblib"))
    parser.add_argument("--report-dir", type=Path, default=Path("artifacts/reports"))
    parser.add_argument("--synthetic-rows", type=int, default=7000)
    parser.add_argument("--min-recall", type=float, default=0.75)
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    train_pipeline(
        csv_path=args.csv,
        artifact_path=args.artifact,
        report_dir=args.report_dir,
        synthetic_rows=args.synthetic_rows,
        min_recall=args.min_recall,
    )


if __name__ == "__main__":
    main()

4. Chạy Training

Không có CSV:

python src/churn_pipeline.py

Có Telco CSV:

python src/churn_pipeline.py --csv data/telco_customer_churn.csv

Kết quả mong đợi:

artifacts/customer_churn_model.joblib
artifacts/reports/eda_report.json
artifacts/reports/leaderboard.csv
artifacts/reports/metrics_report.json
artifacts/reports/threshold_report_logistic_regression.csv
artifacts/reports/threshold_report_random_forest.csv
artifacts/reports/threshold_report_gradient_boosting.csv
artifacts/reports/error_analysis/false_positives.csv
artifacts/reports/error_analysis/false_negatives.csv
artifacts/reports/error_analysis/slice_metrics.csv

5. Thử Inference

Sau khi train xong:

from pathlib import Path

from churn_pipeline import predict_customer_churn


customer = {
    "customerID": "CUST_999999",
    "gender": "Female",
    "SeniorCitizen": 0,
    "Partner": "Yes",
    "Dependents": "No",
    "tenure": 12,
    "PhoneService": "Yes",
    "InternetService": "Fiber optic",
    "Contract": "Month-to-month",
    "PaperlessBilling": "Yes",
    "PaymentMethod": "Electronic check",
    "MonthlyCharges": 89.9,
    "TotalCharges": 1020.3,
}

result = predict_customer_churn(customer, Path("artifacts/customer_churn_model.joblib"))
print(result)

Response cần có dạng:

{
  "customer_id": "CUST_999999",
  "churn_probability": 0.7312,
  "will_churn": true,
  "risk_tier": "high",
  "threshold": 0.45,
  "model_name": "gradient_boosting",
  "model_version": "customer-churn-v1",
  "schema_version": "telco-churn-v1"
}

6. Bài Tập Mở Rộng

  1. Thêm unit test cho normalize_telco_frame():

    • Missing column phải raise ValueError.
    • TotalCharges=" " phải thành missing.
    • Churn="Yes" phải thành 1.
  2. Thêm test cho inference contract:

    • Payload đủ field trả về đầy đủ keys.
    • Payload thiếu MonthlyCharges phải fail.
    • Category mới trong PaymentMethod không làm model crash.
  3. So sánh threshold objectives:

    • min_recall=0.60
    • min_recall=0.75
    • min_recall=0.90
  4. Thêm calibration:

    • Dùng CalibratedClassifierCV cho model tốt nhất.
    • So sánh calibration curve hoặc Brier score.
  5. Thêm time-based split nếu dataset của bạn có timestamp:

    • Train trên tháng cũ.
    • Validation trên tháng kế tiếp.
    • Test trên tháng mới nhất.
  6. Viết README thật:

    • Không chỉ liệt kê command.
    • Phải có trade-off, limitation và production readiness.

7. Câu Hỏi Review

  1. Vì sao TelcoFeatureBuilder nằm trong Pipeline thay vì gọi thủ công ở notebook?
  2. Vì sao threshold được tune trên validation set, không phải test set?
  3. Nếu model có PR-AUC cao nhưng recall thấp tại threshold đã chọn, bạn xử lý thế nào?
  4. OneHotEncoder(handle_unknown="ignore") giải quyết vấn đề gì và không giải quyết vấn đề gì?
  5. Nếu service nhận category mới liên tục, bạn monitor metric nào?
  6. Nếu Random Forest tốt hơn Logistic Regression 0.5 điểm PR-AUC nhưng latency p95 gấp 20 lần, bạn chọn gì trong batch scoring và realtime API?
  7. Điều kiện nào còn thiếu trước khi gọi pipeline này là production-ready?

8. Tiêu Chí Chấm

Hạng mụcĐạt
Schema validationFail fast khi thiếu cột, normalize type đúng
EDACó target distribution, missing, segment churn rate
PipelinePipeline, ColumnTransformer, transformer chung training/inference
ModelsÍt nhất 3 models, có Logistic Regression baseline
MetricsCó ROC-AUC, PR-AUC, precision, recall, F1, confusion matrix
ThresholdTune trên validation set, có business objective
Error analysisCó FP/FN và slice metrics
ArtifactSave model + metadata + threshold + schema
InferenceFunction nhận dict, trả contract rõ ràng
Production notesCó trade-off, limitation, monitoring, rollback