Python_for_Data_Engineering
Python for DataOps
The foundation of every great pipeline is clean, efficient, and testable code. The purpose of this module is to establish a solid Python foundation tailored specifically for the needs of DataOps workflows.
Requirements
- Write performant Python code using numpy and pandas.
- Manage and version code with Git using DataOps best practices.
- Test and package preprocessing pipelines for reproducibility and automation.
- Enhance performance and optimize memory usage.
Technical Skills Gained
- Mastery of Pandas and NumPy for data engineering tasks.
- Efficient data handling through memory optimizations and vectorized operations.
- Building modular and testable preprocessing pipelines.
- Managing collaborative workflows with Git and GitHub.
- Ensuring code quality and reproducibility via packaging and environment isolation.
Tools & Technologies
- Python 3.10+
- Libraries: pandas, numpy
- Version Control: Git, GitHub
- Package Management: Poetry, pip, venv, uv
- Testing: pytest
- IDE: Jupyter Lab, VS Code
What's a Python package?
A collection of Python modules bundled under a directory with an __init__.py file. It allows reusable, importable components.
Why?
- promote modular code reuse across multiple projects
- standardize your ML pipeline
- improve modularity and testability
- share your tools with others
When?
- when your pipeline functions become stable and reusable
- when collaborating with multiple engineers or teams
- when deploying to airflow, kubeflow or mlflow
- when preparing for CI/CD, deployment or testing
How?
Typical structure
reco_pipeline/
    reco_pipeline/
        __init__.py
        cleaning.py
        features.py
        validation.py
    tests/
        test_cleaning.py
    setup.py
    pyproject.toml   (optional)
    requirements.txt
setup.py(minimal)
from setuptools import setup, find_packages
setup(
name = "reco_pipeline",
version = "0.1",
packages = find_packages(),
install_requires=[
"pandas>=1.5",
"numpy>=1.23"
]
)
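Once the package is installed (for example with pip install -e . during development), its modules can be imported anywhere. A minimal sketch; the clean_prices function is hypothetical, only the module name comes from the structure above:
# Install in editable mode from the project root (shell):
#   pip install -e .

# Then import the package's modules like any other library,
# e.g. inside an Airflow task, a notebook or a test:
from reco_pipeline import cleaning

df_clean = cleaning.clean_prices(df)  # hypothetical function, for illustration only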

Analogy
Let's think of a package as a toolbox
- each tool inside is a module
- the label on the box is your package name
- you can take this toolbox with you anywhere, reuse it, version it and even give it to other people
Introduction to Pandas
Pandas is a python library for:
- tabular data manipulation (like SQL/Excel)
- Built on top of NumPy
- Core objects: Series (1D) and DataFrame (2D)
Why it matters in DataOps
- preprocess raw data for ML pipelines
- Validate and clean data
- Analyze data quality, performance, drift, etc.
Creating DataFrames
import pandas as pd
#from dictionary
data={
'product_id':[1,2,3],
'price':[10.5, 20.0, 15.75],
'stock':[100, 50, 0]
}
df = pd.DataFrame(data)
Data Loading in Pandas
Reading structured and semi-structured data into DataFrames
Why does it matter?
As a DataOps engineer, your pipelines usually start from raw files (logs, exports, APIs). Efficient reading is critical for both accuracy and performance.
When?
Any time you need to ingest structured tabular data like product catalogs, user logs or data lake exports
How?
CSV(Comma-Separated Values)
import pandas as pd
df=pd.read_csv("path_to_csv_file")
print(df.head())
print(df.tail())
Pros: Human-readable and simple. Cons: no schema, large file sizes
Parquet (Columnar Format)
import pandas as pd
df_parquet=pd.read_parquet("path_to_parquet_file")
Pros: compressed, typed, great for large tables, used in spark, aws s3, data lakes
JSON (Nested/Structured Data)
import pandas as pd
df_json = pd.read_json("path_to_json_file")
Often used for clickstream logs or event data. lines=True is necessary for JSONL (one object per line), as in the sketch below.
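A minimal sketch for a JSON Lines file (the path is a placeholder):
import pandas as pd

# Each line is one JSON object (JSONL), so lines=True is required
df_events = pd.read_json("path_to_jsonl_file", lines=True)
print(df_events.head())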
Optimizing on Load
import pandas as pd
df = pd.read_csv(
    "path_to_csv_file",  # assuming it's a products file with product ids and prices
    dtype={
        "product_id": str,
        "price": "float32"
    }
)
Data selection, filtering and indexing
working with subsets of rows and columns
Why does it matter?
preprocessing for ML often requires cleaning, filtering or selecting only relevant observations
When?
Before transformations, feature engineering or when isolating records of interest
How?
Columns selection
# single column selection
df["price"]
# multiple column selection
df[["product_id", "price", "category"]]
# boolean filter: electronics over 500
electronics = df[(df["category"] == "Electronics") & (df["price"] > 500)]
Row filtering
# products above USD 200
above_200 = df[df["price"] > 200]
Boolean masks & indexing
df.loc[df["stock"]==0]
df.iloc[0:10]
Set index and use it
df.set_index("product_id", inplace=True)
df.loc["P12345"]
Sorting and handling missing data
Operations to organize and clean tabular data (sort_values(), dropna(), fillna(), isnull())
Why does it matter?
Clean, ordered data improves downstream processing; missing values can corrupt ML training or break production pipelines.
When?
During early preprocessing stages before any modeling or aggregation
How?
Check for nulls
df.isnull().sum()
Drop or fill nulls
df.dropna(subset=["price"], inplace=True)
df["stock"].fillna(0, inplace=True)
Sort values
df.sort_values(by="price", ascending=False)
Deduplication strategies
Remove duplicate records based on logic (row-level or key-based)
Why?
Duplicate data leads to inflated metrics and model bias
How?
Row-level deduplication
df.drop_duplicates()
subset-based deduplication
df.drop_duplicates(subset=["title", "product_id", "price"])
Keep first/last
df.drop_duplicates(subset="title", keep="last")
Concatenating tables
combine multiple datasets into one along rows or columns
Why?
Used when appending logs, creating feature unions or combining distributed data
How?
#row wise, same schema
all_df =pd.concat([df1, df2])
#col wise: side by side features
df_combined = pd.concat([df_a, df_b], axis=1)
Merging and enriching datasets
Joins two datasets based on common columns (keys), using either pd.merge() or join()
Why?
Both merge() and join() are pandas methods that combine DataFrames based on common columns or indexes. For example, in recommendation systems you often join metadata (products, users) with logs (clicks, views) to build features.
When?
any time you enrich data (e.g. join user info with behavior logs)
How?
Post-merge checks
df_merged.isnull().sum()
df_merge["product_id"].nunique()
/nunique: count number of distinct elements in specified axis. Return Series with number of distinct elements. Can ignore NaN values.
merge() – more powerful and explicit
- inspired by SQL-style joins
- Allows you to specify which columns to join on
- supports left, right, inner, outer joins
- can merge on column names or indices
pd.merge(df_orders, df_customers, on="customer_id", how="left")
join() – simpler and index based
- shortcut method for joining on indices
- used when one or both dataframes have meaningful indices
- less flexible for multi-key joins
df1.set_index("customer_id").join(df2.set_index("customer_id"))

GroupBy and Aggregation
- groupby() splits a DataFrame into groups based on one or more keys (e.g. user_id, product_id, date)
- agg() lets you summarize each group by applying one or more functions like sum(), mean(), count(), min(), max()
Why?
In real-world data engineering tasks, especially for ML, it is used to create features like "click frequency", "avg session time" or "top-N products".
When?
during data summarization, feature engineering or quality checks
How?
group by
# Count number of products per category
df.groupby("category").size().reset_index(name="num_products")
#Average price by category
df.groupby("category")["price"].mean()
Multiple aggregations
df.groupby('user_id').agg({
    'product_id': 'nunique',  # how many unique products
    'feature': 'count'        # total features
})
Named aggregations(recommended for clarity)
df.groupby('user_id').agg(
    num_products=('product_id', 'nunique'),
    total_features=('features', 'count')
).reset_index()
Pandas Recap

Introduction to NumPy
NumPy (Numerical Python) is a fundamental package for:
- fast numerical computation
- working with arrays and matrices
- vectorized operations (no python loops)
Why does it matter?
it powers:
- fast data processing (under the hood of pandas, sklearn, etc.)
- memory-efficient storage and transformation
- ML model preprocessing (e.g. normalization, reshaping)
How?
Numpy basics
import numpy as np
a = np.array([1, 2, 3])
b = np.array([[1, 2], [3, 4]])
Data types (dtypes)
arr = np.array([1.0, 2.0, 3.0], dtype=np.float32)
- helps control memory usage (see the sketch below)
- needed for data serialization
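A quick sketch of how the dtype drives memory usage (the array size here is arbitrary; nbytes reports the buffer size):
import numpy as np

a64 = np.ones(1_000_000, dtype=np.float64)
a32 = a64.astype(np.float32)
print(a64.nbytes / 1024**2, "MB")  # ~7.6 MB
print(a32.nbytes / 1024**2, "MB")  # ~3.8 MB, half the memory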
Array manipulation
shape, reshape, transpose
arr = np.arange(12).reshape(3, 4)
arr.T
Indexing, slicing
arr[0, :] # first row
arr[:, 1:3] # columns 1 and 2
broadcasting
a = np.array([[1], [2], [3]])   # shape (3, 1)
b = np.array([10, 20, 30])      # shape (3,)
a + b                           # broadcasts to a (3, 3) array
Vectorized computation: element-wise operations
x = np.array([1, 2, 3])
x**2 + 2*x + 1
aggregations
x.sum(), x.mean(), x.std(), x.min(), x.max()
Randomness and simulation random arrays
np.random.seed(42)
np.random.randint(0 ,100 , size=(3,3))
np.random.normal(0,1,100)
Why it's important (a small sketch follows this list):
- data augmentation
- simulation-based tests
- ML training scenarios
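A minimal sketch of both uses (the noise scale and sample size are arbitrary illustration values):
import numpy as np

rng = np.random.default_rng(42)

# Simulation-based test data: a synthetic price sample for a pipeline test
prices = rng.uniform(1, 500, size=1_000)

# Data augmentation: perturb an existing feature with small Gaussian noise
noisy_prices = prices + rng.normal(0, 0.5, size=prices.shape)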
Useful NumPy functions for DataOps
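A few functions that come up constantly in preprocessing (an illustrative selection, not an exhaustive list):
import numpy as np

x = np.array([1.0, -2.0, np.nan, 250.0])

np.isnan(x)                        # detect missing values
np.nan_to_num(x, nan=0.0)          # replace NaN with a default
np.clip(x, 0, 100)                 # cap outliers to a range
np.where(x > 0, 1, 0)              # vectorized if/else for flag features
np.unique(np.array([1, 1, 2, 3]))  # distinct values
np.log1p(np.abs(x))                # log(1 + x), common for skewed features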

Outcome
- Analyze memory usage of dataframes
- Optimize memory by changing data types
- Load and process massive files using chunking strategies
- Apply lazy evaluation techniques for big data
- Build scalable data ingestion pipelines for preprocessing
Diagnosing memory usage in pandas
Learning how to analyze and reduce memory usage in large DataFrames
Why does it matter?
In production pipelines, large datasets (logs, interactions, telemetry) must be loaded without blowing up memory, especially in containers or cloud workers
When?
Before training models, running transforms or storing data
How?
Inspect memory usage
df=pd.read_csv("products.csv)
df.info(memory_usage='deep')
df.memory_usage(deep=True).sum()=/1024**2 # in MB
# Get the size of the Dataframe object itself
size_df=sys.getsizeof(df)
print(d"Size of DataFrame object (shallow): {size_df} bytes")
Check data types
df.dtypes
Downcasting
Changing a column's data type to a smaller, more specific variant, for example:
| Original Type | Downcast To | Example |
|---|---|---|
| int64 | int32 | Order IDs |
| float64 | float32 | Product prices |
| object | category | Country, Category |
Downcasting helps reduce memory usage without changing the values or behavior of your data.
Why does it matter?
When working with large datasets, even small inefficiencies can cost you gigabytes of memory, leading to:
- Crashes and out of memory errors
- Slow data loading and transformations
- Unscalable pipelines in production
Downcasting is the process of converting data to a more memory-efficient type without losing information.
When?
Use downcasting immediately after loading a large dataset or when:
- Memory usage is high (df.memory_usage(deep=True).sum() is above ~100 MB)
- columns contain integers, floats or repetitive strings
- you're preparing data for modeling or ingestion in resource-constrained environments
How?
Downcasting numeric columns
import pandas as pd
import numpy as np
# Example dataframe
df = pd.DataFrame({
    'order_id': np.arange(1, 1_000_001),           # int64
    'price': np.random.uniform(1, 500, 1_000_000)  # float64
})
# Check original types and memory
print(df.dtypes)
print(f"Original size: {df.memory_usage(deep=True).sum()/1024**2:.2f} MB")
# Downcast integers
df['order_id'] = pd.to_numeric(df['order_id'], downcast='unsigned')
# Downcast floats
df['price'] = pd.to_numeric(df['price'], downcast='float')
# Check optimized memory
print(df.dtypes)
print(f"Optimized size: {df.memory_usage(deep=True).sum()/1024**2:.2f} MB")
Downcasting categorical/object columns
df = pd.DataFrame({
'country': np.random.choice(['USA', 'France', 'Germany', 'Canada'], 1_000_000)
})
print(df['country'].memory_usage(deep=True)/1024**2, "MB before")
# Convert object to category
df['country'] = df['country'].astype('category')
print(df['country'].memory_usage(deep=True)/1024**2, "MB after")
Note: Downcasting is like switching from full-fat milk to skim milk – it still tastes the same, but it's lighter. When my DataFrame uses int64 or float64, it's often overkill: I check if I can safely switch to smaller types like int32, float32 or even categorical strings, which saves memory. I make sure not to lose any data, of course!
Chunked Reading of Massive Data Files
Chunked reading means loading a file in smaller portions (chunks) instead of all at once. In pandas, this is done using:
pd.read_csv('file.csv', chunksize=100_000)
this returns an iterator, not a DataFrame – each iteration yields a DataFrame chunk.
Why?
Imagine trying to open a 20GB CSV file in memory – your system crashes. When data files are too large to fit into RAM, reading the entire file at once is inefficient or impossible. That's where chunked reading comes in. Chunking lets you load only part of the data at a time, process it in-memory, and stream the results efficiently.
When?
Use chunked reading when:
- Your data doesn't fit in memory
- You want to preprocess or aggregate large logs, clickstreams or telemetry files
- You're building streaming or batch pipelines in prod
- You want to monitor progress and have better failure recovery
How?
basic chunked reading
import pandas as pd
# Define chunk size
CHUNKSIZE = 100_000
# Create an iterator
reader = pd.read_csv('huge_orders.csv', chunksize=CHUNKSIZE)
# Process each chunk
for chunk in reader:
    print(f"Chunk shape: {chunk.shape}")
aggregate while reading
total_sales = 0
for chunk in pd.read_csv('huge_orders.csv', chunksize=100_000):
    chunk['total'] = chunk['quantity'] * chunk['price']
    total_sales += chunk['total'].sum()
print(f"Total sales: {total_sales:,.2f}")
filter and save to a new file
filtered_chunks = []
for chunk in pd.read_csv('huge_orders.csv', chunksize=100_000):
    filtered = chunk[chunk['country'] == 'USA']
    filtered_chunks.append(filtered)
# Combine and save
pd.concat(filtered_chunks).to_csv('usa_orders.csv', index=False)
Note: Chunking is like eating a big sandwich in small bites. Instead of reading a 5GB CSV file all at once and choking your RAM, you load it in parts, like 100,000 rows at a time. You can process or filter each chunk, then forget it. It's scalable, efficient and lets you handle big data on small machines.
Aggregation while Streaming
Streaming aggregation means computing aggregates like sum, mean, count, min/max or groupby while reading data in chunks. Instead of building one big DataFrame and aggregating at the end, we accumulate the result progressively.
Why?
In real world DataOps, datasets are often too large to fully load into memory, but you still need answers like:
- What's the total revenue?
- How many unique users?
- What's the average session time per country?
Streaming aggregation lets you compute these statistics incrementally, chunk by chunk, without loading the entire dataset at once. This is critical for building scalable, memory-safe and real-time pipelines.
When?
Use streaming aggregation when:
- files are too large to fit into memory (>1GB)
- you need to process data continuously
- you want to do real time monitoring or reporting
- you're building batch ETL pipelines and want to avoid intermediate storage
How?
Global sum/ count/ mean
import pandas as pd
total = 0
count = 0
for chunk in pd.read_csv("huge_orders.csv", chunksize=100_000):
total+=chunk['price'].sum()
count+=chunk['price'].count()
mean_price=total/count
print(f"Average price: {mean_price:.2f}")
GroupBy aggregation while streaming
from collections import defaultdict
country_sales=defaultdict(float)
for chunk in pd.read_csv("huge_orders.csv", chunksize=100_000):
chunk['total']=chunk['quantity']*chunk['price']
grouped=chunk.groupby('country')['total'].sum()
for country, total in grouped.items():
country_sales[country] += total
print(country_sales)
Unique value counting across chunks
unique_customers = set()
for chunk in pd.read_csv('huge_orders.csv', chunksize=100_000):
    unique_customers.update(chunk['customer_id'].unique())
print(f"Total unique customers: {len(unique_customers)}")
Note: Instead of loading the entire dataset, we load it bit by bit, like streaming a video instead of downloading the whole thing. We sum up values as they come in. For example, we keep a total_sales variable and keep adding to it chunk by chunk; same for groupby, we merge mini-aggregates into a global result.
Big Data Techniques for Memory-Limited Environments
While pandas is powerful, it wasn't built for big data. You'll hit a wall with pandas when working with datasets that are:
- larger than your machine's RAM
- growing in real time
- shared across teams and need parallel processing
That's where big data tools like Dask shine: they scale pandas-like code across memory, CPU, or even a cluster.
What are big data techniques for low-memory environments?
These techniques include chunked/streaming processing, lazy evaluation, memory-efficient data types, columnar formats, and out-of-core or parallel execution.
Tools: dask, vaex, modin, polars, apache arrow, duckdb, pyspark
When to use dask vs Pandas

How to use dask to scale beyond pandas
Setup and load data with Dask
import dask.dataframe as dd
# Load a large CSV lazily
df=dd.read_csv("huge_orders.csv")
# Note: No data is loaded yet
print(df.head()) # Triggers a small computation
Compute aggregations lazily
# Calculate total revenue per country
df['total'] = df['quantity'] * df['price']
country_sales = df.groupby('country')['total'].sum()
# Nothing is computed yet!
result = country_sales.compute()
print(result)
Parallel CSV writing
# Filter and save in parallel
df[df['country']=='France'].to_csv('output/france-*.csv', index=False)
Dask architecture in 10 seconds
- Dask splits your data into chunks (partitions)
- Each chunk is a pandas DataFrame
- Dask builds a task graph and executes lazily, only when .compute() is called
- You can run it on your machine or scale to a cluster (a small sketch follows)
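A minimal sketch of those ideas in code (the blocksize value is just an illustrative choice):
import dask.dataframe as dd

# Each partition is a regular pandas DataFrame; blocksize controls partition size
df = dd.read_csv("huge_orders.csv", blocksize="64MB")
print(df.npartitions)            # how many pandas chunks Dask created
lazy_total = df['price'].sum()   # builds a task graph, nothing runs yet
print(lazy_total.compute())      # executes the graph, in parallel where possible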
Dask is like pandas, but with superpowers. It reads big files in pieces, delays operations until necessary, and runs things in parallel. I write code just like pandas, but it doesn't crash on big files, and I only compute when I'm ready.
NumPy Data Types and Their Sizes in Memory
| NumPy Data Type | Description & Range | Size (Bytes) |
|---|---|---|
| int8 | Signed integer (–128 to 127) | 1 |
| uint8 | Unsigned integer (0 to 255) | 1 |
| int16 | Signed integer (–32,768 to 32,767) | 2 |
| uint16 | Unsigned integer (0 to 65,535) | 2 |
| int32 | Signed integer (–2,147,483,648 to 2,147,483,647) | 4 |
| uint32 | Unsigned integer (0 to 4,294,967,295) | 4 |
| int64 | Signed integer (approx. –9.22e18 to 9.22e18) | 8 |
| uint64 | Unsigned integer (approx. 0 to 1.84e19) | 8 |
| float16 | Half precision float (~±6.55e4, 3 decimal digits) | 2 |
| float32 | Single precision float (~±3.4e38, 7 decimal digits) | 4 |
| float64 | Double precision float (~±1.8e308, 15 decimal digits) | 8 |
| complex64 | Complex (2 × float32) | 8 |
| complex128 | Complex (2 × float64) | 16 |
| bool_ | Boolean (True or False) | 1 |
| object_ | Python object reference | 8 (platform-dependent) |
| str_ / bytes_ | Fixed-length string/bytes (length N) | N |
| datetime64 | Date/time with nanosecond resolution | 8 |
| timedelta64 | Difference between datetimes | 8 |
Note: For floating point types, range and precision are approximate and implementation-defined. Use np.finfo() or np.iinfo() for programmatic inspection.
Downcasting in pandas refers to converting data types to smaller, more memory-efficient types to reduce memory usage and improve performance.
Why Downcast DataFrames?
Large datasets can consume significant memory if default types like int64 or float64 are used unnecessarily. Downcasting:
- Reduces RAM usage
- Speeds up data processing
- Makes DataFrames more efficient to store or transmit
Example
Before downcasting:
import pandas as pd
import numpy as np
df = pd.DataFrame({
'int_col': np.random.randint(0, 100, size=1_000_000),
'float_col': np.random.rand(1_000_000)
})
print(df.info(memory_usage='deep'))
The columns are likely int64 and float64 by default.
How to Downcast
1. Use pd.to_numeric() with downcast
df['int_col'] = pd.to_numeric(df['int_col'], downcast='integer')
df['float_col'] = pd.to_numeric(df['float_col'], downcast='float')
2. Use .astype() manually
df['int_col'] = df['int_col'].astype('int16') # if range allows
df['float_col'] = df['float_col'].astype('float32')
3. Automated Downcasting Function
def downcast_df(df):
    # Downcast every numeric column to the smallest type that fits its values
    for col in df.select_dtypes(include=['int', 'float']).columns:
        col_type = df[col].dtype
        if pd.api.types.is_integer_dtype(col_type):
            df[col] = pd.to_numeric(df[col], downcast='integer')
        elif pd.api.types.is_float_dtype(col_type):
            df[col] = pd.to_numeric(df[col], downcast='float')
    return df
Check Memory Savings
df_before = df.copy()
print("Before:", df_before.memory_usage(deep=True).sum() / 1024**2, "MB")
df = downcast_df(df)
print("After:", df.memory_usage(deep=True).sum() / 1024**2, "MB")
Caveats
- Ensure no loss of precision or overflow occurs during downcasting.
- Always check value ranges before downcasting.
print(df['int_col'].min(), df['int_col'].max())
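One way to make that range check explicit, using the np.iinfo() helper from the dtype table note (a sketch, assuming the int_col column from the example above):
import numpy as np

target = np.int16
lo, hi = np.iinfo(target).min, np.iinfo(target).max

# Only downcast when every value fits in the target type's range
if df['int_col'].min() >= lo and df['int_col'].max() <= hi:
    df['int_col'] = df['int_col'].astype(target)
else:
    print(f"Values out of range for {target}")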
Downcast object Columns
If a column is of type object but contains only a few repeated values, convert it to category:
df['category_col'] = df['category_col'].astype('category')
This significantly reduces memory usage if the number of unique values is small.
Summary
| Column Type | Method | Benefit |
|---|---|---|
| Integers | pd.to_numeric(..., downcast='integer') | Less memory usage |
| Floats | pd.to_numeric(..., downcast='float') | Acceptable precision |
| Object | .astype('category') | Huge memory saving |
| Manual | .astype('int16'), etc. | Full control |
