
Python_for_Data_Engineering

#python#pandas#numpy#data-processing#optimization

Python for DataOps

The foundation of every great pipeline is clean, efficient, and testable code. The purpose of this module is to establish a solid Python foundation tailored specifically for the needs of DataOps workflows.

Requirements

  • Write performant Python code using numpy and pandas.
  • Manage and version code with Git using DataOps best practices.
  • Test and package preprocessing pipelines for reproducibility and automation.
  • Enhance performance and optimize memory usage.

Technical Skills Gained

  • Mastery of Pandas and NumPy for data engineering tasks.
  • Efficient data handling through memory optimizations and vectorized operations.
  • Building modular and testable preprocessing pipelines.
  • Managing collaborative workflows with Git and GitHub.
  • Ensuring code quality and reproducibility via packaging and environment isolation.

Tools & Technologies

  • Python 3.10+
  • Libraries: pandas, numpy
  • Version Control: Git, GitHub
  • Package Management: Poetry, pip, venv, uv
  • Testing: pytest
  • IDE: Jupyter Lab, VS Code

What's a Python package?

A collection of Python modules bundled under a directory with an __init__.py file. It allows reusable, importable components.

Why?

  • promote modular code reuse across multiple projects
  • standardize your ML pipeline
  • improve modularity and testability
  • share your tools with others

When?

  • when your pipeline functions become stable and reusable
  • when collaborating with multiple engineers or teams
  • when deploying to Airflow, Kubeflow or MLflow
  • when preparing for CI/CD, deployment or testing

How?

Typical structure

reco_pipeline/
├── reco_pipeline/
│   ├── __init__.py
│   ├── cleaning.py
│   ├── features.py
│   └── validation.py
├── tests/
│   └── test_cleaning.py
├── setup.py
├── pyproject.toml (optional)
└── requirements.txt

setup.py (minimal)

from setuptools import setup, find_packages

setup(
	name = "reco_pipeline",
	version = "0.1",
	packages = find_packages(),
	install_requires=[
		"pandas>=1.5",
		"numpy>=1.23"
		]
)
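
Once installed (e.g. with pip install -e . from the project root during development), the package's modules can be imported anywhere. A minimal sketch, assuming cleaning.py defines a hypothetical drop_missing_prices helper:

# reco_pipeline/cleaning.py (hypothetical module content)
import pandas as pd

def drop_missing_prices(df: pd.DataFrame) -> pd.DataFrame:
	"""Remove rows without a price before they reach downstream steps."""
	return df.dropna(subset=["price"])

# elsewhere, after installing the package (pip install -e .):
# from reco_pipeline.cleaning import drop_missing_prices
# clean_df = drop_missing_prices(raw_df)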


Analogy

Let's think of a package as a toolbox:

  • each tool inside is a module
  • the label on the box is your package name
  • you can take this toolbox with you anywhere, reuse it, version it and even give it to other people

Introduction to Pandas

Pandas is a Python library for:

  • tabular data manipulation (like SQL/Excel)
  • Built on top of NumPy
  • Core objects: Series (1D) and DataFrame (2D)
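
A quick look at both core objects (illustrative values):

import pandas as pd

prices = pd.Series([10.5, 20.0, 15.75], name="price")        # Series: 1D labeled array
df = pd.DataFrame({"price": prices, "stock": [100, 50, 0]})  # DataFrame: 2D table of columns
print(type(prices), type(df))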

Why it matters in DataOps

  • preprocess raw data for ML pipelines
  • Validate and clean data
  • Analyze data performance, quality, drift, etc.

Creating DataFrames

import pandas as pd
#from dictionary
data={
'product_id':[1,2,3],
'price':[10.5, 20.0, 15.75],
'stock':[100, 50, 0]
}

df = pd.DataFrame(data)

Data Loading in Pandas

Reading structured and semi-structured data into a DataFrame

Why does it matter?

As a DataOps engineer, your pipelines usually start from raw files (logs, exports, APIs). Efficient reading is critical for both accuracy and performance.

When?

Any time you need to ingest structured tabular data like product catalogs, user logs or data lake exports

How?

CSV (Comma-Separated Values)
import pandas as pd

df=pd.read_csv("path_to_csv_file")
print(df.head())
print(df.tail())

Pros: Human-readable and simple. Cons: no schema, large file sizes

Parquet (Columnar Format)
import pandas as pd
df_parquet=pd.read_parquet("path_to_parquet_file")

Pros: compressed, typed, great for large tables; used in Spark, AWS S3 and data lakes

JSON (Nested/Structured Data)
import pandas as pd
df_json = pd.read_json("path_to_json_file")

Often used for clickstream logs or events. lines=True is necessary for JSONL (one object per line), as shown below.
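
A minimal sketch for JSONL (the events.jsonl filename is just an example):

import pandas as pd

# One JSON object per line, e.g. {"user_id": 1, "event": "click"}
df_events = pd.read_json("events.jsonl", lines=True)
print(df_events.head())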

Optimizing on Load

import pandas as pd

df = pd.read_csv(
	"path_to_csv_file",  # assuming it's a products file with product IDs and prices
	dtype={
		"product_id": str,
		"price": "float32"
	}
)

Data selection, filtering and indexing

working with subsets of rows and columns

Why does it matter?

preprocessing for ML often requires cleaning, filtering or selecting only relevant observations

When?

Before transformations, feature engineering or when isolating records of interest

How?

Column selection

#single col selection
df["price"]

#multiple col selection
df[["product_id", "price", "category"]]

Row filtering

#boolean filter: electronics over USD 500
electronics = df[(df["category"] == "Electronics") & (df["price"] > 500)]

#products above USD 200
expensive = df[df["price"] > 200]

Boolean masks & indexing

df.loc[df["stock"]==0]
df.iloc[0:10]

Set index and use it

df.set_index("product_id", inplace=True)
df.loc["P12345"]

Sorting and handling missing data

Operations to organize and clean tabular data (sort_values(), dropna(), fillna(), isnull())

Why does it matter?

Clean, ordered data improves downstream processing; missing values can corrupt ML training or break production pipelines.

When?

During early preprocessing stages before any modeling or aggregation

How?

Check for nulls

df.isnull().sum()

Drop or fill nulls

df.dropna(subset=["price"], inplace=True)
df["stock"].fillna(0, inplace=True)

Sort values

df.sort_values(by="price", ascending=False)

Deduplication strategies

Remove duplicate records based on logic (row-level or key-based)

Why?

Duplicate data leads to inflated metrics and model bias

How?

Row-level deduplication

df.drop_duplicates()

Subset-based deduplication

df.drop_duplicates(subset=["title", "product_id", "price"])

Keep first/last

df.drop_duplicates(subset="title", keep="last")

Concatenating tables

combine multiple datasets into one along rows or columns

Why?

Used when appending logs, creating feature unions or combining distributed data

How?

#row wise, same schema
all_df =pd.concat([df1, df2])

#col wise: side by side features
df_combined = pd.concat([df_a, df_b], axis=1)

Merging and enriching datasets

Joins two datasets based on common columns (keys), using either pd.merge() or join()

Why?

Both merge() and join() are pandas methods that combine datasets (DataFrames) based on common columns or indexes. For example, in recommendation systems you often join metadata (products, users) with logs (clicks, views) to build features.

When?

Any time you enrich data (e.g. joining user info with behavior logs)

How?

Post-merge checks

df_merged.isnull().sum()
df_merged["product_id"].nunique()

nunique(): counts the number of distinct elements along the specified axis and returns a Series; NaN values can be ignored (dropna=True by default).

merge() – more powerful and explicit

  • inspired by SQL-style joins
  • Allows you to specify which columns to join on
  • supports left, right, inner, outer joins
  • can merge on column names or indices
pd.merge(df_orders, df_customers, on="customer_id", how="left")
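
A small end-to-end sketch with made-up order and customer tables, combining the left join with the post-merge checks above:

import pandas as pd

df_orders = pd.DataFrame({
	"order_id": [1, 2, 3],
	"customer_id": ["C1", "C2", "C4"],
	"amount": [120.0, 55.5, 9.99],
})
df_customers = pd.DataFrame({
	"customer_id": ["C1", "C2", "C3"],
	"country": ["USA", "France", "Germany"],
})

# Left join keeps every order, even when the customer is unknown (C4 -> NaN country)
df_merged = pd.merge(df_orders, df_customers, on="customer_id", how="left")
print(df_merged.isnull().sum())            # spot missing enrichment data
print(df_merged["customer_id"].nunique())  # sanity-check key cardinality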

join() – simpler and index based

  • shortcut method for joining on indices
  • used when one or both dataframes have meaningful indices
  • less flexible for multi-key joins
df1.set_index("customer_id").join(df2.set_index("customer_id"))


GroupBy and Aggregation

  • groupby() splits a DataFrame into groups based on one or more keys (e.g., user_id, product_id, date)
  • agg() lets you summarize each group by applying one or more functions like sum(), mean(), count(), min(), max()

Why?

Used to create features like "click frequency", "avg session time" or "top-N products" in real-world data engineering tasks, especially for ML.

When?

During data summarization, feature engineering or quality checks

How?

Group by

#Count number of products per category
df.groupby("category").size().reset_index(name="product_count")

#Average price by category
df.groupby("category")["price"].mean()

Multiple aggregations

df.groupby('user_id').agg({
	'product_id':'nunique', #how many unique products
	'feature': 'count' #total features
})

Named aggregations(recommended for clarity)

df.groupby('user_id').agg(
	num_products=('product_id', 'nunique'),
	total_features=('feature', 'count')
).reset_index()

Pandas Recap



Introduction to NumPy

NumPy (Numerical Python) is a fundamental package for:

  • fast numerical computation
  • working with arrays and matrices
  • vectorized operations (no python loops)

Why does it matter?

it powers:

  • fast data processing (under the hood of pandas, sklearn, etc.)
  • memory-efficient storage and transformation
  • ML model preprocessing (e.g. normalization, reshaping)

How?

NumPy basics

import numpy as np

a = np.array([1, 2, 3])          # 1D array
b = np.array([[1, 2], [3, 4]])   # 2D array

Data types (dtypes)

arr = np.array([1.0, 2.0, 3.0], dtype=np.float32)

  • helps control memory usage (see the quick size comparison below)
  • needed for data serialization
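
A quick size comparison using nbytes (the array values are arbitrary):

import numpy as np

x64 = np.ones(1_000_000, dtype=np.float64)
x32 = x64.astype(np.float32)       # same values, half the memory
print(x64.nbytes / 1024**2, "MB")  # ~7.63 MB
print(x32.nbytes / 1024**2, "MB")  # ~3.81 MB
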
Array manipulation

shape, reshape, transpose

arr = np.arange(12).reshape(3, 4)
arr.T

Indexing, slicing

arr[0, :] # first row
arr[:, 1:3] # columns 1 and 2

broadcasting

a = np.array([[1], [2], [3]])   # shape (3, 1)
b = np.array([10, 20, 30])      # shape (3,)
a + b                           # broadcasts to a (3, 3) array

Vectorized computation (element-wise operations)

x = np.array([1, 2, 3])
x**2 + 2*x + 1

aggregations

x.sum(), x.mean(), x.std(), x.min(), x.max()

Randomness and simulation: random arrays

np.random.seed(42)
np.random.randint(0, 100, size=(3, 3))
np.random.normal(0, 1, 100)

Why important:

  • data augmentation
  • simulation-based tests
  • ML training scenarios

Useful NumPy functions for DataOps

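An illustrative selection (not exhaustive) of functions that come up constantly in preprocessing:

import numpy as np

x = np.array([1.0, np.nan, 250.0, -5.0, 42.0])

np.isnan(x)                          # detect missing values
np.nan_to_num(x, nan=0.0)            # replace NaN with a default
np.clip(x, 0, 100)                   # cap outliers into a valid range
np.where(x > 50, 1, 0)               # vectorized if/else for flag features
np.unique(np.array([1, 1, 2, 3]))    # distinct values
np.percentile(x[~np.isnan(x)], 95)   # robust summary statistics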


Outcome

  • Analyze memory usage of dataframes
  • Optimize memory by changing data types
  • Load and process massive files using chunking strategies
  • Apply lazy evaluation techniques for big data
  • Build scalable data ingestion pipelines for preprocessing

Diagnosing memory usage in pandas

Learning how to analyze and reduce memory usage in large DataFrames

Why does it matter?

In production pipelines, large datasets (logs, interactions, telemetry) must be loaded without blowing up memory – especially in containers or cloud workers.

When?

Before training models, running transforms or storing data

How?

Inspect memory usage

df=pd.read_csv("products.csv)
df.info(memory_usage='deep')
df.memory_usage(deep=True).sum()=/1024**2 # in MB

# Get the size of the Dataframe object itself
size_df=sys.getsizeof(df)
print(d"Size of DataFrame object (shallow): {size_df} bytes")

Check data types

df.dtypes

Downcasting

Changing a data type to a smaller, more specific variant, for example:

| Original Type | Downcast To | Example |
|---|---|---|
| int64 | int32 | Order IDs |
| float64 | float32 | Product prices |
| object | category | Country, Category |

Downcasting helps reduce memory usage without changing the values or behavior of your data.

Why does it matter?

When working with large datasets, even small inefficiencies can cost you gigabytes of memory, leading to:

  • Crashes and out-of-memory errors
  • Slow data loading and transformations
  • Unscalable pipelines in production

Downcasting is the process of converting data to a more memory-efficient type without losing information.

When?

Use downcasting immediately after loading a large dataset or when:

  • Memory usage is high (df.memory_usage(deep=True).sum() > ~100 MB)
  • columns contain integers, floats or repetitive strings
  • you're preparing data for modeling or ingestion in resource-constrained environments

How?

Downcasting numeric columns

import pandas as pd
import numpy as np

# Example dataframe
df = pd.DataFrame({
	'order_id': np.arange(1, 1000001),            # int64
	'price': np.random.uniform(1, 500, 1000000)   # float64
})

# Check original types and memory
print(df.dtypes)
print(f"Original size: {df.memory_usage(deep=True).sum()/1024**2:.2f} MB")

# Downcast integers
df['order_id'] = pd.to_numeric(df['order_id'], downcast='unsigned')

# Downcast floats
df['price'] = pd.to_numeric(df['price'], downcast='float')

# Check optimized memory
print(df.dtypes)
print(f"Optimized size: {df.memory_usage(deep=True).sum()/1024**2:.2f} MB")

Downcasting categorical/object columns

df = pd.DataFrame({
	'country': np.random.choice(['USA', 'France', 'Germany', 'Canada'], 1_000_000)
})
print(df['country'].memory_usage(deep=True)/1024**2, "MB before")

# Convert object to category
df['country'] = df['country'].astype('category')
print(df['country'].memory_usage(deep=True)/1024**2, "MB after")

Note: Downcasting is like switching from full-fat milk to skim milk – it still tastes the same, but it's lighter. When my DataFrame uses int64 or float64, it's often overkill: I check if I can safely switch to smaller types like int32, float32 or even categorical strings, which saves memory. I make sure not to lose any data, of course!

Chunked Reading of Massive Data Files

Chunked reading means loading a file in smaller portions (chunks) instead of all at once. In pandas, this is done using:

pd.read_csv('file.csv', chunksize=100_000)

This returns an iterator, not a DataFrame – each iteration yields a DataFrame chunk.

Why?

Imagine trying to open a 20GB CSV file in memory – your system crashes. When data files are too large to fit into RAM, reading the entire file at once is inefficient or impossible. That's where chunked reading comes in. Chunking lets you load only part of the data at a time, process it in-memory, and stream the results efficiently.

When?

Use chunked reading when:

  • Your data doesn't fit in memory
  • You want to preprocess or aggregate large logs, clickstreams or telemetry files
  • You're building streaming or batch pipelines in prod
  • You want to monitor progress and have better failure recovery

How?

basic chunked reading

import pandas as pd

# Define chunk size
CHUNKSIZE= 100_000

# Create an iterator
reader =pd.read_csv('huge_orders.csv', chunksize=CHUNKSIZE)

# Process each chunk
for chunk in reader:
	print(f"Chunk shape :{chunk.shape}")

aggregate while reading

total_sales=0
for chunk in pd.read_csv('huge_orders.csv', chunksize=100_000):
	chunk['total']=chunk['quantity']*chunk['price']
	total_sales+=chunk['total'].sum()
print(f"total sales: {total_sales:,.2f}")

filter and save to a new file

filtered_chunks=[]
for chunk in pd.read_csv('huge_orders.csv', chunksize=100_000):
	filtered=chunk[chunk['country']=='USA']
	filtered_chunks.append(filtered)
# Combine and save
pd.concat(filtered_chunks).to_csv('usa_orders.csv', index=False)

Note: Chunking is like eating a big sandwich in small bites. Instead of reading a 5GB CSV file all at once and choking your RAM, you load it in parts – like 100,000 rows at a time. You can process or filter each chunk, then forget it. It's scalable, efficient and lets you handle big data on small machines.

Aggregation while Streaming

Streaming aggregation means computing aggregates like sum, mean, count, min/max or groupby while reading data in chunks. Instead of building one big DataFrame and aggregating at the end, we accumulate the result progressively.

Why?

In real world DataOps, datasets are often too large to fully load into memory, but you still need answers like:

  • What's the total revenue?
  • How many unique users?
  • What's the average session time per country?

Streaming aggregation lets you compute these statistics incrementally, chunk by chunk, without loading the entire dataset at once. This is critical for building scalable, memory-safe and real-time pipelines.

When?

Use streaming aggregation when:

  • files are too large to fit into memory (>1GB)
  • you need to process data continuously
  • you want to do real time monitoring or reporting
  • you're building batch ETL pipelines and want to avoid intermediate storage

How?

Global sum/ count/ mean

import pandas as pd
total = 0
count = 0

for chunk in pd.read_csv("huge_orders.csv", chunksize=100_000):
	total+=chunk['price'].sum()
	count+=chunk['price'].count()
mean_price=total/count
print(f"Average price: {mean_price:.2f}")

GroupBy aggregation while streaming

from collections import defaultdict

country_sales=defaultdict(float)
for chunk in pd.read_csv("huge_orders.csv", chunksize=100_000):
	chunk['total']=chunk['quantity']*chunk['price']
	grouped=chunk.groupby('country')['total'].sum()
	for country, total in grouped.items():
		country_sales[country] += total

print(country_sales)

Unique value counting across chunks

unique_customers=set()

for chunk in pd.read_csv('huge_orders.csv', chunksize=100_000):
	unique_customers.update(chunk['customer_id'].unique())

print(f"Total unique customers: {len(unique_customers)}")

Note: Instead of loading the entire dataset, we load it bit by bit – like streaming a video instead of downloading the whole thing. We sum up values as they come in. For example, we keep a total_sales variable and keep adding to it chunk by chunk; the same goes for groupby – we merge mini-aggregates into a global result.

Big Data Techniques for Memory-Limited Environments

While pandas is powerful, it wasn't built for big data. When working with datasets that are:

  • larger than your machine's RAM
  • growing in real time
  • shared across teams and in need of parallel processing

you'll hit a wall with pandas. That's where big data tools like Dask shine: they scale pandas-like code across memory, CPU, or even a cluster.

What are big data techniques for low-memory environments?

These techniques include chunked reading, streaming aggregation, memory-efficient data types (downcasting), lazy evaluation and parallel/out-of-core execution.

Tools: Dask, Vaex, Modin, Polars, Apache Arrow, DuckDB, PySpark

When to use Dask vs pandas

In short: stick with pandas when the dataset fits comfortably in RAM; reach for Dask when it doesn't, or when you need lazy, parallel or distributed execution.

How to use Dask to scale beyond pandas

Setup and load data with Dask

import dask.dataframe as dd

# Load a large CSV lazily
df=dd.read_csv("huge_orders.csv")

# Note: No data is loaded yet
print(df.head()) # Triggers a small computation

Compute aggregations lazily

# Calculate total revenue per country
df['total'] = df['quantity'] * df['price']
country_sales = df.groupby('country')['total'].sum()

# Nothing is computed yet!
result = country_sales.compute()
print(result)

Parallel CSV writing

# Filter and save in parallel
df[df['country']=='France'].to_csv('output/france-*.csv', index=False)

Dask architecture in 10 seconds

  • Dask splits your data into chunks (partitions)
  • Each chunk is a pandas dataframe
  • Dask builds a task graph and executes lazily, only when .compute() is called
  • You can run it on your machine or scale to a cluster (see the sketch below)
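
A small sketch of peeking at the partitions and optionally spinning up local workers (the Client requires the dask[distributed] extra to be installed):

import dask.dataframe as dd

df = dd.read_csv("huge_orders.csv")
print(df.npartitions)            # how many pandas-sized chunks back the DataFrame

# Optional: local workers on your own machine; .compute() calls then run in parallel on them
from dask.distributed import Client
client = Client()
print(client.dashboard_link)     # live view of the task graph as it executes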

Dask is like pandas, but with superpowers. It reads big files in pieces, delays operations until necessary, and runs things in parallel. I write code just like pandas, but it doesn't crash on big files – and I only compute when I'm ready.

NumPy Data Types and Their Sizes in Memory

| NumPy Data Type | Description & Range | Size (Bytes) |
|---|---|---|
| int8 | Signed integer (–128 to 127) | 1 |
| uint8 | Unsigned integer (0 to 255) | 1 |
| int16 | Signed integer (–32,768 to 32,767) | 2 |
| uint16 | Unsigned integer (0 to 65,535) | 2 |
| int32 | Signed integer (–2,147,483,648 to 2,147,483,647) | 4 |
| uint32 | Unsigned integer (0 to 4,294,967,295) | 4 |
| int64 | Signed integer (approx. –9.22e18 to 9.22e18) | 8 |
| uint64 | Unsigned integer (approx. 0 to 1.84e19) | 8 |
| float16 | Half precision float (~±6.55e4, 3 decimal digits) | 2 |
| float32 | Single precision float (~±3.4e38, 7 decimal digits) | 4 |
| float64 | Double precision float (~±1.8e308, 15 decimal digits) | 8 |
| complex64 | Complex (2 × float32) | 8 |
| complex128 | Complex (2 × float64) | 16 |
| bool_ | Boolean (True or False) | 1 |
| object_ | Python object reference | 8 (platform-dependent) |
| str_ / bytes_ | Fixed-length string/bytes (length N) | N |
| datetime64 | Date/time with nanosecond resolution | 8 |
| timedelta64 | Difference between datetimes | 8 |

Note: For floating point types, range and precision are approximate and implementation-defined. Use np.finfo() or np.iinfo() for programmatic inspection.
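
For example:

import numpy as np

print(np.iinfo(np.int16))         # min / max for 16-bit signed integers
print(np.finfo(np.float32).max)   # ~3.4e38
print(np.finfo(np.float64).eps)   # machine epsilon (spacing between 1.0 and the next float)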


Downcasting in pandas refers to converting data types to smaller, more memory-efficient types to reduce memory usage and improve performance.

Why Downcast DataFrames?

Large datasets can consume significant memory if default types like int64 or float64 are used unnecessarily. Downcasting:

  • Reduces RAM usage
  • Speeds up data processing
  • Makes DataFrames more efficient to store or transmit

Example

Before downcasting:

import pandas as pd
import numpy as np

df = pd.DataFrame({
    'int_col': np.random.randint(0, 100, size=1_000_000),
    'float_col': np.random.rand(1_000_000)
})

df.info(memory_usage='deep')

The columns are likely int64 and float64 by default.

How to Downcast

1. Use pd.to_numeric() with downcast

df['int_col'] = pd.to_numeric(df['int_col'], downcast='integer')
df['float_col'] = pd.to_numeric(df['float_col'], downcast='float')

2. Use .astype() manually

df['int_col'] = df['int_col'].astype('int16')   # if range allows
df['float_col'] = df['float_col'].astype('float32')

3. Automated Downcasting Function

def downcast_df(df):
    for col in df.select_dtypes(include=['int', 'float']).columns:
        col_type = df[col].dtype
        if pd.api.types.is_integer_dtype(col_type):
            df[col] = pd.to_numeric(df[col], downcast='integer')
        elif pd.api.types.is_float_dtype(col_type):
            df[col] = pd.to_numeric(df[col], downcast='float')
    return df

Check Memory Savings

df_before = df.copy()
print("Before:", df_before.memory_usage(deep=True).sum() / 1024**2, "MB")

df = downcast_df(df)

print("After:", df.memory_usage(deep=True).sum() / 1024**2, "MB")

Caveats

  • Ensure no loss of precision or overflow occurs during downcasting.
  • Always check value ranges before downcasting.
print(df['int_col'].min(), df['int_col'].max())

Downcast object Columns

If a column is of type object but contains only a few repeated values, convert it to category:

df['category_col'] = df['category_col'].astype('category')

This significantly reduces memory usage if the number of unique values is small.

Summary

| Column Type | Method | Benefit |
|---|---|---|
| Integers | pd.to_numeric(..., downcast='integer') | Less memory usage |
| Floats | pd.to_numeric(..., downcast='float') | Acceptable precision |
| Object | .astype('category') | Huge memory saving |
| Manual | .astype('int16'), etc. | Full control |
