Capstone Project: From Raw Data to Report
Before writing any analysis code, set up a clean project directory. A well-organized project is easier to understand, debug, and share with collaborators. The structure below separates data, code, and output so that each component has a clear role.
# Recommended project structure
# my_project/
# ├── data/
# │   ├── raw/              # Original, untouched data files
# │   └── processed/        # Cleaned data ready for analysis
# ├── src/                  # Python source code
# │   ├── __init__.py
# │   ├── load.py           # Data loading functions
# │   ├── clean.py          # Data cleaning functions
# │   ├── features.py       # Feature engineering
# │   └── model.py          # Model training and evaluation
# ├── output/               # Generated figures, reports, model artifacts
# ├── notebooks/            # Jupyter notebooks for exploration
# ├── tests/                # Unit tests
# ├── requirements.txt      # Python dependencies
# └── main.py               # Entry point that runs the pipeline

# Build the skeleton programmatically. exist_ok=True makes this
# idempotent: re-running the cell never raises on existing folders.
import os

PROJECT_DIRS = [
    "data/raw",
    "data/processed",
    "src",
    "output",
    "notebooks",
    "tests",
]
for d in PROJECT_DIRS:
    os.makedirs(d, exist_ok=True)
    print(f" Created: {d}/")
The data/raw/ folder should contain the original, unmodified data files. Never overwrite them. Your cleaning code reads from raw/ and writes to processed/. This way, you can always reproduce the entire pipeline from the original data if something goes wrong. It also makes it easy to answer the question "what exactly changed during cleaning?" by comparing the two folders.
# Full analysis stack: numerics, dataframes, plotting, classical stats,
# and scikit-learn for modeling. Everything the later cells need is
# imported here, once.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import statsmodels.formula.api as smf
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.pipeline import Pipeline

sns.set_theme(style="whitegrid")

# --- Simulate a realistic supply chain dataset ---
# Fixed seed: every run reproduces the exact same dataset, so results
# downstream are reproducible. Do not reorder the np.random calls below —
# their sequence determines the generated values.
np.random.seed(42)
n = 500
df = pd.DataFrame({
    "order_id": range(1, n + 1),
    "warehouse": np.random.choice(["Newark", "Chicago", "Dallas"], n),
    "product_cat": np.random.choice(["Electronics", "Apparel", "Grocery"], n),
    "units": np.random.randint(1, 50, n),
    "unit_price": np.random.uniform(5, 120, n).round(2),
    "lead_time_days": np.random.normal(5, 1.5, n).round(1),
    "distance_miles": np.random.uniform(50, 2000, n).round(0),
})

# Introduce some missing values (realistic): roughly 5% of rows lose
# their lead time, which the cleaning step must impute.
mask = np.random.random(n) < 0.05
df.loc[mask, "lead_time_days"] = np.nan

print(df.shape)
df.head()
# Audit missing values before touching anything.
print("Missing values:")
print(df.isnull().sum())

# Impute lead times with the per-warehouse median: each warehouse has its
# own shipping profile, so a global median would blur real differences.
def _median_fill(series):
    return series.fillna(series.median())

df["lead_time_days"] = (
    df.groupby("warehouse")["lead_time_days"].transform(_median_fill)
)

# Keep only strictly positive lead times (zero/negative are data errors).
valid = df["lead_time_days"] > 0
df = df.loc[valid].copy()

# Verify
print(f"\nClean dataset: {df.shape[0]} rows, {df.isnull().sum().sum()} missing values")
# Order revenue = quantity times price.
df["revenue"] = df["units"] * df["unit_price"]

# Per-warehouse rollup: order volume, revenue, and logistics averages.
summary = (
    df.groupby("warehouse")
    .agg(
        orders=("order_id", "count"),
        total_revenue=("revenue", "sum"),
        avg_lead_time=("lead_time_days", "mean"),
        avg_distance=("distance_miles", "mean"),
    )
    .round(2)
)
print(summary)

