Published on

Day 5: Feature Engineering

Authors

Mục Tiêu

Sau Day 3, bạn đã biết train/validation/test split, overfitting, baseline model và bias-variance. Sau Day 4, bạn đã dùng NumPy, Pandas và scikit-learn để train model cơ bản. Day 5 nối hai bài đó lại bằng phần hay bị xem nhẹ nhất trong ML production: biến raw data thành feature ổn định, có thể kiểm thử, có thể deploy và không leak dữ liệu tương lai.

Kết thúc bài này, bạn cần làm được:

  • Thiết kế feature cho numerical, categorical, text và datetime data.
  • Dùng PipelineColumnTransformer để giữ logic preprocessing nhất quán giữa training/inference.
  • Chọn imputation, scaling, encoding và text vectorization theo trade-off cụ thể.
  • Nhận diện data leakage, train-serving skew, schema drift và point-in-time bug.
  • Viết được validation checks trước khi gọi model.
  • Trả lời rõ: "Dùng được trong production không? Nếu có thì cần điều kiện gì?"

TL;DR

Feature engineering là lớp contract giữa data system và model. Với Senior SE, hãy nghĩ feature như API schema: tên cột, type, nullability, semantics và thời điểm dữ liệu đều phải rõ. Model tốt không cứu được feature sai thời điểm, feature bị leak, category drift hoặc preprocessing khác nhau giữa train và serve.

Best default cho bài toán tabular ở giai đoạn đầu:

Split đúng thời gian hoặc stratified split
-> fit preprocessing chỉ trên train
-> ColumnTransformer theo nhóm feature
-> Pipeline(preprocess, model)
-> validate schema trước inference
-> log missing rate/cardinality/distribution

1. Feature Là Gì?

Raw data là dữ liệu nghiệp vụ. Feature là biểu diễn mà model có thể học.

Ví dụ churn prediction:

Raw dataFeature có thể dùngGhi chú production
signup_ataccount_age_daysTính tại prediction_time, không dùng thời gian hiện tại ngầm định trong training
last_login_atdays_since_last_loginNếu missing có thể nghĩa là chưa từng login
monthly_chargesmonthly_charges_scaled, log_monthly_chargesCần xử lý outlier và missing
contract_typeone-hot columnsCần handle category mới ở inference
support_ticket_textTF-IDF vectorCần lưu vocabulary cùng model
ticket_countticket_count_30dChỉ tính từ event trước thời điểm dự đoán

Map với backend:

Backend conceptML equivalent
API request schemaFeature schema
Contract testInput validation và schema validation
ETL jobFeature generation job
Cache invalidationFeature freshness
Backward compatibilityFeature versioning
ObservabilityDrift, missing rate, cardinality, outlier monitoring

2. Nguyên Tắc Không Leak Dữ Liệu

Data leakage xảy ra khi feature chứa thông tin mà tại thời điểm prediction thật sự chưa thể biết. Đây là lỗi nghiêm trọng hơn chọn sai model, vì offline metrics sẽ đẹp giả.

Ba dạng leakage hay gặp:

  1. Fit preprocessing trên toàn bộ dataset trước khi split. Ví dụ fit StandardScaler, OneHotEncoder, TfidfVectorizer, imputer trên cả train và test.
  2. Dùng future information. Ví dụ predict churn ngày 2026-05-01 nhưng dùng ticket_count_30d tính cả ticket ngày 2026-05-10.
  3. Dùng target proxy. Ví dụ feature cancellation_ticket_created xuất hiện sau khi khách đã quyết định churn.

Rule thực tế:

Mọi transformer có .fit() phải fit trên training data.
Mọi aggregate feature phải có cutoff time.
Mọi join phải chứng minh point-in-time correctness.

3. Numerical Features

Numerical features thường gặp vấn đề: missing value, scale lệch lớn, outlier, distribution skewed.

Scaling

