- Published on
Day 8: Mini-project - Customer Churn ML Pipeline
- Authors

- Name
- Trần Mạnh Thắng
- @TranManhThang96
Mục Tiêu
Day 8 là mini-project tổng hợp Phase 1. Mục tiêu không phải đạt score cao nhất, mà là xây được một ML pipeline đúng quy trình, có thể kiểm thử, có thể lặp lại và đủ gần production để một Senior Software Engineer hiểu được toàn bộ vòng đời từ dữ liệu đến inference.
Kết thúc bài này, bạn cần làm được:
- Định nghĩa bài toán customer churn bằng ngôn ngữ business và ML.
- Dùng Telco Customer Churn hoặc dataset tương đương để thực hiện EDA.
- Thiết kế feature engineering không gây data leakage.
- Train và so sánh ít nhất 3 models: Logistic Regression baseline, Random Forest, Gradient Boosting.
- Đánh giá bằng nhiều metrics: ROC-AUC, PR-AUC, precision, recall, F1, confusion matrix và latency.
- Tune threshold theo mục tiêu business thay vì dùng mặc định
0.5. - Làm error analysis trên false positives, false negatives và từng customer segment.
- Save model artifact kèm metadata, threshold, schema và metrics.
- Viết inference function có input contract rõ ràng.
- Trả lời rõ: "Dùng được trong production không? Nếu có thì cần điều kiện gì?"
TL;DR
Customer churn prediction là bài toán binary classification: dự đoán xác suất một customer sẽ rời bỏ dịch vụ trong một horizon cụ thể, ví dụ 30 ngày hoặc cuối kỳ hợp đồng. Một pipeline tốt cần nhất quán giữa training và serving:
Raw data
-> schema validation
-> EDA
-> train/validation/test split
-> feature engineering trong pipeline
-> preprocessing bằng ColumnTransformer
-> train nhiều models
-> evaluate bằng ranking metrics và threshold metrics
-> tune threshold trên validation set
-> error analysis trên test set
-> save artifact kèm metadata
-> inference function có contract
Best default cho Day 8:
- Dùng
Pipelineđể gói feature engineering, preprocessing và model. - Dùng
ColumnTransformerđể xử lý numerical/categorical columns tách biệt. - Dùng
OneHotEncoder(handle_unknown="ignore")cho category mới ở inference. - Dùng Logistic Regression làm baseline bắt buộc.
- So sánh với Random Forest và Gradient Boosting.
- Chọn model theo PR-AUC, F1 tại threshold đã tune, latency và explainability.
- Nếu score cải thiện không đáng kể, ưu tiên baseline đơn giản hơn cho v1.
Dùng Được Trong Production Không?
Có thể dùng làm v1 production nếu thỏa các điều kiện sau:
- Label churn được định nghĩa rõ: churn là gì, horizon bao lâu, thời điểm tạo label là khi nào.
- Dataset training đại diện cho traffic thật và có split phù hợp. Nếu dữ liệu có timestamp, ưu tiên time-based split thay vì random split.
- Feature tại training đều available tại prediction time. Không dùng target proxy hoặc future information.
- Preprocessing nằm trong artifact, không viết lại thủ công ở service.
- Có schema validation trước training và inference.
- Threshold được chọn theo cost FP/FN hoặc mục tiêu business như recall tối thiểu.
- Artifact lưu model, threshold, feature schema, package versions, training date, metrics và limitation.
- Có monitoring: input null rate, unknown category rate, prediction distribution, positive rate, latency, drift và business outcome.
- Có quy trình rollback model và đánh giá định kỳ.
Chưa nên dùng production nếu:
- Chỉ train trong notebook, không lưu artifact reproducible.
- Chưa có validation/test set độc lập.
- Chưa làm error analysis theo segment quan trọng.
- Chưa biết action sau prediction là gì.
- Chưa có policy cho PII, logging và retention campaign.
- Dùng random split trong khi data thật có strong time drift mà chưa kiểm chứng.
1. Problem Framing
Churn prediction không phải câu hỏi "customer có churn không" một cách mơ hồ. Bài toán cần được đóng khung như sau:
Dựa trên thông tin có sẵn tại prediction_time,
dự đoán xác suất customer sẽ churn trong prediction_horizon.
Ví dụ output nên là probability và decision metadata, không chỉ là class:
{
"customer_id": "CUST_000123",
"churn_probability": 0.73,
"risk_tier": "high",
"decision": "send_retention_offer",
"model_version": "churn-v1",
"threshold": 0.45
}
Các câu hỏi business cần chốt trước khi train:
| Câu hỏi | Vì sao quan trọng |
|---|---|
| Churn horizon là 30 ngày, 60 ngày hay cuối hợp đồng? | Horizon khác nhau tạo label khác nhau, model khác nhau |
| Customer nào nằm trong population scoring? | Không nên score customer không thể hành động hoặc không đủ dữ liệu |
| Action sau prediction là gì? | Threshold phụ thuộc vào campaign capacity và cost |
| False positive tốn gì? | Gửi offer cho người không churn gây lãng phí hoặc giảm revenue |
| False negative tốn gì? | Bỏ lỡ customer sắp churn |
| Có constraint về fairness/segment không? | Model có thể over-target một nhóm customer |
Best solution theo context Day 8: giả định horizon là "churn trong kỳ tiếp theo", dùng dataset Telco để học quy trình. Khi áp dụng ở công ty thật, phần định nghĩa label và point-in-time data quan trọng hơn việc đổi model.
2. Dataset Và Schema
Dataset gợi ý: Telco Customer Churn. Nếu không có CSV, bạn có thể dùng synthetic dataset có schema tương tự để luyện pipeline, nhưng khi đánh giá portfolio nên dùng dataset thật.
Một số cột thường gặp:
| Column | Type gợi ý | Ý nghĩa | Production note |
|---|---|---|---|
customerID | string | ID customer | Không dùng làm feature |
gender | categorical | Giới tính | Cần cân nhắc fairness |
SeniorCitizen | numeric/binary | Khách hàng cao tuổi | Có thể là sensitive-ish feature |
Partner | categorical | Có partner hay không | Category mới cần xử lý |
Dependents | categorical | Có người phụ thuộc hay không | Category mới cần xử lý |
tenure | numeric | Số tháng sử dụng | Feature mạnh, nhưng cần kiểm tra missing/outlier |
PhoneService | categorical | Có phone service hay không | Categorical |
InternetService | categorical | DSL/Fiber/No | Segment quan trọng |
Contract | categorical | Month-to-month/One year/Two year | Feature rất mạnh |
PaperlessBilling | categorical | Billing điện tử | Categorical |
PaymentMethod | categorical | Phương thức thanh toán | Có thể liên quan đến churn |
MonthlyCharges | numeric | Phí hàng tháng | Cần xử lý outlier |
TotalCharges | numeric/string | Tổng phí đã trả | Hay bị đọc thành string hoặc blank |
Churn | target | Yes/No hoặc 1/0 | Không được xuất hiện trong inference input |
Data quality issues thường gặp:
TotalChargescó blank string, cần convert bằngpd.to_numeric(errors="coerce").- Categorical values có whitespace.
- Class imbalance: churn thường ít hơn non-churn.
- Category ở production có thể không có trong training.
- Một số feature có thể không available tại prediction time trong hệ thống thật.
3. Kiến Trúc Pipeline
Pipeline mục tiêu:
data/telco.csv
-> load + schema validation
-> EDA report
-> split train/validation/test
-> sklearn Pipeline
-> feature builder
-> ColumnTransformer
-> numeric: median imputation + scaling
-> categorical: most-frequent imputation + one-hot
-> classifier
-> model comparison
-> threshold tuning trên validation set
-> final evaluation trên test set
-> error analysis
-> joblib artifact
-> predict_customer_churn(customer: dict) -> dict
Điểm quan trọng: mọi transformer có .fit() chỉ được fit trên training data. Không fit scaler, encoder hoặc imputer trên toàn bộ dataset trước khi split.
4. EDA Tối Thiểu Cần Có
EDA trong bài này không phải vẽ biểu đồ cho đẹp. Mục tiêu là phát hiện rủi ro dữ liệu trước khi train.
Checklist EDA:
- Shape: số rows, số columns.
- Target distribution: churn rate.
- Missing rate theo column.
- Duplicate
customerID. - Numeric summary: min, p25, median, p75, max.
- Cardinality của categorical columns.
- Churn rate theo segment:
Contract,InternetService,PaymentMethod,tenure_group. - Outlier rõ ràng:
tenure < 0,MonthlyCharges < 0,TotalChargesblank.
Ví dụ interpretation:
- Nếu churn rate rất thấp, PR-AUC quan trọng hơn ROC-AUC.
- Nếu
Contract=Month-to-monthcó churn rate cao, đây là segment cần error analysis riêng. - Nếu
TotalChargesmissing chủ yếu ởtenure=0, missing không hẳn là lỗi, có thể mang ý nghĩa nghiệp vụ.
5. Feature Engineering
Feature engineering nên đơn giản, kiểm thử được và không leak dữ liệu. Với Telco, các feature hợp lý:
| Feature | Cách tạo | Lợi ích | Rủi ro |
|---|---|---|---|
avg_monthly_charge_observed | TotalCharges / tenure, fallback MonthlyCharges khi tenure=0 | Bắt quan hệ giữa tenure và spend | Cần xử lý chia cho 0 |
tenure_group | bin tenure thành nhóm | Dễ error analysis và model học non-linear nhẹ | Binning mất thông tin |
monthly_charge_band | bin MonthlyCharges | Giúp slice analysis | Ngưỡng bin có thể không ổn theo thị trường |
| Raw categorical one-hot | Contract, InternetService, PaymentMethod | Baseline mạnh cho tabular | Feature space tăng theo cardinality |
Không nên làm trong Day 8:
- Target encoding nếu chưa có cross-validation đúng cách, vì dễ leakage.
- Feature dùng event sau thời điểm churn.
- Dùng
customerIDlàm feature. - Tune feature thủ công theo test set.
Best solution: để feature engineering deterministic nằm trong Pipeline hoặc ít nhất trong function được dùng chung cho training và inference. Không copy logic feature engineering sang service bằng tay.
6. Model Choices
| Model | Vai trò | Mạnh | Yếu | Khi chọn |
|---|---|---|---|---|
| Logistic Regression | Baseline bắt buộc | Nhanh, dễ giải thích, latency thấp | Khó bắt interaction phức tạp | V1 production khi score đủ tốt |
| Random Forest | Candidate non-linear | Robust, ít cần scaling, bắt interaction | Artifact lớn hơn, probability có thể chưa calibrated | Khi quality tăng rõ và latency chấp nhận |
| Gradient Boosting | Candidate mạnh cho tabular | Thường tốt trên tabular vừa/nhỏ | Cần tuning, dễ overfit nếu thiếu discipline | Khi validation/test ổn định và monitoring đủ |
Trade-off thực tế:
- Nếu Logistic Regression kém PR-AUC 1-2 điểm nhưng nhanh, dễ debug và đủ đáp ứng business, chọn Logistic Regression cho v1.
- Nếu Gradient Boosting cải thiện recall ở cùng precision rõ ràng, có thể chọn Gradient Boosting.
- Nếu Random Forest artifact lớn và p99 latency cao trong API synchronous, dùng batch scoring thay vì realtime.
7. Metrics Và Threshold
Không dùng accuracy làm metric chính cho churn nếu class imbalance. Một model dự đoán tất cả là "No churn" có thể accuracy cao nhưng vô dụng.
Metrics cần report:
| Metric | Dùng để trả lời | Lưu ý |
|---|---|---|
| ROC-AUC | Model rank positive cao hơn negative tốt không? | Có thể lạc quan khi positive class nhỏ |
| PR-AUC | Trong các case model cho điểm cao, positive thật nhiều không? | Hữu ích cho churn/retention |
| Precision | Gửi campaign cho người được flag thì bao nhiêu người thật sự churn? | Liên quan cost FP |
| Recall | Bắt được bao nhiêu churners thật? | Liên quan cost FN |
| F1 | Cân bằng precision/recall | Không thay thế business cost |
| Confusion matrix | TP/FP/FN/TN cụ thể | Dễ giải thích với stakeholder |
| Latency | Dự đoán mất bao lâu | Quan trọng nếu serve realtime |
Threshold tuning:
1. Train model trên train set.
2. Predict probability trên validation set.
3. Quét threshold từ 0.10 đến 0.90.
4. Chọn threshold theo objective, ví dụ recall >= 0.75 rồi maximize precision.
5. Chỉ sau khi chọn xong mới đánh giá test set.
Không tune threshold trên test set. Test set là dữ liệu "chưa đụng tới" để ước lượng performance cuối.
8. Error Analysis
Error analysis là deliverable bắt buộc, không phải phần trang trí.
Cần xem:
- Top false positives: model rất tự tin customer churn nhưng thực tế không churn.
- Top false negatives: model bỏ sót customer churn thật.
- Slice metrics theo
Contract,InternetService,PaymentMethod,tenure_group. - Segment có recall thấp bất thường.
- Segment có predicted positive rate lệch xa actual positive rate.
Ví dụ câu hỏi:
- Model bỏ sót nhiều customer
Month-to-monthmới đăng ký không? - Model over-predict churn cho
Fiber opticvì lịch sử training bias không? - Threshold chung có làm một segment bị recall quá thấp không?
Nếu phát hiện một segment quan trọng có performance kém, các hướng xử lý:
- Thu thập thêm dữ liệu cho segment đó.
- Thêm feature liên quan đến behavior trước churn.
- Tune threshold theo segment, nhưng cần kiểm soát fairness và complexity.
- Chạy campaign thí điểm để đo business lift thay vì chỉ nhìn offline metrics.
9. Artifact Và Inference Contract
Model artifact không chỉ là model weights. Artifact cần chứa:
- Fitted
Pipeline. threshold.model_name.schema_version.raw_feature_columns.numeric_featuresvàcategorical_featuressau feature engineering.- Metrics trên validation/test.
- Training time, package versions, random seed.
- Limitation và intended use.
Inference input contract:
{
"customerID": "CUST_000123",
"gender": "Female",
"SeniorCitizen": 0,
"Partner": "Yes",
"Dependents": "No",
"tenure": 12,
"PhoneService": "Yes",
"InternetService": "Fiber optic",
"Contract": "Month-to-month",
"PaperlessBilling": "Yes",
"PaymentMethod": "Electronic check",
"MonthlyCharges": 89.9,
"TotalCharges": 1020.3
}
Inference output contract:
{
"customer_id": "CUST_000123",
"churn_probability": 0.73,
"will_churn": true,
"risk_tier": "high",
"threshold": 0.45,
"model_name": "gradient_boosting",
"schema_version": "telco-churn-v1"
}
Production note: với API thật, không nên log raw PII. Log request ID, schema version, missing count, model version, latency và prediction score bucket là đủ trong đa số trường hợp.
10. Performance Và Deployment
Performance cần nhìn theo context:
- Batch daily scoring: latency từng record ít quan trọng hơn throughput, cost và reproducibility.
- Realtime API: cần đo p50/p95/p99 latency, artifact load time và memory footprint.
- Logistic Regression thường nhanh nhất.
- Random Forest có latency tăng theo số cây và depth.
- Gradient Boosting có latency phụ thuộc số estimators, thường vẫn ổn với tabular nhỏ.
Best solution cho churn trong nhiều công ty: batch scoring mỗi ngày hoặc mỗi vài giờ, lưu score vào database/CRM để retention team dùng. Realtime API chỉ cần nếu action xảy ra ngay trong user session, ví dụ hiển thị offer tại thời điểm customer vào trang hủy dịch vụ.
11. README Và Model Card Tối Thiểu
README của mini-project cần có:
- Problem statement.
- Dataset và target definition.
- Feature list và feature engineering.
- Split strategy.
- Models đã thử.
- Metrics và threshold objective.
- Error analysis summary.
- Production readiness.
- Cách train và cách inference.
- Limitations.
Model card tối thiểu:
Model: customer-churn-v1
Intended use: prioritize retention outreach
Not intended use: fully automated denial, pricing discrimination
Training data: Telco Customer Churn or equivalent
Target: churn in defined horizon
Primary metric: PR-AUC + recall at selected threshold
Threshold: chosen on validation set
Known limitations: offline dataset, no live feedback, possible segment bias
Monitoring: input drift, score drift, outcome drift, latency
12. Checklist Hoàn Thành Day 8
- Có dataset Telco hoặc synthetic fallback tương đương.
- Có EDA report.
- Có schema validation cho training và inference.
- Feature engineering không dùng target hoặc future data.
- Dùng
PipelinevàColumnTransformer. - Dùng
OneHotEncoder(handle_unknown="ignore"). - Train ít nhất 3 models.
- Report ROC-AUC, PR-AUC, precision, recall, F1, confusion matrix.
- Tune threshold trên validation set.
- Đánh giá cuối trên test set.
- Có error analysis FP/FN và slice metrics.
- Save artifact bằng
joblib. - Artifact có metadata.
- Có inference function trả probability, decision, threshold và risk tier.
- Có performance measurement.
- Có README/trade-off/production notes.
Lỗi Hay Gặp
- Fit encoder/scaler trước split.
- Dùng test set để chọn threshold.
- Chỉ report accuracy.
- Save model nhưng quên save threshold.
- Training có feature engineering, inference lại không có.
- Không xử lý category mới ở production.
- Dùng
customerIDlàm feature. - Không kiểm tra
TotalChargesblank. - Không phân tích false negatives trong segment giá trị cao.
- Nói "production-ready" khi chưa có monitoring và rollback.
Tự Kiểm Tra
- Vì sao cần validation set riêng cho threshold tuning?
- Khi nào PR-AUC quan trọng hơn ROC-AUC?
- Nếu Gradient Boosting tốt hơn Logistic Regression rất ít, bạn chọn model nào cho v1 và vì sao?
- Artifact cần lưu gì ngoài model object?
- Nếu inference input có category mới chưa từng thấy, pipeline nên xử lý thế nào?
- Vì sao batch scoring thường hợp lý hơn realtime API cho churn?
- Điều kiện tối thiểu để gọi pipeline này là production-ready là gì?
Tài liệu
Tài liệu này là phần tra cứu nhanh khi làm mini-project. Mục tiêu là giúp bạn triển khai nhất quán, tránh bỏ sót schema, metrics, artifact metadata và production notes.
1. Project Structure Gợi Ý
Nếu làm thành repository riêng cho portfolio:
customer-churn-ml-pipeline/
README.md
requirements.txt
pyproject.toml
data/
telco_customer_churn.csv
src/
churn_pipeline.py
artifacts/
customer_churn_model.joblib
metrics_report.json
error_analysis/
false_positives.csv
false_negatives.csv
slice_metrics.csv
tests/
test_schema.py
test_inference_contract.py
Trong repo bài học này, bạn không cần commit dataset hoặc model artifact thật. Dataset và artifact thường lớn hoặc có rủi ro PII. Hãy hướng dẫn cách tạo lại artifact bằng command train.
2. Dataset Contract
Schema mặc định cho Telco Customer Churn:
| Column | Required | Training | Inference | Type | Ghi chú |
|---|---|---|---|---|---|
customerID | Có | Có | Có | string | ID, không dùng làm feature |
gender | Có | Có | Có | categorical | Cần fairness review nếu dùng cho decision nhạy cảm |
SeniorCitizen | Có | Có | Có | numeric/binary | Có thể đọc từ CSV là 0/1 hoặc string |
Partner | Có | Có | Có | categorical | Yes/No |
Dependents | Có | Có | Có | categorical | Yes/No |
tenure | Có | Có | Có | numeric | Số tháng dùng dịch vụ |
PhoneService | Có | Có | Có | categorical | Yes/No |
InternetService | Có | Có | Có | categorical | DSL/Fiber optic/No |
Contract | Có | Có | Có | categorical | Segment rất quan trọng |
PaperlessBilling | Có | Có | Có | categorical | Yes/No |
PaymentMethod | Có | Có | Có | categorical | Có whitespace trong một số dataset |
MonthlyCharges | Có | Có | Có | numeric | Phí hàng tháng |
TotalCharges | Có | Có | Có | numeric/string | Cần convert numeric, blank thành missing |
Churn | Có | Có | Không | target | Yes/No hoặc 1/0 |
Quy tắc validation tối thiểu:
- Thiếu required column thì fail fast.
- Strip whitespace ở column name và categorical value.
- Convert numeric columns bằng
pd.to_numeric(errors="coerce"). - Target chỉ được là
0/1sau khi normalize. - Duplicate
customerIDcần được report. - Missing rate quá cao ở feature quan trọng cần được cảnh báo.
3. Feature Engineering Contract
Feature raw không dùng trực tiếp một cách tùy tiện. Nên có một function hoặc transformer duy nhất tạo feature cho cả training và inference.
Feature gợi ý:
| Feature | Nhóm | Nguồn | Giải thích |
|---|---|---|---|
tenure | numeric | raw | Số tháng sử dụng |
MonthlyCharges | numeric | raw | Spend hiện tại |
TotalCharges | numeric | raw | Spend tích lũy |
avg_monthly_charge_observed | numeric | engineered | TotalCharges / tenure, fallback MonthlyCharges |
SeniorCitizen | numeric | raw | Binary |
tenure_group | categorical | engineered | 0-6, 7-12, 13-24, 25-48, 49+ |
monthly_charge_band | categorical | engineered | Low/medium/high spend |
Contract | categorical | raw | Rất quan trọng cho churn |
InternetService | categorical | raw | Segment |
PaymentMethod | categorical | raw | Segment |
Các cột Yes/No | categorical | raw | Partner, Dependents, PhoneService, PaperlessBilling |
Không dùng:
customerIDlàm feature.Churnhoặc bất kỳ target proxy nào.- Feature phát sinh sau khi customer đã churn.
- Aggregate không có cutoff time.
4. Requirements Template
requirements.txt tối thiểu:
numpy>=1.26
pandas>=2.1
scikit-learn>=1.4,<2.0
joblib>=1.3
Nếu dùng pyproject.toml:
[project]
name = "customer-churn-ml-pipeline"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
"numpy>=1.26",
"pandas>=2.1",
"scikit-learn>=1.4,<2.0",
"joblib>=1.3",
]
[tool.pytest.ini_options]
pythonpath = ["src"]
testpaths = ["tests"]
Ghi chú API: bài này dùng API hiện tại của scikit-learn stable như Pipeline, ColumnTransformer, OneHotEncoder(handle_unknown="ignore"), LogisticRegression, RandomForestClassifier, GradientBoostingClassifier và các metrics classification.
5. README Template
# Customer Churn ML Pipeline
## Problem
Predict the probability that a customer will churn within the defined prediction horizon so the retention team can prioritize outreach.
## Dataset
Telco Customer Churn or an equivalent customer subscription dataset.
## Target
`Churn`: 1 if customer churned in the target horizon, otherwise 0.
## Approach
- Validate schema and clean numeric/categorical values.
- Run EDA: target distribution, missing values, cardinality, segment churn rate.
- Use sklearn `Pipeline` and `ColumnTransformer`.
- Train Logistic Regression, Random Forest, Gradient Boosting.
- Evaluate ROC-AUC, PR-AUC, precision, recall, F1, confusion matrix.
- Tune threshold on validation set.
- Run error analysis on false positives, false negatives and customer segments.
- Save artifact with model, threshold, schema and metadata.
## Production Readiness
This project is usable as a v1 production pattern only if label definition, point-in-time feature correctness, validation, monitoring, rollback and PII policy are implemented in the target system.
## Run
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python src/churn_pipeline.py --csv data/telco_customer_churn.csv
```
## Inference
```python
from churn_pipeline import predict_customer_churn
result = predict_customer_churn(customer_payload)
```
## Limitations
- Offline dataset may not represent live customers.
- Random split is not enough if production data has time drift.
- Model probability may require calibration before high-stakes automation.
- Campaign lift must be validated with an experiment.
6. Artifact Contract
Artifact nên được save bằng joblib dưới dạng dictionary:
artifact = {
"model": fitted_pipeline,
"metadata": {
"model_name": "gradient_boosting",
"model_version": "customer-churn-v1",
"schema_version": "telco-churn-v1",
"threshold": 0.45,
"raw_feature_columns": [...],
"numeric_features": [...],
"categorical_features": [...],
"validation_metrics": {...},
"test_metrics": {...},
"created_at_utc": "...",
"python_version": "...",
"sklearn_version": "...",
"random_state": 42,
"training_note": "Validate on time-based split before production."
}
}
Không chỉ save fitted estimator. Nếu thiếu threshold hoặc schema, service inference sẽ phải hard-code logic bên ngoài artifact, dễ gây train-serving skew.
7. Inference Contract
Request:
{
"customerID": "CUST_000123",
"gender": "Female",
"SeniorCitizen": 0,
"Partner": "Yes",
"Dependents": "No",
"tenure": 12,
"PhoneService": "Yes",
"InternetService": "Fiber optic",
"Contract": "Month-to-month",
"PaperlessBilling": "Yes",
"PaymentMethod": "Electronic check",
"MonthlyCharges": 89.9,
"TotalCharges": 1020.3
}
Response:
{
"customer_id": "CUST_000123",
"churn_probability": 0.7312,
"will_churn": true,
"risk_tier": "high",
"threshold": 0.45,
"model_name": "gradient_boosting",
"model_version": "customer-churn-v1",
"schema_version": "telco-churn-v1"
}
Validation behavior:
- Missing required field: return validation error, không gọi model.
- Unknown categorical value: cho phép đi qua
OneHotEncoder(handle_unknown="ignore"), đồng thời log unknown category rate ở service. - Numeric không parse được: convert thành missing và để imputer xử lý nếu field không bắt buộc strict. Với production nghiêm ngặt, nên reject nếu numeric core field invalid.
8. Metric Decision Matrix
| Business context | Metric chính | Threshold objective | Lý do |
|---|---|---|---|
| Retention team có capacity thấp | Precision, PR-AUC | Maximize precision với recall tối thiểu | Không muốn lãng phí offer |
| Churn rất đắt | Recall, PR-AUC | Recall >= target rồi maximize precision | Chấp nhận nhiều FP để giảm FN |
| Chỉ dùng để rank list gọi điện | PR-AUC, lift@K | Top K customers | Class threshold ít quan trọng |
| Realtime offer | F1, latency, calibration | Balance precision/recall và p99 latency | Cần quyết định ngay |
| Model monitoring | Positive rate, score drift | Alert theo distribution | Offline metric không đủ |
9. Model Selection Matrix
| Điều kiện | Chọn |
|---|---|
| Logistic Regression gần bằng model khác, latency/explainability quan trọng | Logistic Regression |
| Non-linear pattern rõ, model size chấp nhận | Gradient Boosting |
| Cần robust baseline tree ensemble, ít tuning | Random Forest |
| Dataset lớn, nhiều categorical high-cardinality | Cân nhắc histogram boosting hoặc specialized tabular model, nhưng ngoài phạm vi Day 8 |
| Probability dùng cho quyết định tiền lớn | Thêm calibration và business experiment |
10. Monitoring Checklist
Training-time:
- Data snapshot version.
- Target rate.
- Missing rate.
- Cardinality.
- Metrics by segment.
- Artifact checksum.
Serving-time:
- Request count.
- Validation error rate.
- Null rate theo field.
- Unknown category rate.
- Score distribution.
- Predicted positive rate.
- Latency p50/p95/p99.
- Model version đang serve.
Post-serving:
- Actual churn rate sau khi label mature.
- Precision/recall thực tế.
- Campaign conversion/lift.
- Segment bias.
- Drift giữa train và live data.
11. Review Checklist Cho Portfolio
- README có giải thích business problem, không chỉ code.
- Code có
main()hoặc CLI rõ ràng. - Có seed và split strategy.
- Có validation và inference contract.
- Có ít nhất 3 models và leaderboard.
- Có threshold report.
- Có error analysis file.
- Có artifact metadata.
- Có câu trả lời production readiness.
- Không commit data nhạy cảm hoặc artifact lớn không cần thiết.
12. Tài Liệu Tham Khảo
- scikit-learn stable documentation:
Pipeline,ColumnTransformer, preprocessing, classification metrics, ensemble classifiers. - Telco Customer Churn dataset.
- Google Machine Learning Crash Course: Classification.
- Chip Huyen, Designing Machine Learning Systems.
- Hidden Technical Debt in Machine Learning Systems.
Bài tập
Mục tiêu của bài tập là viết một script training gần production, không chỉ train model trong notebook. Bạn có thể dùng Telco Customer Churn CSV hoặc để script tạo synthetic dataset có schema tương tự.
1. Yêu Cầu Hoàn Thành
Bạn cần nộp được:
README.mdmô tả problem, dataset, metrics, threshold, trade-off và production readiness.requirements.txthoặcpyproject.toml.- Script train pipeline.
- Metrics report.
- Error analysis report.
- Model artifact
.joblib. - Inference function nhận
dictvà trảdict.
Trong repo bài học, bạn có thể làm trong folder riêng của mình. Không cần commit dataset thật hoặc artifact lớn.
2. Cài Đặt
python -m venv .venv
source .venv/bin/activate
pip install numpy pandas scikit-learn joblib
Nếu dùng requirements.txt:
numpy>=1.26
pandas>=2.1
scikit-learn>=1.4,<2.0
joblib>=1.3
3. Code Mẫu Hoàn Chỉnh
Tạo file src/churn_pipeline.py trong project riêng của bạn. Code dưới đây cố ý viết theo style dễ tách module, có schema validation, pipeline, threshold tuning, artifact metadata và inference contract.
from __future__ import annotations
import argparse
import json
import platform
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import joblib
import numpy as np
import pandas as pd
import sklearn
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
average_precision_score,
classification_report,
confusion_matrix,
f1_score,
precision_score,
recall_score,
roc_auc_score,
)
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
RANDOM_STATE = 42
TARGET_COL = "Churn"
ID_COL = "customerID"
SCHEMA_VERSION = "telco-churn-v1"
MODEL_VERSION = "customer-churn-v1"
RAW_FEATURE_COLUMNS = [
"customerID",
"gender",
"SeniorCitizen",
"Partner",
"Dependents",
"tenure",
"PhoneService",
"InternetService",
"Contract",
"PaperlessBilling",
"PaymentMethod",
"MonthlyCharges",
"TotalCharges",
]
REQUIRED_TRAINING_COLUMNS = RAW_FEATURE_COLUMNS + [TARGET_COL]
NUMERIC_FEATURES = [
"SeniorCitizen",
"tenure",
"MonthlyCharges",
"TotalCharges",
"avg_monthly_charge_observed",
]
CATEGORICAL_FEATURES = [
"gender",
"Partner",
"Dependents",
"PhoneService",
"InternetService",
"Contract",
"PaperlessBilling",
"PaymentMethod",
"tenure_group",
"monthly_charge_band",
]
@dataclass
class ThresholdResult:
threshold: float
precision: float
recall: float
f1: float
predicted_positive_rate: float
tp: int
fp: int
fn: int
tn: int
def json_default(value: Any) -> Any:
if isinstance(value, np.integer):
return int(value)
if isinstance(value, np.floating):
return float(value)
if isinstance(value, np.ndarray):
return value.tolist()
if isinstance(value, Path):
return str(value)
return str(value)
def write_json(path: Path, payload: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
json.dumps(payload, indent=2, ensure_ascii=False, default=json_default),
encoding="utf-8",
)
def normalize_string_values(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df.columns = [str(column).strip() for column in df.columns]
for column in df.columns:
if pd.api.types.is_object_dtype(df[column]) or pd.api.types.is_string_dtype(df[column]):
df[column] = df[column].map(lambda value: value.strip() if isinstance(value, str) else value)
df[column] = df[column].replace("", np.nan)
return df
def validate_columns(df: pd.DataFrame, required_columns: list[str]) -> None:
missing = sorted(set(required_columns) - set(df.columns))
if missing:
raise ValueError(f"Missing required columns: {missing}")
def normalize_telco_frame(df: pd.DataFrame, require_target: bool) -> pd.DataFrame:
df = normalize_string_values(df)
required_columns = REQUIRED_TRAINING_COLUMNS if require_target else RAW_FEATURE_COLUMNS
validate_columns(df, required_columns)
df = df.copy()
for column in ["SeniorCitizen", "tenure", "MonthlyCharges", "TotalCharges"]:
df[column] = pd.to_numeric(df[column], errors="coerce")
if require_target:
if not pd.api.types.is_numeric_dtype(df[TARGET_COL]):
df[TARGET_COL] = df[TARGET_COL].map({"Yes": 1, "No": 0, "yes": 1, "no": 0})
df[TARGET_COL] = pd.to_numeric(df[TARGET_COL], errors="coerce")
if df[TARGET_COL].isna().any():
raise ValueError("Target column contains values other than Yes/No or 1/0.")
unique_targets = set(df[TARGET_COL].astype(int).unique())
if not unique_targets.issubset({0, 1}):
raise ValueError(f"Target must be binary 0/1, got {sorted(unique_targets)}.")
df[TARGET_COL] = df[TARGET_COL].astype(int)
return df
def add_engineered_features(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
tenure = df["tenure"].clip(lower=0)
safe_tenure = tenure.replace(0, np.nan)
avg_charge = df["TotalCharges"] / safe_tenure
df["avg_monthly_charge_observed"] = avg_charge.replace([np.inf, -np.inf], np.nan)
df["avg_monthly_charge_observed"] = df["avg_monthly_charge_observed"].fillna(df["MonthlyCharges"])
df["tenure_group"] = pd.cut(
tenure,
bins=[-0.1, 6, 12, 24, 48, np.inf],
labels=["0-6", "7-12", "13-24", "25-48", "49+"],
).astype("object")
df["monthly_charge_band"] = pd.cut(
df["MonthlyCharges"],
bins=[-np.inf, 35, 70, np.inf],
labels=["low", "medium", "high"],
).astype("object")
return df[NUMERIC_FEATURES + CATEGORICAL_FEATURES]
class TelcoFeatureBuilder(BaseEstimator, TransformerMixin):
def fit(self, X: pd.DataFrame, y: pd.Series | None = None) -> "TelcoFeatureBuilder":
return self
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
normalized = normalize_telco_frame(X, require_target=False)
return add_engineered_features(normalized)
def generate_synthetic_telco(n_rows: int = 7000) -> pd.DataFrame:
rng = np.random.default_rng(RANDOM_STATE)
tenure = rng.integers(0, 73, size=n_rows)
contract = rng.choice(["Month-to-month", "One year", "Two year"], size=n_rows, p=[0.55, 0.25, 0.20])
internet = rng.choice(["DSL", "Fiber optic", "No"], size=n_rows, p=[0.35, 0.50, 0.15])
payment = rng.choice(
["Electronic check", "Mailed check", "Bank transfer", "Credit card"],
size=n_rows,
p=[0.45, 0.15, 0.20, 0.20],
)
monthly_base = np.where(internet == "Fiber optic", 82, np.where(internet == "DSL", 58, 28))
monthly_charges = np.clip(rng.normal(monthly_base, 12), 18, 120).round(2)
total_charges = (monthly_charges * np.maximum(tenure, 1) * rng.normal(1.0, 0.08, size=n_rows)).round(2)
total_charges = total_charges.astype(object)
total_charges[tenure == 0] = " "
paperless = rng.choice(["Yes", "No"], size=n_rows, p=[0.6, 0.4])
senior = rng.choice([0, 1], size=n_rows, p=[0.84, 0.16])
partner = rng.choice(["Yes", "No"], size=n_rows, p=[0.48, 0.52])
dependents = rng.choice(["Yes", "No"], size=n_rows, p=[0.30, 0.70])
phone = rng.choice(["Yes", "No"], size=n_rows, p=[0.90, 0.10])
gender = rng.choice(["Female", "Male"], size=n_rows)
logit = (
-1.7
+ 1.15 * (contract == "Month-to-month")
+ 0.55 * (internet == "Fiber optic")
+ 0.35 * (payment == "Electronic check")
+ 0.25 * (paperless == "Yes")
+ 0.30 * senior
- 0.035 * tenure
+ 0.25 * (monthly_charges > 85)
- 0.20 * (partner == "Yes")
- 0.25 * (dependents == "Yes")
)
probability = 1 / (1 + np.exp(-logit))
churn = rng.binomial(1, probability)
return pd.DataFrame(
{
"customerID": [f"CUST_{i:06d}" for i in range(n_rows)],
"gender": gender,
"SeniorCitizen": senior,
"Partner": partner,
"Dependents": dependents,
"tenure": tenure,
"PhoneService": phone,
"InternetService": internet,
"Contract": contract,
"PaperlessBilling": paperless,
"PaymentMethod": payment,
"MonthlyCharges": monthly_charges,
"TotalCharges": total_charges,
"Churn": np.where(churn == 1, "Yes", "No"),
}
)
def load_dataset(csv_path: Path | None, synthetic_rows: int) -> pd.DataFrame:
if csv_path is not None:
if not csv_path.exists():
raise FileNotFoundError(f"CSV not found: {csv_path}")
return pd.read_csv(csv_path)
return generate_synthetic_telco(synthetic_rows)
def run_eda(df: pd.DataFrame, output_dir: Path) -> dict[str, Any]:
output_dir.mkdir(parents=True, exist_ok=True)
report: dict[str, Any] = {
"rows": int(len(df)),
"columns": int(df.shape[1]),
"target_rate": float(df[TARGET_COL].mean()),
"duplicate_customer_id": int(df[ID_COL].duplicated().sum()) if ID_COL in df.columns else None,
"missing_rate": df.isna().mean().sort_values(ascending=False).to_dict(),
"numeric_summary": df[["tenure", "MonthlyCharges", "TotalCharges"]].describe().to_dict(),
}
segment_cols = ["Contract", "InternetService", "PaymentMethod", "PaperlessBilling"]
segment_report = {}
for column in segment_cols:
segment_report[column] = (
df.groupby(column, dropna=False)[TARGET_COL]
.agg(["count", "mean"])
.sort_values("mean", ascending=False)
.reset_index()
.to_dict(orient="records")
)
report["segment_churn_rate"] = segment_report
write_json(output_dir / "eda_report.json", report)
return report
def build_preprocessor() -> ColumnTransformer:
numeric_pipeline = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler()),
]
)
categorical_pipeline = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="most_frequent")),
("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
]
)
return ColumnTransformer(
transformers=[
("num", numeric_pipeline, NUMERIC_FEATURES),
("cat", categorical_pipeline, CATEGORICAL_FEATURES),
],
remainder="drop",
)
def make_pipeline_for(model: Any) -> Pipeline:
return Pipeline(
steps=[
("features", TelcoFeatureBuilder()),
("preprocess", build_preprocessor()),
("model", model),
]
)
def build_models() -> dict[str, Pipeline]:
return {
"logistic_regression": make_pipeline_for(
LogisticRegression(
max_iter=2000,
class_weight="balanced",
random_state=RANDOM_STATE,
)
),
"random_forest": make_pipeline_for(
RandomForestClassifier(
n_estimators=300,
max_depth=9,
min_samples_leaf=20,
class_weight="balanced",
random_state=RANDOM_STATE,
n_jobs=-1,
)
),
"gradient_boosting": make_pipeline_for(
GradientBoostingClassifier(
n_estimators=180,
learning_rate=0.05,
max_depth=3,
random_state=RANDOM_STATE,
)
),
}
def metrics_at_threshold(y_true: pd.Series, proba: np.ndarray, threshold: float) -> ThresholdResult:
y_pred = (proba >= threshold).astype(int)
tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
return ThresholdResult(
threshold=float(threshold),
precision=float(precision_score(y_true, y_pred, zero_division=0)),
recall=float(recall_score(y_true, y_pred, zero_division=0)),
f1=float(f1_score(y_true, y_pred, zero_division=0)),
predicted_positive_rate=float(y_pred.mean()),
tp=int(tp),
fp=int(fp),
fn=int(fn),
tn=int(tn),
)
def tune_threshold(
y_true: pd.Series,
proba: np.ndarray,
min_recall: float,
) -> tuple[ThresholdResult, pd.DataFrame]:
rows = [asdict(metrics_at_threshold(y_true, proba, threshold)) for threshold in np.arange(0.10, 0.91, 0.02)]
threshold_report = pd.DataFrame(rows)
candidates = threshold_report[threshold_report["recall"] >= min_recall]
if candidates.empty:
best = threshold_report.sort_values(["f1", "precision"], ascending=False).iloc[0]
else:
best = candidates.sort_values(["precision", "f1"], ascending=False).iloc[0]
return ThresholdResult(**best.to_dict()), threshold_report
def evaluate_predictions(y_true: pd.Series, proba: np.ndarray, threshold: float) -> dict[str, Any]:
threshold_metrics = metrics_at_threshold(y_true, proba, threshold)
y_pred = (proba >= threshold).astype(int)
return {
"roc_auc": float(roc_auc_score(y_true, proba)),
"pr_auc": float(average_precision_score(y_true, proba)),
"threshold_metrics": asdict(threshold_metrics),
"classification_report": classification_report(y_true, y_pred, output_dict=True, zero_division=0),
}
def measure_prediction_latency(model: Pipeline, X: pd.DataFrame, repeats: int = 10) -> dict[str, float]:
sample = X.head(min(len(X), 512))
durations = []
for _ in range(repeats):
start = time.perf_counter()
model.predict_proba(sample)[:, 1]
durations.append((time.perf_counter() - start) * 1000)
per_row = [duration / len(sample) for duration in durations]
return {
"batch_size": float(len(sample)),
"p50_ms_per_row": float(np.percentile(per_row, 50)),
"p95_ms_per_row": float(np.percentile(per_row, 95)),
}
def run_error_analysis(
model: Pipeline,
X_test: pd.DataFrame,
y_test: pd.Series,
threshold: float,
output_dir: Path,
min_slice_size: int = 30,
) -> dict[str, Any]:
output_dir.mkdir(parents=True, exist_ok=True)
proba = model.predict_proba(X_test)[:, 1]
y_pred = (proba >= threshold).astype(int)
scored = X_test.copy()
scored["y_true"] = y_test.to_numpy()
scored["proba"] = proba
scored["y_pred"] = y_pred
scored["error_type"] = np.select(
[
(scored["y_true"] == 0) & (scored["y_pred"] == 1),
(scored["y_true"] == 1) & (scored["y_pred"] == 0),
],
["false_positive", "false_negative"],
default="correct",
)
false_positives = scored[scored["error_type"] == "false_positive"].sort_values("proba", ascending=False)
false_negatives = scored[scored["error_type"] == "false_negative"].sort_values("proba", ascending=True)
false_positives.head(50).to_csv(output_dir / "false_positives.csv", index=False)
false_negatives.head(50).to_csv(output_dir / "false_negatives.csv", index=False)
feature_view = TelcoFeatureBuilder().transform(X_test)
feature_view["y_true"] = y_test.to_numpy()
feature_view["y_pred"] = y_pred
slice_rows = []
for column in ["Contract", "InternetService", "PaymentMethod", "tenure_group", "monthly_charge_band"]:
for value, group in feature_view.groupby(column, dropna=False):
if len(group) < min_slice_size:
continue
slice_rows.append(
{
"slice_column": column,
"slice_value": str(value),
"count": int(len(group)),
"actual_positive_rate": float(group["y_true"].mean()),
"predicted_positive_rate": float(group["y_pred"].mean()),
"precision": float(precision_score(group["y_true"], group["y_pred"], zero_division=0)),
"recall": float(recall_score(group["y_true"], group["y_pred"], zero_division=0)),
"f1": float(f1_score(group["y_true"], group["y_pred"], zero_division=0)),
}
)
slice_metrics = pd.DataFrame(slice_rows).sort_values(["f1", "count"], ascending=[True, False])
slice_metrics.to_csv(output_dir / "slice_metrics.csv", index=False)
return {
"false_positive_count": int(len(false_positives)),
"false_negative_count": int(len(false_negatives)),
"worst_slices": slice_metrics.head(10).to_dict(orient="records"),
}
def train_pipeline(
csv_path: Path | None,
artifact_path: Path,
report_dir: Path,
synthetic_rows: int,
min_recall: float,
) -> dict[str, Any]:
raw_df = load_dataset(csv_path, synthetic_rows)
df = normalize_telco_frame(raw_df, require_target=True)
run_eda(df, report_dir)
X = df[RAW_FEATURE_COLUMNS]
y = df[TARGET_COL]
X_train_full, X_test, y_train_full, y_test = train_test_split(
X,
y,
test_size=0.20,
stratify=y,
random_state=RANDOM_STATE,
)
X_train, X_val, y_train, y_val = train_test_split(
X_train_full,
y_train_full,
test_size=0.25,
stratify=y_train_full,
random_state=RANDOM_STATE,
)
model_reports = []
trained_models = {}
threshold_reports = {}
for model_name, model in build_models().items():
print(f"Training {model_name}...")
model.fit(X_train, y_train)
trained_models[model_name] = model
val_proba = model.predict_proba(X_val)[:, 1]
best_threshold, threshold_report = tune_threshold(y_val, val_proba, min_recall=min_recall)
threshold_reports[model_name] = threshold_report
threshold_report.to_csv(report_dir / f"threshold_report_{model_name}.csv", index=False)
val_metrics = evaluate_predictions(y_val, val_proba, best_threshold.threshold)
latency = measure_prediction_latency(model, X_val)
model_reports.append(
{
"model_name": model_name,
"threshold": best_threshold.threshold,
"validation_metrics": val_metrics,
"latency": latency,
}
)
leaderboard = pd.DataFrame(
[
{
"model_name": report["model_name"],
"threshold": report["threshold"],
"pr_auc": report["validation_metrics"]["pr_auc"],
"roc_auc": report["validation_metrics"]["roc_auc"],
"precision": report["validation_metrics"]["threshold_metrics"]["precision"],
"recall": report["validation_metrics"]["threshold_metrics"]["recall"],
"f1": report["validation_metrics"]["threshold_metrics"]["f1"],
"p95_ms_per_row": report["latency"]["p95_ms_per_row"],
}
for report in model_reports
]
).sort_values(["pr_auc", "f1"], ascending=False)
report_dir.mkdir(parents=True, exist_ok=True)
leaderboard.to_csv(report_dir / "leaderboard.csv", index=False)
best_model_name = str(leaderboard.iloc[0]["model_name"])
best_threshold = float(leaderboard.iloc[0]["threshold"])
best_model = trained_models[best_model_name]
test_proba = best_model.predict_proba(X_test)[:, 1]
test_metrics = evaluate_predictions(y_test, test_proba, best_threshold)
latency = measure_prediction_latency(best_model, X_test)
error_report = run_error_analysis(best_model, X_test, y_test, best_threshold, report_dir / "error_analysis")
metadata = {
"model_name": best_model_name,
"model_version": MODEL_VERSION,
"schema_version": SCHEMA_VERSION,
"threshold": best_threshold,
"raw_feature_columns": RAW_FEATURE_COLUMNS,
"numeric_features": NUMERIC_FEATURES,
"categorical_features": CATEGORICAL_FEATURES,
"validation_leaderboard": leaderboard.to_dict(orient="records"),
"test_metrics": test_metrics,
"latency": latency,
"error_analysis": error_report,
"created_at_utc": datetime.now(timezone.utc).isoformat(),
"python_version": platform.python_version(),
"sklearn_version": sklearn.__version__,
"random_state": RANDOM_STATE,
"training_rows": int(len(X_train)),
"validation_rows": int(len(X_val)),
"test_rows": int(len(X_test)),
"training_note": "Validate with a time-based split and live monitoring before production deployment.",
}
artifact = {
"model": best_model,
"metadata": metadata,
}
artifact_path.parent.mkdir(parents=True, exist_ok=True)
joblib.dump(artifact, artifact_path)
write_json(report_dir / "metrics_report.json", metadata)
print(f"Saved artifact: {artifact_path}")
print(leaderboard)
return metadata
def risk_tier(probability: float) -> str:
if probability >= 0.70:
return "high"
if probability >= 0.40:
return "medium"
return "low"
def predict_customer_churn(customer: dict[str, Any], artifact_path: Path) -> dict[str, Any]:
artifact = joblib.load(artifact_path)
model: Pipeline = artifact["model"]
metadata: dict[str, Any] = artifact["metadata"]
threshold = float(metadata["threshold"])
input_df = pd.DataFrame([customer])
normalized = normalize_telco_frame(input_df, require_target=False)
probability = float(model.predict_proba(normalized[RAW_FEATURE_COLUMNS])[:, 1][0])
return {
"customer_id": str(customer.get(ID_COL, "")),
"churn_probability": round(probability, 4),
"will_churn": bool(probability >= threshold),
"risk_tier": risk_tier(probability),
"threshold": threshold,
"model_name": metadata["model_name"],
"model_version": metadata["model_version"],
"schema_version": metadata["schema_version"],
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Train and evaluate a customer churn ML pipeline.")
parser.add_argument("--csv", type=Path, default=None, help="Path to Telco Customer Churn CSV.")
parser.add_argument("--artifact", type=Path, default=Path("artifacts/customer_churn_model.joblib"))
parser.add_argument("--report-dir", type=Path, default=Path("artifacts/reports"))
parser.add_argument("--synthetic-rows", type=int, default=7000)
parser.add_argument("--min-recall", type=float, default=0.75)
return parser.parse_args()
def main() -> None:
args = parse_args()
train_pipeline(
csv_path=args.csv,
artifact_path=args.artifact,
report_dir=args.report_dir,
synthetic_rows=args.synthetic_rows,
min_recall=args.min_recall,
)
if __name__ == "__main__":
main()
4. Chạy Training
Không có CSV:
python src/churn_pipeline.py
Có Telco CSV:
python src/churn_pipeline.py --csv data/telco_customer_churn.csv
Kết quả mong đợi:
artifacts/customer_churn_model.joblib
artifacts/reports/eda_report.json
artifacts/reports/leaderboard.csv
artifacts/reports/metrics_report.json
artifacts/reports/threshold_report_logistic_regression.csv
artifacts/reports/threshold_report_random_forest.csv
artifacts/reports/threshold_report_gradient_boosting.csv
artifacts/reports/error_analysis/false_positives.csv
artifacts/reports/error_analysis/false_negatives.csv
artifacts/reports/error_analysis/slice_metrics.csv
5. Thử Inference
Sau khi train xong:
from pathlib import Path
from churn_pipeline import predict_customer_churn
customer = {
"customerID": "CUST_999999",
"gender": "Female",
"SeniorCitizen": 0,
"Partner": "Yes",
"Dependents": "No",
"tenure": 12,
"PhoneService": "Yes",
"InternetService": "Fiber optic",
"Contract": "Month-to-month",
"PaperlessBilling": "Yes",
"PaymentMethod": "Electronic check",
"MonthlyCharges": 89.9,
"TotalCharges": 1020.3,
}
result = predict_customer_churn(customer, Path("artifacts/customer_churn_model.joblib"))
print(result)
Response cần có dạng:
{
"customer_id": "CUST_999999",
"churn_probability": 0.7312,
"will_churn": true,
"risk_tier": "high",
"threshold": 0.45,
"model_name": "gradient_boosting",
"model_version": "customer-churn-v1",
"schema_version": "telco-churn-v1"
}
6. Bài Tập Mở Rộng
Thêm unit test cho
normalize_telco_frame():- Missing column phải raise
ValueError. TotalCharges=" "phải thành missing.Churn="Yes"phải thành1.
- Missing column phải raise
Thêm test cho inference contract:
- Payload đủ field trả về đầy đủ keys.
- Payload thiếu
MonthlyChargesphải fail. - Category mới trong
PaymentMethodkhông làm model crash.
So sánh threshold objectives:
min_recall=0.60min_recall=0.75min_recall=0.90
Thêm calibration:
- Dùng
CalibratedClassifierCVcho model tốt nhất. - So sánh calibration curve hoặc Brier score.
- Dùng
Thêm time-based split nếu dataset của bạn có timestamp:
- Train trên tháng cũ.
- Validation trên tháng kế tiếp.
- Test trên tháng mới nhất.
Viết README thật:
- Không chỉ liệt kê command.
- Phải có trade-off, limitation và production readiness.
7. Câu Hỏi Review
- Vì sao
TelcoFeatureBuildernằm trongPipelinethay vì gọi thủ công ở notebook? - Vì sao threshold được tune trên validation set, không phải test set?
- Nếu model có PR-AUC cao nhưng recall thấp tại threshold đã chọn, bạn xử lý thế nào?
OneHotEncoder(handle_unknown="ignore")giải quyết vấn đề gì và không giải quyết vấn đề gì?- Nếu service nhận category mới liên tục, bạn monitor metric nào?
- Nếu Random Forest tốt hơn Logistic Regression 0.5 điểm PR-AUC nhưng latency p95 gấp 20 lần, bạn chọn gì trong batch scoring và realtime API?
- Điều kiện nào còn thiếu trước khi gọi pipeline này là production-ready?
8. Tiêu Chí Chấm
| Hạng mục | Đạt |
|---|---|
| Schema validation | Fail fast khi thiếu cột, normalize type đúng |
| EDA | Có target distribution, missing, segment churn rate |
| Pipeline | Có Pipeline, ColumnTransformer, transformer chung training/inference |
| Models | Ít nhất 3 models, có Logistic Regression baseline |
| Metrics | Có ROC-AUC, PR-AUC, precision, recall, F1, confusion matrix |
| Threshold | Tune trên validation set, có business objective |
| Error analysis | Có FP/FN và slice metrics |
| Artifact | Save model + metadata + threshold + schema |
| Inference | Function nhận dict, trả contract rõ ràng |
| Production notes | Có trade-off, limitation, monitoring, rollback |