# 2x2 dashboard — unpack the axes into named panels instead of indexing
# axes[i, j], so each plot reads independently.
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
(ax_rev, ax_lead), (ax_dist, ax_cat) = axes

# Revenue by warehouse
wh_revenue = df.groupby("warehouse")["revenue"].sum()
wh_revenue.plot(kind="bar", ax=ax_rev, color="#3776AB")
ax_rev.set_title("Revenue by Warehouse")
ax_rev.set_ylabel("Revenue ($)")

# Lead-time histogram
ax_lead.hist(df["lead_time_days"], bins=20, color="#4b8bbe", edgecolor="white")
ax_lead.set_title("Lead Time Distribution")
ax_lead.set_xlabel("Days")

# Distance vs. lead time scatter
ax_dist.scatter(df["distance_miles"], df["lead_time_days"], alpha=0.4, s=15)
ax_dist.set_title("Distance vs. Lead Time")
ax_dist.set_xlabel("Distance (miles)")
ax_dist.set_ylabel("Lead Time (days)")

# Revenue by product category
cat_revenue = df.groupby("product_cat")["revenue"].sum()
cat_revenue.plot(kind="barh", ax=ax_cat, color="#6ba3d6")
ax_cat.set_title("Revenue by Product Category")

plt.tight_layout()
plt.show()
# Derived predictors for modeling.
df["log_distance"] = np.log1p(df["distance_miles"])        # tame right skew
df["revenue_per_unit"] = df["revenue"] / df["units"]
df["is_express"] = (df["lead_time_days"] < 3).astype(int)  # binary flag

# Dummy-encode the categoricals; drop_first=True avoids perfect
# collinearity among the indicator columns.
df_model = pd.get_dummies(
    df,
    columns=["warehouse", "product_cat"],
    drop_first=True,
)
print(f"Feature matrix: {df_model.shape}")
print(df_model.columns.tolist())
# One-way ANOVA: does mean lead time differ across warehouses?
lead_times_by_wh = [
    grp["lead_time_days"].values for _, grp in df.groupby("warehouse")
]
f_stat, p_val = stats.f_oneway(*lead_times_by_wh)
print(f"ANOVA: F={f_stat:.2f}, p={p_val:.4f}")

# OLS regression: which predictors drive lead time? C(warehouse) treats
# the warehouse column as a categorical term.
lead_time_formula = "lead_time_days ~ distance_miles + units + unit_price + C(warehouse)"
ols_model = smf.ols(lead_time_formula, data=df).fit()
print(ols_model.summary().tables[1])
# Target: lead_time_days
feature_cols = [
    "units", "unit_price", "distance_miles", "log_distance",
    "warehouse_Chicago", "warehouse_Newark",
    "product_cat_Electronics", "product_cat_Grocery",
]
X = df_model[feature_cols]
y = df_model["lead_time_days"]

# Fixed random_state so the split (and all scores below) are reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Model 1: Linear Regression (scaling puts coefficients on one scale).
lr_pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model", LinearRegression()),
])
lr_pipe.fit(X_train, y_train)
lr_pred = lr_pipe.predict(X_test)

# Model 2: Random Forest. NOTE(review): tree ensembles are invariant to
# monotonic feature scaling, so the scaler is redundant here — kept only
# for pipeline symmetry; confirm before removing.
rf_pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model", RandomForestRegressor(n_estimators=100, max_depth=8, random_state=42)),
])
rf_pipe.fit(X_train, y_train)
rf_pred = rf_pipe.predict(X_test)

# Compare on the held-out test set. RMSE is computed as np.sqrt(MSE):
# the `squared=False` argument was deprecated in scikit-learn 1.4 and
# removed in 1.6, so this form works on every version.
lr_rmse = np.sqrt(mean_squared_error(y_test, lr_pred))
rf_rmse = np.sqrt(mean_squared_error(y_test, rf_pred))
print("Model Comparison:")
print(f"  Linear Regression RMSE: {lr_rmse:.3f}, R2: {r2_score(y_test, lr_pred):.3f}")
print(f"  Random Forest RMSE: {rf_rmse:.3f}, R2: {r2_score(y_test, rf_pred):.3f}")
# Two-panel evaluation figure for the winning model.
fig, (ax_pred, ax_imp) = plt.subplots(1, 2, figsize=(12, 5))

