Building Your First ETL Pipeline in Python
Every data engineering interview involves ETL. Whether it is a take-home project, a system design question, or a coding problem, you need to demonstrate that you can build a reliable data pipeline. This guide walks through a complete, production-grade ETL pipeline in Python — the same patterns used at companies like Airbnb, Stripe, and Lyft.
What is ETL?
ETL stands for Extract, Transform, Load. Extract means pulling raw data from a source (API, database, S3, CSV). Transform means cleaning, normalising, and reshaping that data into the structure you need. Load means writing the result to a destination (data warehouse, database, file). The goal: reliable, repeatable data movement.
Step 1: Extract — Reading from a Source
import datetime
import logging
import os
import time

import pandas as pd
import requests
# Configure root logging once at module import: INFO level, timestamped lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
# Module-level logger shared by every pipeline stage below.
logger = logging.getLogger(__name__)
def extract_from_api(
    url: str,
    params: dict = None,
    max_retries: int = 3,
    backoff_seconds: float = 2.0,
) -> pd.DataFrame:
    """Extract data from a REST API with retry logic.

    Retries transient request failures with exponential backoff before
    giving up, so a brief network blip does not fail the whole pipeline.

    Args:
        url: Endpoint to GET.
        params: Optional query-string parameters.
        max_retries: Total number of attempts before raising.
        backoff_seconds: Base delay between attempts; doubles each retry.

    Returns:
        DataFrame built from the JSON response body.

    Raises:
        requests.exceptions.RequestException: if every attempt fails.
    """
    logger.info("Extracting from %s", url)
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.get(url, params=params, timeout=30)
            resp.raise_for_status()
            data = resp.json()
            df = pd.DataFrame(data)
            logger.info("Extracted %d rows", len(df))
            return df
        except requests.exceptions.RequestException as e:
            last_error = e
            logger.error("Extraction attempt %d/%d failed: %s", attempt, max_retries, e)
            if attempt < max_retries:
                # Exponential backoff: base, 2*base, 4*base, ...
                time.sleep(backoff_seconds * (2 ** (attempt - 1)))
    raise last_error
def extract_from_csv(filepath: str) -> pd.DataFrame:
"""Extract data from a CSV file."""
logger.info(f"Reading {filepath}")
df = pd.read_csv(filepath, parse_dates=['created_at', 'updated_at'])
logger.info(f"Read {len(df)} rows, {len(df.columns)} columns")
return dfStep 2: Transform — Clean and Reshape
Transformation is where most of the complexity lives. The key principles: validate early, fail loudly, be explicit about nulls, and make transformations idempotent (running them twice should produce the same result).
def transform(df: pd.DataFrame) -> pd.DataFrame:
"""Clean and transform the raw data."""
logger.info("Starting transformation")
original_len = len(df)
# 1. Drop exact duplicates
df = df.drop_duplicates()
# 2. Validate required columns
required = ['user_id', 'event_type', 'created_at']
missing = [c for c in required if c not in df.columns]
if missing:
raise ValueError(f"Missing required columns: {missing}")
# 3. Handle nulls — log and drop rows missing critical fields
null_counts = df[required].isnull().sum()
if null_counts.any():
logger.warning(f"Null counts in required columns:\n{null_counts[null_counts > 0]}")
df = df.dropna(subset=['user_id'])
# 4. Normalise types
df['user_id'] = df['user_id'].astype(str).str.strip()
df['event_type'] = df['event_type'].str.lower().str.strip()
df['created_at'] = pd.to_datetime(df['created_at'], utc=True)
# 5. Derived columns
df['date'] = df['created_at'].dt.date
df['hour'] = df['created_at'].dt.hour
df['is_mobile'] = df['platform'].isin(['ios', 'android'])
logger.info(f"Transformation complete: {original_len} → {len(df)} rows")
return dfStep 3: Load — Writing to the Destination
import sqlalchemy
def load_to_postgres(df: pd.DataFrame, table: str, engine, if_exists: str = 'append') -> None:
    """Load a DataFrame into a PostgreSQL table in batches.

    Args:
        df: Rows to write.
        table: Destination table name.
        engine: SQLAlchemy engine (or connection) for the target database.
        if_exists: What to do if the table exists — 'replace', 'append',
            or 'fail' (passed straight to DataFrame.to_sql).

    Raises:
        ValueError: if if_exists is not one of the recognised modes.
    """
    valid_modes = {'append', 'fail', 'replace'}
    if if_exists not in valid_modes:
        raise ValueError(f"if_exists must be one of {sorted(valid_modes)}, got {if_exists!r}")
    logger.info("Loading %d rows to %s", len(df), table)
    df.to_sql(
        name=table,
        con=engine,
        if_exists=if_exists,
        index=False,
        chunksize=1000,  # write in batches to avoid memory spikes
        method='multi',  # use multi-row INSERT for performance
    )
    logger.info("Load complete")
# Idempotent upsert pattern — safe to re-run
UPSERT_SQL = """
INSERT INTO events (user_id, event_type, created_at, date, hour)
VALUES %s
ON CONFLICT (user_id, event_type, created_at) DO NOTHING;
"""Step 4: Glue It Together
def run_pipeline(date: str) -> None:
    """Main ETL orchestrator: extract → transform → load for one date.

    Args:
        date: ISO date string (YYYY-MM-DD) for the partition to process;
            pass a past date to backfill.

    Raises:
        Exception: re-raised from any stage so orchestrators (Airflow,
            Prefect) can mark the run as failed.
    """
    logger.info("Pipeline starting for %s", date)
    # DATABASE_URL must be set in the environment before running.
    engine = sqlalchemy.create_engine(os.getenv("DATABASE_URL"))
    try:
        # Extract
        raw = extract_from_api(
            url="https://api.example.com/events",
            params={"date": date, "limit": 10000}
        )
        # Transform
        clean = transform(raw)
        # Load
        load_to_postgres(clean, table="events_staging", engine=engine)
        logger.info("Pipeline completed successfully for %s", date)
    except Exception as e:
        logger.error("Pipeline failed for %s: %s", date, e)
        raise  # re-raise so orchestrators (Airflow, Prefect) can mark as failed
    finally:
        # Release pooled DB connections whether the run succeeded or not.
        engine.dispose()
if __name__ == "__main__":
import sys
date = sys.argv[1] if len(sys.argv) > 1 else str(datetime.date.today())
run_pipeline(date)Production Checklist
Before deploying any ETL pipeline: add structured logging (JSON logs for Datadog/Splunk), add data quality checks (row count, null rate, value range assertions), make the pipeline idempotent (re-runnable without side effects), handle partial failures gracefully, add alerting on failure, and document expected schema and SLAs.