Kỹ thuậtKhi nên dùngTrade-off
StandardScalerLogistic Regression, SVM, KNN, Neural Network; data tương đối ổnNhạy với outlier
MinMaxScalerFeature cần nằm trong khoảng cố định [0, 1]Rất nhạy với outlier
RobustScalerRevenue, transaction amount, latency, usage count có outlierCó thể kém hơn nếu data sạch và gần normal
Không scaleTree-based model như Random Forest, Gradient BoostingNếu đổi sang linear model có thể phải sửa pipeline

Default production hợp lý cho churn tabular bằng Logistic Regression: median imputation + missing indicator + RobustScaler.

from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import RobustScaler

numeric_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="median", add_indicator=True)),
    ("scaler", RobustScaler()),
])

Log Transform

Dùng cho feature lệch phải như revenue, charge, request count, session duration.

import numpy as np

df["log_monthly_charges"] = np.log1p(df["monthly_charges"])

Trade-off: log transform giảm tác động outlier nhưng làm feature khó giải thích hơn với stakeholder không quen toán.

Binning

Ví dụ:

days_since_last_login:
0-7     -> active
8-30    -> cooling
31-90   -> at_risk
>90     -> dormant

Binning hữu ích khi business threshold rõ, nhưng mất thông tin liên tục. Không nên binning chỉ vì model chưa tốt; trước tiên hãy kiểm tra leakage, missing và baseline.

4. Categorical Features

One-hot Encoding

One-hot an toàn cho category không có thứ tự và cardinality thấp-vừa.

from sklearn.preprocessing import OneHotEncoder

encoder = OneHotEncoder(
    handle_unknown="ignore",
    min_frequency=10,
)

handle_unknown="ignore" giúp inference không crash khi gặp category mới, nhưng category mới sẽ thành vector toàn 0 trong nhóm đó. Với production, vẫn phải log unknown rate.

min_frequency hoặc grouping infrequent categories giúp giảm số chiều khi có nhiều category hiếm.

Label/Ordinal Encoding

Chỉ dùng khi category có thứ tự thật:

free < basic < pro < enterprise

Không nên encode payment_method = credit_card, bank_transfer, wallet thành 0, 1, 2 cho Logistic Regression, vì model sẽ hiểu nhầm khoảng cách số học.

Target Encoding

Target encoding thay category bằng thống kê target lịch sử, ví dụ city -> churn_rate. Nó mạnh với high-cardinality feature như city, merchant, campaign, device model, nhưng rất dễ leakage.

Điều kiện tối thiểu nếu dùng target encoding:

  • Tính encoding trong cross-validation fold, không dùng toàn bộ train trực tiếp cho từng row.
  • Có smoothing để category ít data không quá cực đoan.
  • Có fallback cho category mới.
  • Không dùng test set để tính encoding.

Trong bài Day 5, best solution là chưa dùng target encoding nếu chưa có test leakage chặt. Hãy dùng one-hot với grouping infrequent trước.

5. Text Features: TF-IDF

TF-IDF là baseline text quan trọng trước khi chuyển sang Transformer. Nó không hiểu semantic sâu, nhưng nhanh, rẻ, dễ debug và thường đủ tốt cho ticket/email/search query classification.

from sklearn.feature_extraction.text import TfidfVectorizer

text_pipeline = TfidfVectorizer(
    max_features=20_000,
    ngram_range=(1, 2),
    min_df=5,
    max_df=0.9,
    strip_accents="unicode",
)

Production concerns:

  • Vocabulary được học trong fit(), nên phải lưu cùng model artifact.
  • Không fit lại vectorizer ở inference.
  • max_features là trade-off giữa recall, memory và latency.
  • Text có thể chứa PII như email, phone, address; cần masking hoặc policy rõ.
  • Với tiếng Việt, TF-IDF word-level có thể kém nếu tokenization chưa tốt; có thể cân nhắc char n-gram hoặc tokenizer tiếng Việt ở bài NLP sau.

6. Datetime Features Và Point-in-Time Correctness