# Left: actual vs. predicted, with the y = x reference line — points on
# the dashed line are perfect predictions.
ax_pred.scatter(y_test, rf_pred, alpha=0.5, s=20, color="#3776AB")
lo, hi = y_test.min(), y_test.max()
ax_pred.plot([lo, hi], [lo, hi], "r--", linewidth=2)
ax_pred.set_xlabel("Actual Lead Time")
ax_pred.set_ylabel("Predicted Lead Time")
ax_pred.set_title("Random Forest: Actual vs. Predicted")

# Right: impurity-based feature importances from the fitted forest.
rf_model = rf_pipe.named_steps["model"]
importance = pd.Series(rf_model.feature_importances_, index=feature_cols)
importance.sort_values().plot(kind="barh", ax=ax_imp, color="#4b8bbe")
ax_imp.set_title("Feature Importance")
ax_imp.set_xlabel("Importance")

plt.tight_layout()
plt.savefig("pipeline_results.png", dpi=300, bbox_inches="tight")
plt.show()
def generate_report(df, model_name, rmse, r2):
    """Print a formatted summary report.

    Args:
        df: Cleaned orders DataFrame; must have `revenue` and
            `lead_time_days` columns.
        model_name: Human-readable description of the winning model.
        rmse: Test-set root-mean-squared error.
        r2: Test-set R-squared.
    """
    print("=" * 50)
    print(" SUPPLY CHAIN ANALYTICS REPORT")
    print("=" * 50)
    print(f" Dataset: {df.shape[0]} orders, {df.shape[1]} features")
    print(f" Total Revenue: ${df['revenue'].sum():,.0f}")
    print(f" Avg Lead Time: {df['lead_time_days'].mean():.1f} days")
    print(f" Best Model: {model_name}")
    print(f" RMSE: {rmse:.3f}")
    print(f" R-squared: {r2:.3f}")
    print("=" * 50)

# RMSE via np.sqrt(MSE): the `squared=False` argument to
# mean_squared_error was deprecated in scikit-learn 1.4 and removed
# in 1.6, so this form is version-portable.
rf_rmse = np.sqrt(mean_squared_error(y_test, rf_pred))
rf_r2 = r2_score(y_test, rf_pred)
generate_report(df, "Random Forest (depth=8, trees=100)", rf_rmse, rf_r2)
In quick scripts, print() works fine. But in a real pipeline that runs unattended (e.g., scheduled overnight), you need logging because it can write to files, include timestamps, and distinguish between severity levels (DEBUG, INFO, WARNING, ERROR). When something fails at 3 AM, the log file tells you exactly what happened and when.
import logging

# Configure logging once at the top of your script: INFO level,
# timestamped records, mirrored to both a log file and the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("pipeline.log"),  # write to file
        logging.StreamHandler(),              # also print to console
    ],
)
logger = logging.getLogger(__name__)

# Use the logger instead of print(). Pass values as lazy %-style
# arguments (not f-strings): the message is only formatted if the
# record is actually emitted at the current log level.
logger.info("Starting data pipeline...")
logger.info("Loaded %d rows, %d columns", df.shape[0], df.shape[1])
logger.warning("Found %d missing values", df.isnull().sum().sum())

# In error handling: log the failure, then re-raise — logging an error
# is not the same as handling it, and the pipeline should still stop.
try:
    model = rf_pipe.fit(X_train, y_train)
    logger.info("Model training complete")
except Exception as e:
    logger.error("Model training failed: %s", e)
    raise
