
Building Your First ETL Pipeline in Python

Every data engineering interview involves ETL. Whether it is a take-home project, a system design question, or a coding problem, you need to demonstrate that you can build a reliable data pipeline. This guide walks through a complete, production-grade ETL pipeline in Python — the same patterns used at companies like Airbnb, Stripe, and Lyft.

What is ETL?

ETL stands for Extract, Transform, Load. Extract means pulling raw data from a source (API, database, S3, CSV). Transform means cleaning, normalising, and reshaping that data into the structure you need. Load means writing the result to a destination (data warehouse, database, file). The goal: reliable, repeatable data movement.
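Conceptually, the three stages compose into one run. A minimal sketch (the function names and the sample record below are placeholders, not part of any specific library):

```python
from typing import Any

def extract() -> list[dict[str, Any]]:
    # Pull raw records from a source (API, file, database)
    return [{"user_id": "42", "event_type": " Click "}]

def transform(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    # Clean each record: trim whitespace and lowercase the event type
    return [
        {**r, "event_type": r["event_type"].strip().lower()}
        for r in records
    ]

def load(records: list[dict[str, Any]]) -> int:
    # Write to a destination; here, just report how many rows would land
    return len(records)

rows_loaded = load(transform(extract()))
print(rows_loaded)  # → 1
```

The rest of this guide fills in each stage with realistic pandas and database code.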

Step 1: Extract — Reading from a Source

import time
import logging

import requests
import pandas as pd

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)

def extract_from_api(url: str, params: dict | None = None, retries: int = 3) -> pd.DataFrame:
    """Extract data from a REST API, retrying with exponential backoff."""
    logger.info(f"Extracting from {url}")
    for attempt in range(1, retries + 1):
        try:
            resp = requests.get(url, params=params, timeout=30)
            resp.raise_for_status()
            df = pd.DataFrame(resp.json())
            logger.info(f"Extracted {len(df)} rows")
            return df
        except requests.exceptions.RequestException as e:
            logger.warning(f"Attempt {attempt}/{retries} failed: {e}")
            if attempt == retries:
                logger.error(f"Extraction failed after {retries} attempts")
                raise
            time.sleep(2 ** attempt)  # back off before retrying

def extract_from_csv(filepath: str) -> pd.DataFrame:
    """Extract data from a CSV file."""
    logger.info(f"Reading {filepath}")
    df = pd.read_csv(filepath, parse_dates=['created_at', 'updated_at'])
    logger.info(f"Read {len(df)} rows, {len(df.columns)} columns")
    return df
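A quick way to verify that the CSV extractor parses timestamps correctly is to feed `pd.read_csv` an in-memory buffer (the column names below match the `parse_dates` list above; the sample row is illustrative):

```python
import io
import pandas as pd

# Simulate a CSV file with the expected schema
csv = io.StringIO(
    "user_id,event_type,created_at,updated_at\n"
    "1,click,2024-01-01T10:00:00Z,2024-01-01T10:05:00Z\n"
)
df = pd.read_csv(csv, parse_dates=["created_at", "updated_at"])
print(df["created_at"].dtype)  # a datetime dtype, not plain object/string
```

Without `parse_dates`, the timestamp columns would come back as plain strings, and every downstream `.dt` accessor would fail.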

Step 2: Transform — Clean and Reshape

Transformation is where most of the complexity lives. The key principles: validate early, fail loudly, be explicit about nulls, and make transformations idempotent (running them twice should produce the same result).

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and transform the raw data."""
    logger.info("Starting transformation")
    original_len = len(df)

    # 1. Drop exact duplicates
    df = df.drop_duplicates()

    # 2. Validate required columns
    required = ['user_id', 'event_type', 'created_at']
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # 3. Handle nulls — log and drop rows missing critical fields
    null_counts = df[required].isnull().sum()
    if null_counts.any():
        logger.warning(f"Null counts in required columns:\n{null_counts[null_counts > 0]}")
    df = df.dropna(subset=['user_id'])

    # 4. Normalise types
    df['user_id']    = df['user_id'].astype(str).str.strip()
    df['event_type'] = df['event_type'].str.lower().str.strip()
    df['created_at'] = pd.to_datetime(df['created_at'], utc=True)

    # 5. Derived columns
    df['date']  = df['created_at'].dt.date
    df['hour']  = df['created_at'].dt.hour
    if 'platform' in df.columns:  # platform is not a required column, so guard the lookup
        df['is_mobile'] = df['platform'].isin(['ios', 'android'])

    logger.info(f"Transformation complete: {original_len} → {len(df)} rows")
    return df
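The idempotency principle is easy to check: running the transform on already-clean data should be a no-op. A minimal sketch using a simplified version of the cleaning steps above (dedupe plus type normalisation):

```python
import pandas as pd

df = pd.DataFrame({
    "user_id":    [" 1 ", " 1 ", "2"],
    "event_type": ["Click", "Click", "VIEW"],
})

def normalise(df: pd.DataFrame) -> pd.DataFrame:
    # Simplified transform: drop duplicates, then normalise string columns
    df = df.drop_duplicates().copy()
    df["user_id"]    = df["user_id"].astype(str).str.strip()
    df["event_type"] = df["event_type"].str.lower().str.strip()
    return df.reset_index(drop=True)

once  = normalise(df)
twice = normalise(once)
assert once.equals(twice)  # idempotent: re-running changes nothing
print(len(once))  # → 2
```

If this assertion ever fails, some step is not idempotent (a common culprit is appending derived columns with side effects), and re-running the pipeline would corrupt the output.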

Step 3: Load — Writing to the Destination

import sqlalchemy

def load_to_postgres(df: pd.DataFrame, table: str, engine, if_exists: str = 'append') -> None:
    """Load DataFrame to a PostgreSQL table."""
    logger.info(f"Loading {len(df)} rows to {table}")
    df.to_sql(
        name=table,
        con=engine,
        if_exists=if_exists,   # 'replace', 'append', or 'fail'
        index=False,
        chunksize=1000,        # write in batches to avoid memory spikes
        method='multi',        # use multi-row INSERT for performance
    )
    logger.info("Load complete")
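Because `to_sql` works against any SQLAlchemy engine, the loader can be smoke-tested against an in-memory SQLite engine before pointing it at Postgres. A sketch (table and column names are illustrative):

```python
import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine("sqlite://")  # in-memory database
df = pd.DataFrame({"user_id": ["1", "2"], "event_type": ["click", "view"]})
df.to_sql("events_staging", engine, if_exists="append", index=False)

with engine.connect() as conn:
    n = conn.execute(
        sqlalchemy.text("SELECT COUNT(*) FROM events_staging")
    ).scalar()
print(n)  # → 2
```

Note that `method='multi'` is a Postgres-friendly optimisation; for SQLite the default insert method is fine.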

# Idempotent upsert pattern — safe to re-run
UPSERT_SQL = """
    INSERT INTO events (user_id, event_type, created_at, date, hour)
    VALUES %s
    ON CONFLICT (user_id, event_type, created_at) DO NOTHING;
"""
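The `VALUES %s` placeholder above follows the `psycopg2.extras.execute_values` convention for Postgres. The same ON CONFLICT ... DO NOTHING pattern can be demonstrated end-to-end with Python's stdlib SQLite driver, which shows why the upsert is safe to re-run (schema below is a simplified stand-in for the events table):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        user_id TEXT, event_type TEXT, created_at TEXT,
        UNIQUE (user_id, event_type, created_at)
    )
""")

rows = [("1", "click", "2024-01-01T10:00:00Z"),
        ("2", "view",  "2024-01-01T11:00:00Z")]

sql = """
    INSERT INTO events (user_id, event_type, created_at)
    VALUES (?, ?, ?)
    ON CONFLICT (user_id, event_type, created_at) DO NOTHING
"""

conn.executemany(sql, rows)  # first load
conn.executemany(sql, rows)  # re-run: conflicting rows are silently skipped

count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(count)  # → 2
```

The unique constraint is what makes ON CONFLICT work; without it, a re-run would double every row.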

Step 4: Glue It Together

import os

def run_pipeline(date: str) -> None:
    """Main ETL orchestrator. Pass a date (YYYY-MM-DD) to backfill."""
    logger.info(f"Pipeline starting for {date}")
    engine = sqlalchemy.create_engine(os.getenv("DATABASE_URL"))

    try:
        # Extract
        raw = extract_from_api(
            url="https://api.example.com/events",
            params={"date": date, "limit": 10000}
        )

        # Transform
        clean = transform(raw)

        # Load
        load_to_postgres(clean, table="events_staging", engine=engine)

        logger.info(f"Pipeline completed successfully for {date}")

    except Exception as e:
        logger.error(f"Pipeline failed for {date}: {e}")
        raise   # re-raise so orchestrators (Airflow, Prefect) can mark as failed

if __name__ == "__main__":
    import sys
    import datetime
    date = sys.argv[1] if len(sys.argv) > 1 else str(datetime.date.today())
    run_pipeline(date)
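Because the orchestrator takes a date, backfilling is just a loop over days, calling run_pipeline once per date. A sketch of the helper (not part of the pipeline above; in production an orchestrator like Airflow generates these runs for you):

```python
import datetime

def date_range(start: str, end: str) -> list[str]:
    # Inclusive list of ISO dates between start and end
    d0 = datetime.date.fromisoformat(start)
    d1 = datetime.date.fromisoformat(end)
    return [str(d0 + datetime.timedelta(days=i))
            for i in range((d1 - d0).days + 1)]

for day in date_range("2024-01-01", "2024-01-03"):
    print(day)  # each day would be passed to run_pipeline(day)
```

Keeping each run scoped to a single day is what makes per-day re-runs and partial backfills cheap.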

Production Checklist

Before deploying any ETL pipeline:

- Add structured logging (JSON logs for Datadog/Splunk).
- Add data quality checks (row count, null rate, value range assertions).
- Make the pipeline idempotent (re-runnable without side effects).
- Handle partial failures gracefully.
- Add alerting on failure.
- Document the expected schema and SLAs.
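The data quality checks in that list can start as a few assertions that run between transform and load. A minimal sketch (the thresholds are illustrative, not recommendations):

```python
import pandas as pd

def check_quality(df: pd.DataFrame, min_rows: int = 1,
                  max_null_rate: float = 0.01) -> None:
    """Raise if the batch looks wrong: cheap insurance before loading."""
    if len(df) < min_rows:
        raise ValueError(f"Too few rows: {len(df)} < {min_rows}")
    null_rate = df["user_id"].isnull().mean()
    if null_rate > max_null_rate:
        raise ValueError(
            f"user_id null rate {null_rate:.1%} exceeds {max_null_rate:.1%}"
        )

# A batch with too many null user_ids fails loudly instead of loading bad data
demo = pd.DataFrame({"user_id": ["1", "2", None]})
try:
    check_quality(demo)
except ValueError as e:
    print(e)
```

Failing loudly here, before the load, is much cheaper than cleaning bad rows out of the warehouse afterwards.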