Không nên đưa raw timestamp trực tiếp vào model. Hãy biến nó thành feature có ý nghĩa:

  • account_age_days.
  • days_since_last_login.
  • signup_month.
  • signup_day_of_week.
  • is_weekend_signup.
  • ticket_count_7d, ticket_count_30d.

Điểm quan trọng nhất: dùng prediction_time, không dùng pd.Timestamp.now() tùy tiện trong training.

import pandas as pd

def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
    result = df.copy()
    for col in ["prediction_time", "signup_at", "last_login_at"]:
        result[col] = pd.to_datetime(result[col], utc=True, errors="coerce")

    result["account_age_days"] = (
        result["prediction_time"] - result["signup_at"]
    ).dt.total_seconds() / 86_400
    result["days_since_last_login"] = (
        result["prediction_time"] - result["last_login_at"]
    ).dt.total_seconds() / 86_400
    result["signup_month"] = result["signup_at"].dt.month.astype("Int64")
    result["signup_day_of_week"] = result["signup_at"].dt.dayofweek.astype("Int64")
    return result

Nếu feature cần join từ bảng event/log, dùng as-of join hoặc query có điều kiện event_time <= prediction_time. Trong Pandas, merge_asof là pattern hữu ích cho time-aware join, nhưng input phải được sort theo key thời gian.

7. Missing Data

Missing không phải lúc nào cũng là lỗi. Nó có thể là signal.

Ví dụ:

  • last_login_at missing: user chưa từng login.
  • latest_ticket_text missing: user chưa từng mở ticket.
  • monthly_charges missing: upstream billing lỗi hoặc plan chưa active.

Decision table:

Cách xử lýKhi dùngRủi ro
Drop rowMissing rất ít và randomMất data, bias nếu missing không random
Mean/medianNumerical baselineChe mất signal missing
Most frequentCategorical đơn giảnCó thể làm category phổ biến bị overweight
Constant __missing__Missing có nghĩa nghiệp vụTăng cardinality
Missing indicatorMissing là signalTăng số feature
Model-based imputationData lớn, pattern phức tạpDễ overfit, khó debug

Best default: median cho numerical, constant __missing__ cho categorical/text, thêm missing indicator cho numerical quan trọng.

8. Feature Selection

Feature selection giúp giảm overfitting, memory, latency và độ phức tạp vận hành.

Các bước nên làm:

  1. Drop feature không có nghĩa tại prediction time.
  2. Drop feature quá nhiều missing nếu không có signal rõ.
  3. Drop constant/near-constant.
  4. Group hoặc drop high-cardinality category không kiểm soát.
  5. Dùng SelectPercentile, SelectKBest, L1 regularization hoặc model importance sau khi có baseline.
  6. Không chọn feature dựa trên test set.

Trong scikit-learn, selector có thể nằm trong pipeline để tránh leakage:

from sklearn.feature_selection import SelectPercentile, chi2
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

categorical_pipeline = Pipeline([
    ("encoder", OneHotEncoder(handle_unknown="ignore")),
    ("selector", SelectPercentile(score_func=chi2, percentile=80)),
])

9. Pipeline Gần Production

Skeleton nên dùng:

from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, RobustScaler

numeric_features = [
    "monthly_charges",
    "account_age_days",
    "days_since_last_login",
    "support_ticket_count_30d",
]
categorical_features = ["contract_type", "payment_method", "signup_month"]
text_feature = "latest_ticket_text"

numeric_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="median", add_indicator=True)),
    ("scaler", RobustScaler()),
])

categorical_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="constant", fill_value="__missing__")),
    ("onehot", OneHotEncoder(handle_unknown="ignore", min_frequency=10)),
])

preprocessor = ColumnTransformer([
    ("num", numeric_pipeline, numeric_features),
    ("cat", categorical_pipeline, categorical_features),
    ("text", TfidfVectorizer(max_features=20_000, ngram_range=(1, 2), min_df=5), text_feature),
])