Hard-coding file paths and parameters makes your script inflexible. argparse lets users pass arguments from the command line, making your pipeline reusable across different datasets and configurations without editing the code.
import argparse

def parse_args(argv=None):
    """Parse command-line arguments for the pipeline.

    Args:
        argv: Optional list of argument strings. Defaults to None, in
            which case argparse reads sys.argv — existing callers are
            unchanged. Passing an explicit list makes this function
            testable without patching sys.argv.

    Returns:
        argparse.Namespace with input_file, output_dir, n_trees,
        and test_size attributes.
    """
    parser = argparse.ArgumentParser(
        description="Supply Chain Analytics Pipeline"
    )
    parser.add_argument(
        "input_file",
        help="Path to the input CSV file",
    )
    parser.add_argument(
        "--output-dir",
        default="output",
        help="Directory for output files (default: output/)",
    )
    parser.add_argument(
        "--n-trees",
        type=int,
        default=100,
        help="Number of trees in Random Forest (default: 100)",
    )
    parser.add_argument(
        "--test-size",
        type=float,
        default=0.2,
        help="Fraction of data for test set (default: 0.2)",
    )
    return parser.parse_args(argv)

# Usage: python main.py data/orders.csv --n-trees 200 --test-size 0.3
# In your main function:
#   args = parse_args()
#   df = pd.read_csv(args.input_file)
#   rf = RandomForestRegressor(n_estimators=args.n_trees)
A useful addition is a --verbose flag that switches the logging level from INFO to DEBUG. This lets users see detailed diagnostic output when troubleshooting without cluttering normal runs. Example: parser.add_argument("--verbose", action="store_true") followed by logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO).
A requirements.txt file lists all Python packages your project depends on, with version numbers. This allows anyone (or any server) to recreate your exact environment. Without it, your collaborator might install a different version of pandas that breaks your code.
# Generate requirements.txt from your current environment # Run in terminal: pip freeze > requirements.txt # Better: create it manually with only the packages you actually use # requirements.txt contents: # pandas==2.1.4 # numpy==1.26.2 # matplotlib==3.8.2 # seaborn==0.13.0 # scikit-learn==1.3.2 # scipy==1.11.4 # statsmodels==0.14.1 # Install from requirements.txt: # pip install -r requirements.txt # For reproducibility, also consider virtual environments: # python -m venv .venv # source .venv/bin/activate # macOS/Linux # .venv\Scripts\activate # Windows # pip install -r requirements.txt
pip freeze captures every package in your environment, including transitive dependencies you did not directly install. This can include dozens of packages you do not actually need. A cleaner approach is to manually list only the packages your code imports, then pin their versions. Use pip freeze as a starting reference, then trim it down.
Tests verify that your functions produce correct output. When you refactor your cleaning code three months from now, tests catch bugs before they corrupt your analysis. Even a few simple tests can save hours of debugging.
# File: tests/test_pipeline.py
import pytest
import pandas as pd
import numpy as np

# Assume these functions live in src/clean.py

def fill_missing_lead_times(df):
    """Return a copy of df with missing lead times imputed.

    NaNs are filled with the median lead time of the row's warehouse.
    Fix over the earlier version: operate on a copy so the caller's
    DataFrame is never mutated in place.
    """
    df = df.copy()
    df["lead_time_days"] = df.groupby("warehouse")["lead_time_days"].transform(
        lambda x: x.fillna(x.median())
    )
    return df

def remove_negative_lead_times(df):
    """Return only the rows with a strictly positive lead time."""
    return df[df["lead_time_days"] > 0].copy()

# --- Tests ---

def test_fill_missing_no_nulls_remain():
    """After filling, no NaN values should remain."""
    df = pd.DataFrame({
        "warehouse": ["A", "A", "B", "B"],
        "lead_time_days": [3.0, np.nan, 5.0, np.nan]
    })
    result = fill_missing_lead_times(df)
    assert result["lead_time_days"].isnull().sum() == 0

