Chapter 10: End-to-End Data Pipeline

Capstone Project: From Raw Data to Report

Project overview: This chapter ties together everything from Chapters 1 through 9 into a single, realistic analytics workflow. We will load a raw CSV dataset, clean and explore it, engineer features, build a predictive model, evaluate results, and generate a summary. Each step maps directly to a previous chapter.

10.1 Project Structure Best Practices

Before writing any analysis code, set up a clean project directory. A well-organized project is easier to understand, debug, and share with collaborators. The structure below separates data, code, and output so that each component has a clear role.

# Recommended project structure
# my_project/
# ├── data/
# │   ├── raw/          # Original, untouched data files
# │   └── processed/    # Cleaned data ready for analysis
# ├── src/              # Python source code
# │   ├── __init__.py
# │   ├── load.py       # Data loading functions
# │   ├── clean.py      # Data cleaning functions
# │   ├── features.py   # Feature engineering
# │   └── model.py      # Model training and evaluation
# ├── output/           # Generated figures, reports, model artifacts
# ├── notebooks/        # Jupyter notebooks for exploration
# ├── tests/            # Unit tests
# ├── requirements.txt  # Python dependencies
# └── main.py           # Entry point that runs the pipeline

# Create this structure programmatically
import os

dirs = ["data/raw", "data/processed", "src", "output", "notebooks", "tests"]
for d in dirs:
    os.makedirs(d, exist_ok=True)
    print(f"  Created: {d}/")

Why separate raw and processed data? The data/raw/ folder should contain the original, unmodified data files. Never overwrite them. Your cleaning code reads from raw/ and writes to processed/. This way, you can always reproduce the entire pipeline from the original data if something goes wrong. It also makes it easy to answer the question "what exactly changed during cleaning?" by comparing the two folders.
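That read-from-raw, write-to-processed flow can be sketched as follows; the file name and the tiny inline dataset are illustrative stand-ins, not part of the chapter's dataset:

```python
from pathlib import Path

import pandas as pd

RAW = Path("data/raw")
PROCESSED = Path("data/processed")
RAW.mkdir(parents=True, exist_ok=True)
PROCESSED.mkdir(parents=True, exist_ok=True)

# Stand-in for a file delivered by an upstream system (illustrative data)
pd.DataFrame({"units": [3, -1, 5]}).to_csv(RAW / "orders.csv", index=False)

# Cleaning always reads from raw/ and writes to processed/;
# the file in raw/ is never modified.
orders = pd.read_csv(RAW / "orders.csv")
orders = orders[orders["units"] > 0]
orders.to_csv(PROCESSED / "orders.csv", index=False)

print(f"kept {len(orders)} of 3 rows")  # prints: kept 2 of 3 rows
```

Because raw/ stays untouched, "what changed during cleaning?" is always answerable by diffing the two files.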

10.2 Step 1: Load the Data (Ch 1, 5)

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import statsmodels.formula.api as smf
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.pipeline import Pipeline

sns.set_theme(style="whitegrid")

# --- Simulate a realistic supply chain dataset ---
# (in a real project, this step would be pd.read_csv("data/raw/orders.csv"))
np.random.seed(42)
n = 500

df = pd.DataFrame({
    "order_id":       range(1, n + 1),
    "warehouse":      np.random.choice(["Newark", "Chicago", "Dallas"], n),
    "product_cat":    np.random.choice(["Electronics", "Apparel", "Grocery"], n),
    "units":          np.random.randint(1, 50, n),
    "unit_price":     np.random.uniform(5, 120, n).round(2),
    "lead_time_days": np.random.normal(5, 1.5, n).round(1),
    "distance_miles": np.random.uniform(50, 2000, n).round(0),
})

# Introduce some missing values (realistic)
mask = np.random.random(n) < 0.05
df.loc[mask, "lead_time_days"] = np.nan

print(df.shape)
df.head()

10.3 Step 2: Data Cleaning (Ch 2, 5)

# Check for missing values
print("Missing values:")
print(df.isnull().sum())

# Fill missing lead times with median by warehouse
df["lead_time_days"] = df.groupby("warehouse")["lead_time_days"].transform(
    lambda x: x.fillna(x.median())
)

# Remove non-positive lead times (data errors); the filter keeps only values > 0
df = df[df["lead_time_days"] > 0].copy()

# Verify
print(f"\nClean dataset: {df.shape[0]} rows, {df.isnull().sum().sum()} missing values")

10.4 Step 3: Exploratory Data Analysis (Ch 5, 6)

# Compute revenue
df["revenue"] = df["units"] * df["unit_price"]

# Summary by warehouse
summary = df.groupby("warehouse").agg(
    orders=("order_id", "count"),
    total_revenue=("revenue", "sum"),
    avg_lead_time=("lead_time_days", "mean"),
    avg_distance=("distance_miles", "mean")
).round(2)
print(summary)

# Visualization: 2x2 dashboard
fig, axes = plt.subplots(2, 2, figsize=(12, 8))

# Revenue by warehouse
df.groupby("warehouse")["revenue"].sum().plot(
    kind="bar", ax=axes[0, 0], color="#3776AB")
axes[0, 0].set_title("Revenue by Warehouse")
axes[0, 0].set_ylabel("Revenue ($)")

# Lead time distribution
axes[0, 1].hist(df["lead_time_days"], bins=20, color="#4b8bbe", edgecolor="white")
axes[0, 1].set_title("Lead Time Distribution")
axes[0, 1].set_xlabel("Days")

# Distance vs. lead time
axes[1, 0].scatter(df["distance_miles"], df["lead_time_days"], alpha=0.4, s=15)
axes[1, 0].set_title("Distance vs. Lead Time")
axes[1, 0].set_xlabel("Distance (miles)")
axes[1, 0].set_ylabel("Lead Time (days)")

# Revenue by product category
df.groupby("product_cat")["revenue"].sum().plot(
    kind="barh", ax=axes[1, 1], color="#6ba3d6")
axes[1, 1].set_title("Revenue by Product Category")

plt.tight_layout()
plt.show()

10.5 Step 4: Feature Engineering (Ch 3, 4)

# Create new features
df["log_distance"]     = np.log1p(df["distance_miles"])
df["revenue_per_unit"] = df["revenue"] / df["units"]  # equals unit_price here; included to illustrate ratio features
df["is_express"]       = (df["lead_time_days"] < 3).astype(int)

# One-hot encode categoricals
df_model = pd.get_dummies(df, columns=["warehouse", "product_cat"], drop_first=True)

print(f"Feature matrix: {df_model.shape}")
print(df_model.columns.tolist())

10.6 Step 5: Statistical Testing (Ch 7)

# Is there a significant difference in lead time across warehouses?
groups = [g["lead_time_days"].values for _, g in df.groupby("warehouse")]
f_stat, p_val = stats.f_oneway(*groups)
print(f"ANOVA: F={f_stat:.2f}, p={p_val:.4f}")

# OLS regression: what drives lead time?
ols_model = smf.ols(
    "lead_time_days ~ distance_miles + units + unit_price + C(warehouse)",
    data=df
).fit()
print(ols_model.summary().tables[1])

10.7 Step 6: Predictive Modeling (Ch 8)

# Target: lead_time_days
# drop_first=True removed warehouse_Chicago and product_cat_Apparel (the
# alphabetically first levels), so those act as the baseline categories
feature_cols = ["units", "unit_price", "distance_miles", "log_distance",
                "warehouse_Dallas", "warehouse_Newark",
                "product_cat_Electronics", "product_cat_Grocery"]

X = df_model[feature_cols]
y = df_model["lead_time_days"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Model 1: Linear Regression Pipeline
lr_pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model",  LinearRegression())
])
lr_pipe.fit(X_train, y_train)
lr_pred = lr_pipe.predict(X_test)

# Model 2: Random Forest (trees do not need feature scaling, but keeping the
# same Pipeline shape makes the two models easy to swap and compare)
rf_pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model",  RandomForestRegressor(n_estimators=100, max_depth=8, random_state=42))
])
rf_pipe.fit(X_train, y_train)
rf_pred = rf_pipe.predict(X_test)

# Compare. RMSE is computed as np.sqrt(MSE) because newer scikit-learn
# releases removed mean_squared_error's squared=False argument.
print("Model Comparison:")
print(f"  Linear Regression  RMSE: {np.sqrt(mean_squared_error(y_test, lr_pred)):.3f}, R2: {r2_score(y_test, lr_pred):.3f}")
print(f"  Random Forest      RMSE: {np.sqrt(mean_squared_error(y_test, rf_pred)):.3f}, R2: {r2_score(y_test, rf_pred):.3f}")
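A single train/test split can be lucky or unlucky; cross_val_score (imported earlier but not yet used) averages over several splits. The sketch below runs 5-fold cross-validation on small synthetic stand-in data so it is self-contained; Exercise 10.1 asks you to apply the same idea to the chapter's models:

```python
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for (X, y): one informative column plus noise
rng = np.random.default_rng(42)
X = rng.uniform(50, 2000, size=(200, 2))
y = 2 + 0.002 * X[:, 0] + rng.normal(0, 0.5, size=200)

pipe = Pipeline([("scaler", StandardScaler()), ("model", LinearRegression())])

# cv=5 -> five train/validation splits; one R2 score per fold
scores = cross_val_score(pipe, X, y, cv=5, scoring="r2")
print(f"CV R2: {scores.mean():.3f} +/- {scores.std():.3f}")
```

Reporting the mean and standard deviation across folds gives a more honest picture of model performance than any single split.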

10.8 Step 7: Evaluate and Visualize Results (Ch 6, 8)

fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Actual vs. predicted (best model)
axes[0].scatter(y_test, rf_pred, alpha=0.5, s=20, color="#3776AB")
axes[0].plot([y_test.min(), y_test.max()],
             [y_test.min(), y_test.max()], "r--", linewidth=2)
axes[0].set_xlabel("Actual Lead Time")
axes[0].set_ylabel("Predicted Lead Time")
axes[0].set_title("Random Forest: Actual vs. Predicted")

# Feature importance
rf_model = rf_pipe.named_steps["model"]
importance = pd.Series(rf_model.feature_importances_, index=feature_cols)
importance.sort_values().plot(kind="barh", ax=axes[1], color="#4b8bbe")
axes[1].set_title("Feature Importance")
axes[1].set_xlabel("Importance")

plt.tight_layout()
plt.savefig("output/pipeline_results.png", dpi=300, bbox_inches="tight")
plt.show()

10.9 Step 8: Generate a Summary Report (Ch 3)

def generate_report(df, model_name, rmse, r2):
    """Print a formatted summary report."""
    print("=" * 50)
    print("  SUPPLY CHAIN ANALYTICS REPORT")
    print("=" * 50)
    print(f"  Dataset:        {df.shape[0]} orders, {df.shape[1]} columns")
    print(f"  Total Revenue:  ${df['revenue'].sum():,.0f}")
    print(f"  Avg Lead Time:  {df['lead_time_days'].mean():.1f} days")
    print(f"  Best Model:     {model_name}")
    print(f"  RMSE:           {rmse:.3f}")
    print(f"  R-squared:      {r2:.3f}")
    print("=" * 50)

rf_rmse = np.sqrt(mean_squared_error(y_test, rf_pred))  # version-agnostic RMSE
rf_r2   = r2_score(y_test, rf_pred)

generate_report(df, "Random Forest (depth=8, trees=100)", rf_rmse, rf_r2)

Reproducibility: Package all steps into functions (load_data, clean_data, build_features, train_model, evaluate, report). This lets you re-run the entire pipeline with a single command when the data updates.
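A minimal sketch of that idea, with stub bodies standing in for the full chapter code (the function names follow the list above):

```python
import pandas as pd

def load_data(path):
    """Read the raw CSV (stub for the full loading logic)."""
    return pd.read_csv(path)

def clean_data(df):
    """Drop rows with missing values (stub for the full cleaning logic)."""
    return df.dropna()

def build_features(df):
    """Add the revenue column (stub for the full feature engineering)."""
    df = df.copy()
    df["revenue"] = df["units"] * df["unit_price"]
    return df

def run_pipeline(path):
    """Single entry point: raw file in, feature table out."""
    return build_features(clean_data(load_data(path)))
```

With this in place, rerunning the whole pipeline after a data refresh is one call: run_pipeline("data/raw/orders.csv").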

10.10 Logging Instead of Print

In quick scripts, print() works fine. But in a real pipeline that runs unattended (e.g., scheduled overnight), you need logging because it can write to files, include timestamps, and distinguish between severity levels (DEBUG, INFO, WARNING, ERROR). When something fails at 3 AM, the log file tells you exactly what happened and when.

import logging

# Configure logging once at the top of your script
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("pipeline.log"),   # write to file
        logging.StreamHandler()                 # also print to console
    ]
)
logger = logging.getLogger(__name__)

# Use it instead of print()
logger.info("Starting data pipeline...")
logger.info(f"Loaded {df.shape[0]} rows, {df.shape[1]} columns")
logger.warning(f"Found {df.isnull().sum().sum()} missing values")

# In error handling
try:
    model = rf_pipe.fit(X_train, y_train)
    logger.info("Model training complete")
except Exception as e:
    logger.error(f"Model training failed: {e}")
    raise

10.11 Command-Line Arguments with argparse

Hard-coding file paths and parameters makes your script inflexible. argparse lets users pass arguments from the command line, making your pipeline reusable across different datasets and configurations without editing the code.

import argparse

def parse_args():
    parser = argparse.ArgumentParser(
        description="Supply Chain Analytics Pipeline"
    )
    parser.add_argument(
        "input_file",
        help="Path to the input CSV file"
    )
    parser.add_argument(
        "--output-dir", default="output",
        help="Directory for output files (default: output/)"
    )
    parser.add_argument(
        "--n-trees", type=int, default=100,
        help="Number of trees in Random Forest (default: 100)"
    )
    parser.add_argument(
        "--test-size", type=float, default=0.2,
        help="Fraction of data for test set (default: 0.2)"
    )
    return parser.parse_args()

# Usage: python main.py data/orders.csv --n-trees 200 --test-size 0.3
# In your main function:
# args = parse_args()
# df = pd.read_csv(args.input_file)
# rf = RandomForestRegressor(n_estimators=args.n_trees)

Combining argparse with logging: Add a --verbose flag that switches the logging level from INFO to DEBUG. This lets users see detailed diagnostic output when troubleshooting without cluttering normal runs. Example: parser.add_argument("--verbose", action="store_true") followed by logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO).
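Put together, that pattern looks like this. The list passed to parse_args is only so the sketch runs outside a terminal; in a real script you would call parser.parse_args() with no arguments:

```python
import argparse
import logging

parser = argparse.ArgumentParser(description="Pipeline with a verbosity switch")
parser.add_argument("--verbose", action="store_true",
                    help="enable DEBUG-level logging")

# Simulate running: python main.py --verbose
args = parser.parse_args(["--verbose"])

# DEBUG when --verbose is set, INFO otherwise
logging.basicConfig(
    level=logging.DEBUG if args.verbose else logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)

logger.debug("Detailed diagnostics: only visible when --verbose is set")
logger.info("Normal progress message")
```

Normal runs stay quiet; troubleshooting runs get the full diagnostic trail with no code changes.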

10.12 Creating requirements.txt

A requirements.txt file lists all Python packages your project depends on, with version numbers. This allows anyone (or any server) to recreate your exact environment. Without it, your collaborator might install a different version of pandas that breaks your code.

# Generate requirements.txt from your current environment
# Run in terminal: pip freeze > requirements.txt

# Better: create it manually with only the packages you actually use
# requirements.txt contents:
# pandas==2.1.4
# numpy==1.26.2
# matplotlib==3.8.2
# seaborn==0.13.0
# scikit-learn==1.3.2
# scipy==1.11.4
# statsmodels==0.14.1

# Install from requirements.txt:
# pip install -r requirements.txt

# For reproducibility, also consider virtual environments:
# python -m venv .venv
# source .venv/bin/activate   # macOS/Linux
# .venv\Scripts\activate      # Windows
# pip install -r requirements.txt

pip freeze vs. manual requirements: pip freeze captures every package in your environment, including transitive dependencies you did not directly install. This can include dozens of packages you do not actually need. A cleaner approach is to manually list only the packages your code imports, then pin their versions. Use pip freeze as a starting reference, then trim it down.

10.13 Unit Testing with pytest

Tests verify that your functions produce correct output. When you refactor your cleaning code three months from now, tests catch bugs before they corrupt your analysis. Even a few simple tests can save hours of debugging.

# File: tests/test_pipeline.py
# (pytest discovers these tests automatically; it only needs importing
#  for helpers such as pytest.raises)
import numpy as np
import pandas as pd

# Assume these functions live in src/clean.py
def fill_missing_lead_times(df):
    """Fill missing lead times with group median."""
    df["lead_time_days"] = df.groupby("warehouse")["lead_time_days"].transform(
        lambda x: x.fillna(x.median())
    )
    return df

def remove_negative_lead_times(df):
    """Remove rows with non-positive lead times."""
    return df[df["lead_time_days"] > 0].copy()

# --- Tests ---
def test_fill_missing_no_nulls_remain():
    """After filling, no NaN values should remain."""
    df = pd.DataFrame({
        "warehouse": ["A", "A", "B", "B"],
        "lead_time_days": [3.0, np.nan, 5.0, np.nan]
    })
    result = fill_missing_lead_times(df)
    assert result["lead_time_days"].isnull().sum() == 0

def test_remove_negative_values():
    """No non-positive lead times should remain."""
    df = pd.DataFrame({
        "lead_time_days": [3.0, -1.0, 5.0, 0.0, 2.0]
    })
    result = remove_negative_lead_times(df)
    assert (result["lead_time_days"] > 0).all()
    assert len(result) == 3  # -1.0 and 0.0 removed

def test_revenue_calculation():
    """Revenue should equal units * unit_price."""
    df = pd.DataFrame({
        "units": [10, 20, 5],
        "unit_price": [5.0, 3.0, 10.0]
    })
    df["revenue"] = df["units"] * df["unit_price"]
    expected = [50.0, 60.0, 50.0]
    assert df["revenue"].tolist() == expected

# Run tests: pytest tests/ -v

What to test in a data pipeline: Focus on the functions that transform data. Test that (1) cleaning functions remove the right rows and fill nulls correctly, (2) feature engineering produces the expected values, (3) edge cases are handled (empty DataFrames, all-null columns, single-row inputs). You do not need to test that pandas.read_csv works; pandas already tests that.
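As a sketch of point (3), edge-case tests for the cleaning helper might look like this (the function is repeated from above so the example is self-contained):

```python
import pandas as pd

def remove_negative_lead_times(df):
    """Remove rows with non-positive lead times."""
    return df[df["lead_time_days"] > 0].copy()

def test_empty_dataframe():
    """An empty input should come back empty, not raise an exception."""
    df = pd.DataFrame({"lead_time_days": pd.Series([], dtype=float)})
    result = remove_negative_lead_times(df)
    assert len(result) == 0
    assert list(result.columns) == ["lead_time_days"]

def test_single_row_kept():
    """A single valid row should survive cleaning."""
    df = pd.DataFrame({"lead_time_days": [4.5]})
    assert len(remove_negative_lead_times(df)) == 1
```

Edge-case tests like these are cheap to write and catch the silent failures (empty groupby results, dtype surprises) that only appear on unusual inputs.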

10.14 Converting Jupyter Notebooks to Python Scripts

Jupyter notebooks are excellent for exploration, but they are difficult to version-control, test, and run in production. Converting your finalized analysis to a .py script makes it reproducible and automatable.

# Convert a notebook to a Python script (run in terminal)
# jupyter nbconvert --to script analysis.ipynb
# This creates analysis.py with all code cells

# Better: refactor as you go
# 1. Move reusable functions from notebook cells into src/*.py files
# 2. Keep the notebook for visualization and narrative only
# 3. Import your functions: from src.clean import fill_missing_lead_times

# Typical workflow:
# - Explore in Jupyter: notebooks/01_exploration.ipynb
# - Extract functions: src/clean.py, src/features.py, src/model.py
# - Create entry point: main.py that imports and calls everything
# - Run the pipeline: python main.py data/raw/orders.csv

# Example main.py structure
def main():
    # args = parse_args()
    logger.info("=== Pipeline Start ===")

    # Step 1: Load
    # df = load_data(args.input_file)

    # Step 2: Clean
    # df = clean_data(df)

    # Step 3: Features
    # df = build_features(df)

    # Step 4: Model
    # results = train_and_evaluate(df, args.n_trees)

    # Step 5: Report
    # generate_report(results, args.output_dir)

    logger.info("=== Pipeline Complete ===")

if __name__ == "__main__":
    main()

Exercise 10.1

Extend this pipeline by adding a third model (GradientBoostingRegressor from scikit-learn). Compare all three models using 5-fold cross-validation and select the best one. Add the cross-validation results to the summary report.

Exercise 10.2

Refactor the entire pipeline into a single Python script with separate functions for each step. The script should accept a CSV filename as a command-line argument using argparse (not sys.argv) and produce both a PNG chart and a printed summary. Add --output-dir and --verbose flags.

Exercise 10.3

Write three pytest test functions for the data pipeline: one that tests the missing value filling logic, one that tests the feature engineering step (revenue = units * unit_price), and one that tests that the model's R-squared is above 0.0 on the test set (a basic sanity check). Run them with pytest tests/ -v and verify all pass.

Exercise 10.4

Set up a complete project structure for a new dataset of your choice: create the data/raw/, src/, output/, and tests/ directories. Write a requirements.txt file listing only the packages you import. Replace all print() calls with logging statements at appropriate severity levels (INFO for progress, WARNING for data quality issues, ERROR for failures). Run the pipeline and verify that the log file contains all expected messages.