model = Pipeline([
    ("preprocess", preprocessor),
    ("classifier", LogisticRegression(max_iter=1_000, class_weight="balanced")),
])

Điểm đáng chú ý:

  • ColumnTransformer áp dụng preprocessing khác nhau theo nhóm cột.
  • Pipeline.fit() fit cả preprocessing và model trên train.
  • Pipeline.predict() tái sử dụng đúng preprocessing đã fit.
  • OneHotEncoder(handle_unknown="ignore") tránh crash với category mới, nhưng cần monitoring.
  • SimpleImputer(add_indicator=True) giúp model biết giá trị nào từng bị missing.

10. Schema Validation Trước Inference

scikit-learn không thay thế được data contract. Trước khi gọi model.predict, service nên validate input.

Ví dụ validation tối thiểu:

REQUIRED_COLUMNS = {
    "customer_id": "object",
    "prediction_time": "datetime64[ns, UTC]",
    "signup_at": "datetime64[ns, UTC]",
    "last_login_at": "datetime64[ns, UTC]",
    "contract_type": "object",
    "payment_method": "object",
    "monthly_charges": "number",
    "latest_ticket_text": "object",
}

def validate_inference_schema(df):
    missing = set(REQUIRED_COLUMNS) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")

    if df["customer_id"].isna().any():
        raise ValueError("customer_id must not be null")

    if (df["monthly_charges"].dropna() < 0).any():
        raise ValueError("monthly_charges must be non-negative")

    if (df["prediction_time"] < df["signup_at"]).any():
        raise ValueError("prediction_time must be >= signup_at")

Trong production thật, nên dùng Pandera, Great Expectations, Pydantic hoặc validation ở API/data layer. Với Day 5, mục tiêu là hiểu contract, không phụ thuộc tool.

11. Performance Và Production Concerns

Các bottleneck thường không nằm ở model mà nằm ở feature:

  • Join nhiều bảng để tính rolling feature.
  • High-cardinality one-hot làm sparse matrix rất lớn.
  • TF-IDF vocabulary lớn làm tăng memory và latency.
  • Imputation/encoding fit lại trong inference do deploy sai artifact.
  • Batch training dùng Pandas ổn nhưng online serving cần feature store/cache.

Checklist vận hành:

  • Version model artifact cùng feature pipeline.
  • Lưu training schema, feature list, encoder vocabulary, TF-IDF vocabulary.
  • Log input row count, missing rate, unknown category rate, text empty rate.
  • Monitor distribution drift cho feature quan trọng.
  • Có fallback khi feature source chậm hoặc lỗi.
  • Không log raw PII trong text.

12. Dùng Được Trong Production Không?

Có, pipeline trong bài này dùng được làm nền tảng production cho bài toán tabular/text baseline nếu đáp ứng các điều kiện sau:

  • Feature được định nghĩa theo point-in-time correctness, không dùng dữ liệu tương lai.
  • Train/test split phản ánh cách model sẽ chạy thật: time-based split nếu bài toán có yếu tố thời gian.
  • Toàn bộ preprocessing nằm trong Pipeline/ColumnTransformer và được lưu cùng model.
  • Có schema validation trước inference.
  • Có monitoring cho missing rate, unknown category, feature drift, latency và model metrics.
  • Có versioning cho feature schema, model, data snapshot và training code.
  • Có kiểm thử cho leakage, schema contract và train-serving consistency.
  • Với online low-latency, rolling/aggregate features cần precompute hoặc cache, không query/ad-hoc join nặng trong request path.

Không nên coi notebook preprocessing rời rạc là production-ready. Production-ready là deploy cả feature contract + preprocessing pipeline + model + validation + monitoring.