def test_fill_missing_does_not_mutate_input():
    """The caller's DataFrame must be left untouched."""
    df = pd.DataFrame({
        "warehouse": ["A", "A"],
        "lead_time_days": [3.0, np.nan]
    })
    fill_missing_lead_times(df)
    assert df["lead_time_days"].isnull().sum() == 1

def test_remove_negative_values():
    """No negative lead times should remain."""
    df = pd.DataFrame({
        "lead_time_days": [3.0, -1.0, 5.0, 0.0, 2.0]
    })
    result = remove_negative_lead_times(df)
    assert (result["lead_time_days"] > 0).all()
    assert len(result) == 3  # -1.0 and 0.0 removed

def test_revenue_calculation():
    """Revenue should equal units * unit_price."""
    df = pd.DataFrame({
        "units": [10, 20, 5],
        "unit_price": [5.0, 3.0, 10.0]
    })
    df["revenue"] = df["units"] * df["unit_price"]
    expected = [50.0, 60.0, 50.0]
    assert df["revenue"].tolist() == expected

# Run tests: pytest tests/ -v
Test your own logic, not the libraries you depend on — for example, don't write a test that pandas.read_csv works; pandas already tests that.
Jupyter notebooks are excellent for exploration, but they are difficult to version-control, test, and run in production. Converting your finalized analysis to a .py script makes it reproducible and automatable.
# Convert a notebook to a Python script (run in terminal):
#   jupyter nbconvert --to script analysis.ipynb
# That produces analysis.py containing every code cell.
#
# Better: refactor as you go —
#   1. Move reusable functions from notebook cells into src/*.py files.
#   2. Keep the notebook for visualization and narrative only.
#   3. Import your functions: from src.clean import fill_missing_lead_times
#
# Typical workflow:
#   - Explore in Jupyter:  notebooks/01_exploration.ipynb
#   - Extract functions:   src/clean.py, src/features.py, src/model.py
#   - Create entry point:  main.py that imports and calls everything
#   - Run the pipeline:    python main.py data/raw/orders.csv

# Example main.py structure
def main():
    """Run the five pipeline stages end to end (steps shown as stubs)."""
    # args = parse_args()
    logger.info("=== Pipeline Start ===")
    # Step 1: Load
    # df = load_data(args.input_file)
    # Step 2: Clean
    # df = clean_data(df)
    # Step 3: Features
    # df = build_features(df)
    # Step 4: Model
    # results = train_and_evaluate(df, args.n_trees)
    # Step 5: Report
    # generate_report(results, args.output_dir)
    logger.info("=== Pipeline Complete ===")

if __name__ == "__main__":
    main()
Extend this pipeline by adding a third model (GradientBoostingRegressor from scikit-learn). Compare all three models using 5-fold cross-validation and select the best one. Add the cross-validation results to the summary report.
Refactor the entire pipeline into a single Python script with separate functions for each step. The script should accept a CSV filename as a command-line argument using argparse (not sys.argv) and produce both a PNG chart and a printed summary. Add --output-dir and --verbose flags.
Write three pytest test functions for the data pipeline: one that tests the missing value filling logic, one that tests the feature engineering step (revenue = units * unit_price), and one that tests that the model's R-squared is above 0.0 on the test set (a basic sanity check). Run them with pytest tests/ -v and verify all pass.
Set up a complete project structure for a new dataset of your choice: create the data/raw/, src/, output/, and tests/ directories. Write a requirements.txt file listing only the packages you import. Replace all print() calls with logging statements at appropriate severity levels (INFO for progress, WARNING for data quality issues, ERROR for failures). Run the pipeline and verify that the log file contains all expected messages.
Key takeaways: use logging instead of print() for production pipelines; it provides timestamps, severity levels, and file output. Use argparse to make your scripts flexible and reusable across different datasets and configurations. Pin dependencies in requirements.txt and use virtual environments for reproducibility. Explore in notebooks, but move finalized code into .py modules for your production pipeline.