13. Tự Kiểm Tra

  1. Vì sao không fit scaler/encoder/vectorizer trên toàn bộ dataset trước khi split?
  2. Khi nào RobustScaler tốt hơn StandardScaler?
  3. Vì sao OneHotEncoder(handle_unknown="ignore") không đủ để bỏ qua monitoring category drift?
  4. Target encoding leak như thế nào?
  5. Tại sao datetime feature phải dựa trên prediction_time?
  6. Nếu offline ROC-AUC cao nhưng production fail, bạn kiểm tra feature nào trước?
  7. Feature engineering khác business rule engineering ở điểm nào?

Liên Kết


Tài liệu

File này dùng như tài liệu tra nhanh khi chọn kỹ thuật feature engineering. Hãy đọc cùng lession.md và làm bài trong exercise.md.

1. Decision Matrix Theo Loại Feature

Feature typeDefault tốt để bắt đầuKhi cần đổiProduction concern
Numerical sạch, ít outlierMedian imputation + StandardScalerCó outlier lớn thì dùng RobustScalerMonitor min/max, p95/p99, missing rate
Numerical skewednp.log1p + scalerNếu value có âm thì cần transform khácLog transform phải giống nhau ở train/inference
Count featureRaw count + optional log/count bucketCount quá lệch hoặc long-tailĐảm bảo window tính count không dùng future event
Low-cardinality categoricalConstant imputation + one-hotCategory có thứ tự thật thì ordinalUnknown category rate có thể báo upstream drift
High-cardinality categoricalGroup infrequent + one-hotTarget encoding nếu có CV encoding chặtMemory, latency, leakage
Text ngắnTF-IDF word/char n-gramCần semantic thì dùng embedding/Transformer ở phase sauPII, vocabulary version, latency
DatetimeAge/delta/cyclical/calendar featuresSeasonality phức tạp thì thêm rolling featurePoint-in-time correctness
Missing value có nghĩaMissing indicator hoặc __missing__Missing là lỗi source thì fail fastPhân biệt business missing và system missing

2. Encoding Trade-off

EncodingƯu điểmNhược điểmBest context
One-hotDễ hiểu, ít giả định, hợp linear modelNổ số chiều với cardinality caoCategory dưới vài trăm giá trị ổn định
OrdinalGọn, nhanhTạo thứ tự giả nếu dùng saiCategory có thứ tự thật
HashingKhông cần vocabulary, xử lý category mớiCollision, khó debugStreaming/high-cardinality khi memory hạn chế
Target encodingMạnh với high-cardinalityRất dễ leakageCó CV encoding, smoothing và kiểm thử nghiêm
Learned embeddingMạnh khi data lớnCần deep learning pipelineRecommender/NLP/large-scale categorical

Best solution theo context Day 5: one-hot với handle_unknown="ignore" và grouping infrequent categories. Chỉ nâng cấp lên target encoding khi đã có leakage tests và cross-validation encoding đúng.

3. Scaling Trade-off

ScalerNên dùng khiKhông nên dùng khi
StandardScalerLinear/SVM/KNN/NN, data gần normalOutlier cực lớn
MinMaxScalerCần range cố định, ví dụ một số NN setupData production có outlier không kiểm soát
RobustScalerRevenue, usage, latency, transaction amountData sạch và muốn interpret z-score
No scalingTree-based modelModel phụ thuộc distance/gradient

4. Leakage Checklist

Trước khi tin metrics, hãy hỏi:

  • Feature này có tồn tại tại thời điểm prediction không?
  • Aggregation window có điều kiện event_time <= prediction_time không?
  • Split có được làm trước khi fit imputer/scaler/encoder/vectorizer không?
  • Feature selection có dùng test set không?
  • Duplicate customer/order/session có bị rơi cả train và test không?
  • Text có chứa label trực tiếp như "cancelled", "refund completed", "retention approved" sau outcome không?
  • Target encoding có được tính out-of-fold không?

5. Schema Contract Tối Thiểu

Mỗi feature nên có metadata:

FieldÝ nghĩa
nameTên ổn định của feature
dtypeKiểu dữ liệu expected
nullableCó cho phép null không
allowed_valuesDomain cho categorical nếu biết
rangeMin/max hợp lệ cho numerical
sourceBảng/API/event sinh ra feature
availability_timeKhi nào feature có thể biết
ownerTeam chịu trách nhiệm
versionVersion khi đổi semantics

Ví dụ:

monthly_charges:
  dtype: float
  nullable: true
  range: [0, 10000]
  source: billing.accounts
  availability_time: before_prediction
  owner: billing-platform
  version: 1

6. Monitoring Metrics

MetricVì sao cần
Missing rate theo featurePhát hiện upstream source lỗi
Unknown category ratePhát hiện taxonomy/category drift
Numerical p50/p95/p99Phát hiện distribution shift/outlier
Text empty rate và average lengthPhát hiện ingestion/tokenization lỗi
Preprocessing latencyFeature pipeline có thể là bottleneck
Prediction input rejection rateValidation quá chặt hoặc upstream sai contract
Feature freshnessAggregate feature stale làm model sai

7. Khi Nào Nâng Cấp Feature Pipeline?

Tín hiệuHành động
One-hot matrix quá lớnGroup infrequent, hashing, target encoding có kiểm soát
Rolling feature tính chậmPrecompute batch, cache, feature store
Text TF-IDF không đủ tốtThử char n-gram, tokenizer tốt hơn, embedding model
Offline tốt production kémKiểm tra leakage, train-serving skew, drift
Model khó giải thíchGiảm feature phức tạp, thêm feature lineage, dùng model explainability

8. Context7 Docs Đã Dùng

  • scikit-learn stable docs qua Context7: Pipeline, ColumnTransformer, SimpleImputer, OneHotEncoder(handle_unknown="ignore"), TfidfVectorizer, SelectPercentile.
  • pandas docs qua Context7: pd.to_datetime, DataFrame.isna/info/memory_usage, missing data handling, merge_asof cho time-aware join.

Bài tập

Mục tiêu bài tập: build một pipeline churn prediction có numerical, categorical, text và datetime features; kiểm soát leakage; validate schema; đánh giá baseline. Đây không phải code cuối cùng cho production, nhưng là skeleton đủ gần production để bạn mở rộng.

1. Setup

pip install pandas numpy scikit-learn joblib

2. Yêu Cầu

Bạn sẽ tạo pipeline với:

  • Feature datetime dựa trên prediction_time.
  • Numerical preprocessing: impute median, missing indicator, scale.
  • Categorical preprocessing: impute constant, one-hot, handle unknown category.
  • Text preprocessing: TF-IDF.
  • Model: Logistic Regression baseline.
  • Validation: check required columns, non-negative charge, prediction time hợp lệ.
  • Leakage checks: không dùng event sau prediction_time.

3. Full Example

from __future__ import annotations

import joblib
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, RobustScaler


RAW_REQUIRED_COLUMNS = [
    "customer_id",
    "prediction_time",
    "signup_at",
    "last_login_at",
    "contract_type",
    "payment_method",
    "monthly_charges",
    "support_ticket_count_30d",
    "latest_ticket_text",
]


def build_sample_data(n: int = 2_000, seed: int = 42) -> pd.DataFrame:
    rng = np.random.default_rng(seed)
    prediction_time = pd.Timestamp("2026-05-01", tz="UTC")

    signup_days_ago = rng.integers(30, 1_500, size=n)
    last_login_days_ago = rng.integers(0, 180, size=n).astype(float)
    never_login_mask = rng.random(n) < 0.08
    last_login_days_ago[never_login_mask] = np.nan

    contract_type = rng.choice(
        ["monthly", "one_year", "two_year"],
        size=n,
        p=[0.58, 0.30, 0.12],
    )
    payment_method = rng.choice(
        ["credit_card", "bank_transfer", "electronic_check", "wallet"],
        size=n,
        p=[0.38, 0.28, 0.28, 0.06],
    )
    monthly_charges = rng.lognormal(mean=4.1, sigma=0.45, size=n).clip(5, 300)
    support_ticket_count_30d = rng.poisson(1.4, size=n)

    latest_ticket_text = np.where(
        support_ticket_count_30d >= 3,
        "service slow billing issue want cancel",
        "general question invoice support ok",
    )

    df = pd.DataFrame({
        "customer_id": [f"cus_{i:05d}" for i in range(n)],
        "prediction_time": prediction_time,
        "signup_at": prediction_time - pd.to_timedelta(signup_days_ago, unit="D"),
        "last_login_at": prediction_time - pd.to_timedelta(last_login_days_ago, unit="D"),
        "contract_type": contract_type,
        "payment_method": payment_method,
        "monthly_charges": monthly_charges,
        "support_ticket_count_30d": support_ticket_count_30d,
        "latest_ticket_text": latest_ticket_text,
    })

    df.loc[rng.choice(df.index, size=int(n * 0.04), replace=False), "monthly_charges"] = np.nan
    df.loc[rng.choice(df.index, size=int(n * 0.03), replace=False), "payment_method"] = np.nan
    df.loc[rng.choice(df.index, size=int(n * 0.06), replace=False), "latest_ticket_text"] = ""

    churn_logit = (
        0.95 * (df["contract_type"] == "monthly").astype(float)
        + 0.75 * (df["payment_method"] == "electronic_check").astype(float)
        + 0.18 * df["support_ticket_count_30d"]
        + 0.010 * np.nan_to_num(last_login_days_ago, nan=120.0)
        + 0.004 * np.nan_to_num(monthly_charges, nan=np.nanmedian(monthly_charges))
        - 2.15
        + rng.normal(0, 0.65, size=n)
    )
    churn_probability = 1 / (1 + np.exp(-churn_logit))
    df["churn"] = rng.binomial(1, churn_probability)
    return df


def validate_raw_schema(df: pd.DataFrame) -> None:
    missing = set(RAW_REQUIRED_COLUMNS) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")

    if df["customer_id"].isna().any():
        raise ValueError("customer_id must not be null")

    prediction_time = pd.to_datetime(df["prediction_time"], utc=True, errors="coerce")
    signup_at = pd.to_datetime(df["signup_at"], utc=True, errors="coerce")
    last_login_at = pd.to_datetime(df["last_login_at"], utc=True, errors="coerce")

    if prediction_time.isna().any() or signup_at.isna().any():
        raise ValueError("prediction_time and signup_at must be valid datetimes")

    if (prediction_time < signup_at).any():
        raise ValueError("prediction_time must be greater than or equal to signup_at")

    known_last_login = last_login_at.notna()
    if (last_login_at[known_last_login] > prediction_time[known_last_login]).any():
        raise ValueError("last_login_at must not be after prediction_time")

    if (df["monthly_charges"].dropna() < 0).any():
        raise ValueError("monthly_charges must be non-negative")


def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
    result = df.copy()
    result["prediction_time"] = pd.to_datetime(result["prediction_time"], utc=True, errors="coerce")
    result["signup_at"] = pd.to_datetime(result["signup_at"], utc=True, errors="coerce")
    result["last_login_at"] = pd.to_datetime(result["last_login_at"], utc=True, errors="coerce")

    result["account_age_days"] = (
        result["prediction_time"] - result["signup_at"]
    ).dt.total_seconds() / 86_400
    result["days_since_last_login"] = (
        result["prediction_time"] - result["last_login_at"]
    ).dt.total_seconds() / 86_400
    result["signup_month"] = result["signup_at"].dt.month.astype("Int64").astype("string")
    result["signup_day_of_week"] = result["signup_at"].dt.dayofweek.astype("Int64").astype("string")
    result["latest_ticket_text"] = result["latest_ticket_text"].fillna("")
    return result


def build_model_pipeline() -> Pipeline:
    numeric_features = [
        "monthly_charges",
        "support_ticket_count_30d",
        "account_age_days",
        "days_since_last_login",
    ]
    categorical_features = [
        "contract_type",
        "payment_method",
        "signup_month",
        "signup_day_of_week",
    ]

    numeric_pipeline = Pipeline([
        ("imputer", SimpleImputer(strategy="median", add_indicator=True)),
        ("scaler", RobustScaler()),
    ])

    categorical_pipeline = Pipeline([
        ("imputer", SimpleImputer(strategy="constant", fill_value="__missing__")),
        ("onehot", OneHotEncoder(handle_unknown="ignore", min_frequency=5)),
    ])

    preprocessor = ColumnTransformer([
        ("num", numeric_pipeline, numeric_features),
        ("cat", categorical_pipeline, categorical_features),
        ("text", TfidfVectorizer(max_features=3_000, ngram_range=(1, 2), min_df=3), "latest_ticket_text"),
    ])

    return Pipeline([
        ("preprocess", preprocessor),
        ("classifier", LogisticRegression(max_iter=1_000, class_weight="balanced")),
    ])


def train_and_evaluate() -> Pipeline:
    df = build_sample_data()
    validate_raw_schema(df)
    df = add_time_features(df)

    X = df.drop(columns=["churn"])
    y = df["churn"]

    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=0.2,
        random_state=42,
        stratify=y,
    )

    model = build_model_pipeline()
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    print(classification_report(y_test, y_pred))
    print("ROC-AUC:", round(roc_auc_score(y_test, y_prob), 4))

    transformed_shape = model.named_steps["preprocess"].transform(X_test).shape
    print("Transformed shape:", transformed_shape)

    joblib.dump(model, "day05_churn_pipeline.joblib")
    return model


if __name__ == "__main__":
    train_and_evaluate()

4. Bài Tập Bắt Buộc

  1. Chạy baseline và ghi lại ROC-AUC.
  2. Thay RobustScaler bằng StandardScaler, so sánh metric và giải thích vì sao khác hoặc không khác.
  3. Bỏ text feature khỏi ColumnTransformer, đo lại ROC-AUC và transformed shape.
  4. Thêm feature is_inactive_30d = days_since_last_login > 30. Feature này nên là numerical hay categorical? Vì sao?
  5. Tạo 5 row inference giả lập có category mới payment_method = "crypto". Pipeline có crash không? Bạn sẽ monitor gì?
  6. Viết test nhỏ để đảm bảo last_login_at > prediction_time bị reject.
  7. Thay train_test_split bằng time-based split nếu dataset có nhiều prediction_time. Giải thích khi nào stratified random split là không đủ.

5. Câu Hỏi Review

  • Feature nào có nguy cơ leakage nhất trong pipeline này?
  • Nếu production latency target là 50 ms/request, phần nào cần precompute?
  • Nếu unknown category rate tăng từ 1% lên 18%, bạn nghi ngờ điều gì?
  • Nếu text có PII, bạn xử lý ở đâu: trước TF-IDF, trong pipeline, hay sau prediction?
  • Pipeline này dùng được trong production không? Nếu có, thiếu điều kiện nào so với hệ thống thật?

6. Gợi Ý Kiểm Thử Production

Các test tối thiểu:

  • Schema test: thiếu cột bắt buộc thì fail.
  • Type test: datetime parse lỗi thì fail.
  • Point-in-time test: event/login sau prediction time thì fail.
  • Pipeline persistence test: joblib.dump rồi joblib.load, prediction không đổi.
  • Unknown category test: category mới không crash.
  • Empty text test: text rỗng không crash.
  • Transformed shape test: shape ổn định với cùng model artifact.

7. Deliverable

Tạo một file ghi chú ngắn gồm:

  • Baseline ROC-AUC.
  • 3 thay đổi bạn thử và kết quả.
  • 3 leakage risks bạn đã kiểm tra.
  • 5 validation rules cho inference input.
  • Kết luận production readiness.