
Additional_Resources

#resources #learning

Introduction and Environment Setup

How do I figure out my Snowflake Account URL?

The easiest way is to look at your Snowflake registration email and copy the string before .snowflakecomputing.com. In my case, this is frgcsyo-ie17820. Keep in mind that some URLs also include a cloud tag such as .aws, as in frgcsyo-ie17820.aws. This isn't straightforward, I know; even dbt Labs has its own documentation section on how to figure it out.
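To make the rule concrete, here is a minimal Python sketch (the hosts and the `account_identifier` helper are illustrative, not part of any Snowflake tooling) that extracts the account identifier from a registration-email host name:

```python
# Hedged sketch: the account identifier is everything before
# ".snowflakecomputing.com", including an optional cloud tag like ".aws".
def account_identifier(host: str) -> str:
    suffix = ".snowflakecomputing.com"
    if not host.endswith(suffix):
        raise ValueError(f"not a Snowflake host: {host}")
    return host[: -len(suffix)]

print(account_identifier("frgcsyo-ie17820.snowflakecomputing.com"))      # frgcsyo-ie17820
print(account_identifier("frgcsyo-ie17820.aws.snowflakecomputing.com"))  # frgcsyo-ie17820.aws
```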


Automated Snowflake Setup

I encourage you to go through the automated Snowflake Setup as importing the data and setting the permissions from scratch might take quite some time. Follow the instructions here https://bit.ly/dbt-course-setup to set up your Snowflake database with a click of a button!

Snowflake data import (manual)

Only execute these commands if you decided to skip the Automated Snowflake Setup.


Copy these SQL statements into a Snowflake Worksheet, select all, and execute them (i.e., press the play button).

-- Set up the defaults
CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH;
USE WAREHOUSE COMPUTE_WH;

CREATE DATABASE IF NOT EXISTS AIRBNB;
CREATE SCHEMA IF NOT EXISTS AIRBNB.RAW;
CREATE SCHEMA IF NOT EXISTS AIRBNB.DEV;

USE DATABASE AIRBNB;
USE SCHEMA RAW;

-- Create our three tables and import the data from S3
CREATE OR REPLACE TABLE raw_listings
                    (id integer,
                     listing_url string,
                     name string,
                     room_type string,
                     minimum_nights integer,
                     host_id integer,
                     price string,
                     created_at datetime,
                     updated_at datetime);

COPY INTO raw_listings (id,
                        listing_url,
                        name,
                        room_type,
                        minimum_nights,
                        host_id,
                        price,
                        created_at,
                        updated_at)
                   FROM 's3://dbt-datasets/listings.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');

CREATE OR REPLACE TABLE raw_reviews
                    (listing_id integer,
                     date datetime,
                     reviewer_name string,
                     comments string,
                     sentiment string);

COPY INTO raw_reviews (listing_id, date, reviewer_name, comments, sentiment)
                   FROM 's3://dbt-datasets/reviews.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');

CREATE OR REPLACE TABLE raw_hosts
                    (id integer,
                     name string,
                     is_superhost string,
                     created_at datetime,
                     updated_at datetime);

COPY INTO raw_hosts (id, name, is_superhost, created_at, updated_at)
                   FROM 's3://dbt-datasets/hosts.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');

Snowflake user creation

Only execute these commands if you decided to skip the Automated Snowflake Setup.

Copy these SQL statements into a Snowflake Worksheet, fill in your public key, select all, and execute them (i.e., press the play button).

-- Use an admin role
USE ROLE ACCOUNTADMIN;

-- Create the `transform` role
DROP ROLE IF EXISTS TRANSFORM;
CREATE ROLE TRANSFORM;
GRANT ROLE TRANSFORM TO ROLE ACCOUNTADMIN;

-- Allow the role to operate the default warehouse
GRANT OPERATE ON WAREHOUSE COMPUTE_WH TO ROLE TRANSFORM;

-- Create the `dbt` user and assign to role
DROP USER IF EXISTS dbt;
CREATE USER IF NOT EXISTS dbt
  LOGIN_NAME='dbt'
  TYPE=SERVICE
  RSA_PUBLIC_KEY="<<Add Your Public Key File's content here>>"
  DEFAULT_ROLE=TRANSFORM
  DEFAULT_WAREHOUSE='COMPUTE_WH'
  DEFAULT_NAMESPACE='AIRBNB.RAW'
  COMMENT='DBT user used for data transformation';

GRANT ROLE TRANSFORM to USER dbt;

-- Set up permissions to role `transform`
GRANT ALL ON WAREHOUSE COMPUTE_WH TO ROLE TRANSFORM;
GRANT ALL ON DATABASE AIRBNB to ROLE TRANSFORM;
GRANT ALL ON ALL SCHEMAS IN DATABASE AIRBNB to ROLE TRANSFORM;
GRANT ALL ON FUTURE SCHEMAS IN DATABASE AIRBNB to ROLE TRANSFORM;
GRANT ALL ON ALL TABLES IN SCHEMA AIRBNB.RAW to ROLE TRANSFORM;
GRANT ALL ON FUTURE TABLES IN SCHEMA AIRBNB.RAW to ROLE TRANSFORM;

-- Create the user and permissions for Preset.io
USE ROLE ACCOUNTADMIN;

DROP ROLE IF EXISTS REPORTER;
CREATE ROLE REPORTER;

DROP USER IF EXISTS PRESET;
CREATE USER PRESET
  LOGIN_NAME='preset'
  TYPE=SERVICE
  RSA_PUBLIC_KEY="<<Add Your Public Key File's content here>>"
  DEFAULT_WAREHOUSE='COMPUTE_WH'
  DEFAULT_ROLE=REPORTER
  DEFAULT_NAMESPACE='AIRBNB.DEV'
  COMMENT='Preset user for creating reports';

GRANT ROLE REPORTER TO USER PRESET;
GRANT ROLE REPORTER TO ROLE ACCOUNTADMIN;
GRANT ALL ON WAREHOUSE COMPUTE_WH TO ROLE REPORTER;
GRANT USAGE ON DATABASE AIRBNB TO ROLE REPORTER;
GRANT USAGE ON ALL SCHEMAS IN DATABASE AIRBNB to ROLE REPORTER;
GRANT USAGE ON FUTURE SCHEMAS IN DATABASE AIRBNB to ROLE REPORTER;
GRANT SELECT ON ALL TABLES IN SCHEMA AIRBNB.DEV to ROLE REPORTER;
GRANT SELECT ON FUTURE TABLES IN SCHEMA AIRBNB.DEV to ROLE REPORTER;

Python and Virtualenv setup, and dbt installation - Windows

Python

You want to use Python 3.12, as this is the most recent version that is compatible with every database adapter, Snowflake included. Newer Python versions might not be compatible with some of the dbt packages.

https://www.python.org/downloads/release/python-31211/

Virtualenv setup

Here are the commands we executed in this lesson:

cd Desktop
mkdir course
cd course

python -m venv venv
# Windows:
venv\Scripts\activate
# Mac:
source venv/bin/activate

Virtualenv setup and dbt installation - Mac

iTerm2

We suggest you use iTerm2 instead of the built-in Terminal application.

https://iterm2.com/

dbt installation

Supported Python Versions: https://docs.getdbt.com/faqs/Core/install-python-compatibility

Here are the commands we execute in this lesson:

mkdir course
cd course
virtualenv venv
. venv/bin/activate
python --version
pip install dbt-snowflake==1.10.2
dbt --version

Create a dbt project (all platforms):

dbt init --skip-profile-setup airbnb

dbt 1.10 Compatibility Notes

In dbt 1.10 and later:

  • Use data_tests: instead of tests: for column tests
  • Test parameters must be wrapped under the arguments: property
  • The require_generic_test_arguments_property flag is no longer needed

Models

Code used in the lesson

SRC Listings

models/src/src_listings.sql:

WITH raw_listings AS (
    SELECT
        *
    FROM
        AIRBNB.RAW.RAW_LISTINGS
)
SELECT
    id AS listing_id,
    name AS listing_name,
    listing_url,
    room_type,
    minimum_nights,
    host_id,
    price AS price_str,
    created_at,
    updated_at
FROM
    raw_listings

SRC Reviews

models/src/src_reviews.sql:

WITH raw_reviews AS (
    SELECT
        *
    FROM
        AIRBNB.RAW.RAW_REVIEWS
)
SELECT
    listing_id,
    date AS review_date,
    reviewer_name,
    comments AS review_text,
    sentiment AS review_sentiment
FROM
    raw_reviews

Exercise

Create a model which builds on top of our raw_hosts table.

  1. Call the model models/src/src_hosts.sql
  2. Use a CTE (common table expression) to define an alias called raw_hosts. This CTE selects every column from the raw hosts table AIRBNB.RAW.RAW_HOSTS
  3. In your final SELECT, select every column and record from raw_hosts and rename the following columns:
    • id to host_id
    • name to host_name

Solution

WITH raw_hosts AS (
    SELECT
        *
    FROM
       AIRBNB.RAW.RAW_HOSTS
)
SELECT
    id AS host_id,
    NAME AS host_name,
    is_superhost,
    created_at,
    updated_at
FROM
    raw_hosts

Models

Code used in the lesson

DIM Listings

models/dim/dim_listings_cleansed.sql:

WITH src_listings AS (
  SELECT
    *
  FROM
    {{ ref('src_listings') }}
)
SELECT
  listing_id,
  listing_name,
  room_type,
  CASE
    WHEN minimum_nights = 0 THEN 1
    ELSE minimum_nights
  END AS minimum_nights,
  host_id,
  REPLACE(
    price_str,
    '$'
  ) :: NUMBER(
    10,
    2
  ) AS price,
  created_at,
  updated_at
FROM
  src_listings
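To see the two cleansing rules in isolation, here is an illustrative Python mirror of what the model does per row (the `cleanse_listing` helper and the sample row are mine, not part of the course code): minimum_nights of 0 becomes 1, and the '$'-prefixed price string becomes a numeric price.

```python
# Toy per-row equivalent of dim_listings_cleansed's CASE and REPLACE()::NUMBER logic.
def cleanse_listing(row: dict) -> dict:
    out = dict(row)
    out["minimum_nights"] = row["minimum_nights"] or 1  # 0 -> 1, like the CASE branch
    out["price"] = round(float(row["price_str"].replace("$", "")), 2)  # strip '$', cast
    del out["price_str"]
    return out

print(cleanse_listing({"minimum_nights": 0, "price_str": "$120.00"}))
```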

DIM hosts

models/dim/dim_hosts_cleansed.sql:

{{
  config(
    materialized = 'view'
    )
}}

WITH src_hosts AS (
    SELECT
        *
    FROM
        {{ ref('src_hosts') }}
)
SELECT
    host_id,
    NVL(
        host_name,
        'Anonymous'
    ) AS host_name,
    is_superhost,
    created_at,
    updated_at
FROM
    src_hosts

Exercise

Create a new model in the models/dim/ folder called dim_hosts_cleansed.sql.

  • Use a CTE to reference the src_hosts model
  • SELECT every column and every record, and add a cleansing step to host_name:
    • If host_name is not null, keep the original value
    • If host_name is null, replace it with the value ‘Anonymous’
    • Use the NVL(column_name, default_null_value) function

Execute dbt run and verify that your model has been created.

Solution

WITH src_hosts AS (
    SELECT
        *
    FROM
        {{ ref('src_hosts') }}
)
SELECT
    host_id,
    NVL(
        host_name,
        'Anonymous'
    ) AS host_name,
    is_superhost,
    created_at,
    updated_at
FROM
    src_hosts

Incremental Models

The fct/fct_reviews.sql model:

{{
  config(
    materialized = 'incremental',
    on_schema_change='fail'
    )
}}
WITH src_reviews AS (
  SELECT * FROM {{ ref('src_reviews') }}
)
SELECT * FROM src_reviews
WHERE review_text is not null

{% if is_incremental() %}
  AND review_date > (select max(review_date) from {{ this }})
{% endif %}
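The incremental filter above can be simulated in a few lines of plain Python (a toy sketch, not dbt itself): on an incremental run, only rows newer than the current maximum review_date in the target table get appended.

```python
# Toy simulation of dbt's incremental append: {{ this }} plays the role of `target`.
from datetime import date

target = [{"review_date": date(2024, 1, 1)}, {"review_date": date(2024, 1, 5)}]
source = [
    {"review_date": date(2024, 1, 3)},  # at or below the high watermark: skipped
    {"review_date": date(2024, 1, 9)},  # newer: appended
]

high_watermark = max(r["review_date"] for r in target)  # select max(review_date) from {{ this }}
new_rows = [r for r in source if r["review_date"] > high_watermark]
target.extend(new_rows)
print(len(target))  # 3
```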

Get every review for listing 3176:

SELECT * FROM "AIRBNB"."DEV"."FCT_REVIEWS" WHERE listing_id=3176;

Add a new record to the table:

INSERT INTO "AIRBNB"."RAW"."RAW_REVIEWS"
VALUES (3176, CURRENT_TIMESTAMP(), 'Zoltan', 'excellent stay!', 'positive');

Making a full-refresh:

dbt run --full-refresh

DIM listings with hosts

The contents of dim/dim_listings_w_hosts.sql:

WITH
l AS (
    SELECT
        *
    FROM
        {{ ref('dim_listings_cleansed') }}
),
h AS (
    SELECT *
    FROM {{ ref('dim_hosts_cleansed') }}
)

SELECT
    l.listing_id,
    l.listing_name,
    l.room_type,
    l.minimum_nights,
    l.price,
    l.host_id,
    h.host_name,
    h.is_superhost as host_is_superhost,
    l.created_at,
    GREATEST(l.updated_at, h.updated_at) as updated_at
FROM l
LEFT JOIN h ON (h.host_id = l.host_id)

Dropping the views after ephemeral materialization

DROP VIEW AIRBNB.DEV.SRC_HOSTS;
DROP VIEW AIRBNB.DEV.SRC_LISTINGS;
DROP VIEW AIRBNB.DEV.SRC_REVIEWS;

Sources and Seeds

Full Moon Dates CSV

Download the CSV from the lesson's Resources section, or download it from the following S3 location: https://dbt-datasets.s3.us-east-2.amazonaws.com/seed_full_moon_dates.csv

Then place it in the seeds folder of your dbt project.

If you are on Mac/Linux, you can download the CSV straight into your seeds folder by executing this command:

curl https://dbt-datasets.s3.us-east-2.amazonaws.com/seed_full_moon_dates.csv -o seeds/seed_full_moon_dates.csv

Contents of models/sources.yml

sources:
  - name: airbnb
    schema: raw
    tables:
      - name: listings
        identifier: raw_listings

      - name: hosts
        identifier: raw_hosts

      - name: reviews
        identifier: raw_reviews
        config:
          loaded_at_field: date
          freshness:
            warn_after: {count: 1, period: hour}
            error_after: {count: 24, period: hour}
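The warn_after/error_after semantics can be sketched as a small Python function (the `freshness_status` helper is mine, for illustration only): the age of the newest loaded_at_field value is compared against the two thresholds.

```python
# Hedged sketch of source freshness evaluation: warn after 1 hour,
# error after 24 hours since the newest loaded_at value.
from datetime import datetime, timedelta

def freshness_status(max_loaded_at: datetime, now: datetime,
                     warn_after: timedelta = timedelta(hours=1),
                     error_after: timedelta = timedelta(hours=24)) -> str:
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"

now = datetime(2024, 1, 2, 12, 0)
print(freshness_status(datetime(2024, 1, 2, 11, 30), now))  # pass
print(freshness_status(datetime(2024, 1, 2, 9, 0), now))    # warn
print(freshness_status(datetime(2024, 1, 1, 0, 0), now))    # error
```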

Source Freshness

Getting the exit code of the most recent process

# Mac/Linux:
echo $?

# Windows - Command Prompt (cmd)
dir C:\nonexistent
echo %ERRORLEVEL%

# Windows - PowerShell
$LASTEXITCODE

Contents of models/mart/mart_fullmoon_reviews.sql

{{ config(
  materialized = 'table',
) }}

WITH fct_reviews AS (
    SELECT * FROM {{ ref('fct_reviews') }}
),
full_moon_dates AS (
    SELECT * FROM {{ ref('seed_full_moon_dates') }}
)

SELECT
  r.*,
  CASE
    WHEN fm.full_moon_date IS NULL THEN 'not full moon'
    ELSE 'full moon'
  END AS is_full_moon
FROM
  fct_reviews r
  LEFT JOIN full_moon_dates fm
    ON (TO_DATE(r.review_date) = DATEADD(DAY, 1, fm.full_moon_date))
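Note the off-by-one in the join condition: a review counts as a "full moon" review when it was written the day after a full moon. A toy Python version of the same rule (the helper and the sample date are illustrative):

```python
# A review matches when review_date = full_moon_date + 1 day,
# mirroring TO_DATE(r.review_date) = DATEADD(DAY, 1, fm.full_moon_date).
from datetime import date, timedelta

full_moon_dates = {date(2025, 1, 14)}

def is_full_moon(review_date: date) -> str:
    return "full moon" if (review_date - timedelta(days=1)) in full_moon_dates else "not full moon"

print(is_full_moon(date(2025, 1, 15)))  # full moon
print(is_full_moon(date(2025, 1, 14)))  # not full moon
```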

Snapshots

Snapshots for listing

The contents of snapshots/scd_raw_listings.sql:

{% snapshot scd_raw_listings %}

{{
   config(
       target_schema='DEV',
       unique_key='id',
       strategy='timestamp',
       updated_at='updated_at',
       invalidate_hard_deletes=True
   )
}}

select * FROM {{ source('airbnb', 'listings') }}

{% endsnapshot %}

Updating the table

UPDATE AIRBNB.RAW.RAW_LISTINGS SET MINIMUM_NIGHTS=30,
    updated_at=CURRENT_TIMESTAMP() WHERE ID=3176;

SELECT * FROM AIRBNB.DEV.SCD_RAW_LISTINGS WHERE ID=3176;
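What the timestamp strategy does under the hood can be sketched in plain Python (a simplified model, not dbt's actual implementation; real snapshots also track dbt_scd_id, dbt_valid_from, etc.): when an incoming row's updated_at is newer, the current version is closed and a new current version is appended.

```python
# Minimal SCD Type-2 sketch: dbt_valid_to IS NULL marks the current version.
from datetime import datetime

def apply_snapshot(history: list[dict], incoming: dict) -> None:
    current = [h for h in history
               if h["id"] == incoming["id"] and h["dbt_valid_to"] is None]
    if current and incoming["updated_at"] > current[0]["updated_at"]:
        current[0]["dbt_valid_to"] = incoming["updated_at"]  # close the old version
        history.append({**incoming, "dbt_valid_to": None})   # insert the new current one

history = [{"id": 3176, "minimum_nights": 62,
            "updated_at": datetime(2024, 1, 1), "dbt_valid_to": None}]
apply_snapshot(history, {"id": 3176, "minimum_nights": 30,
                         "updated_at": datetime(2024, 2, 1)})
print(len(history))  # 2: the closed-out row plus the new current row
```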

Snapshots for hosts

The contents of snapshots/scd_raw_hosts.sql:

{% snapshot scd_raw_hosts %}

{{
   config(
       target_schema='dev',
       unique_key='id',
       strategy='timestamp',
       updated_at='updated_at',
       invalidate_hard_deletes=True
   )
}}

select * FROM {{ source('airbnb', 'hosts') }}

{% endsnapshot %}

Tests

Generic Tests

The contents of models/schema.yml:

models:
  - name: dim_listings_cleansed
    columns:

     - name: listing_id
       data_tests:
         - unique
         - not_null

     - name: host_id
       data_tests:
         - not_null
         - relationships:
             arguments:
               to: ref('dim_hosts_cleansed')
               field: host_id

     - name: room_type
       data_tests:
         - accepted_values:
             arguments:
               values: ['Entire home/apt',
                        'Private room',
                        'Shared room',
                        'Hotel room']

Storing Test Failures:

Add this to your dbt_project.yml:

data_tests:
  +store_failures: true
  +schema: _test_failures

Here is the link to Elementary Data if you want to take testing to the next level. :)

Singular test for minimum nights check

The contents of tests/dim_listings_minimum_nights.sql:

SELECT
    *
FROM
    {{ ref('dim_listings_cleansed') }}
WHERE minimum_nights < 1
LIMIT 10

Restricting test execution to a specific test

dbt test -s dim_listings_minimum_nights

Unit Tests

Add this to models/mart/unit_tests.yml:

unit_tests:
  - name: unittest_fullmoon_matcher
    model: mart_fullmoon_reviews
    given:
      - input: ref('fct_reviews')
        rows:
          - {review_date: '2025-01-13'}
          - {review_date: '2025-01-14'}
          - {review_date: '2025-01-15'}
      - input: ref('seed_full_moon_dates')
        rows:
          - {full_moon_date: '2025-01-14'}
    expect:
      rows:
        - {review_date: '2025-01-13', is_full_moon: "not full moon"}
        - {review_date: '2025-01-14', is_full_moon: "not full moon"}
        - {review_date: '2025-01-15', is_full_moon: "full moon"}

Restricting test execution to tests associated with a specific model

dbt test -s mart_fullmoon_reviews

Contracts

Add this to models/schema.yml:

  - name: dim_hosts_cleansed
    config:
        contract:
          enforced: true
    columns:
      - name: host_id
        data_type: integer
      - name: host_name
        data_type: string
      - name: is_superhost
        data_type: string
      - name: updated_at
        data_type: timestamp
      - name: created_at
        data_type: timestamp

Exercise

Create a singular test in tests/consistent_created_at.sql that checks that there is no review date that is submitted before its listing was created: Make sure that every review_date in fct_reviews is more recent than the associated created_at in dim_listings_cleansed.

Solution

SELECT * FROM {{ ref('dim_listings_cleansed') }} l
INNER JOIN {{ ref('fct_reviews') }} r
USING (listing_id)
WHERE l.created_at >= r.review_date

Custom Generic Tests

Add this to tests/generic/positive_values.sql:

{% test positive_values(model, column_name) %}
SELECT * FROM {{ model }} WHERE {{ column_name }} <= 0
{% endtest %}

Macros, Custom Tests and Packages

Macros

The contents of macros/no_nulls_in_columns.sql:

{% macro no_nulls_in_columns(model) %}
    SELECT * FROM {{ model }} WHERE
    {% for col in adapter.get_columns_in_relation(model) -%}
        {{ col.column }} IS NULL OR
    {% endfor %}
    FALSE
{% endmacro %}

The contents of tests/no_nulls_in_dim_listings.sql

{{ no_nulls_in_columns(ref('dim_listings_cleansed')) }}
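To see why the macro ends with FALSE, here is the shape of the SQL it renders (the column names below are illustrative): one "col IS NULL OR" clause per column, with FALSE closing off the trailing OR so the statement stays valid.

```python
# Build the same WHERE clause the no_nulls_in_columns macro's for-loop renders.
columns = ["LISTING_ID", "LISTING_NAME", "ROOM_TYPE"]

sql = "SELECT * FROM dim_listings_cleansed WHERE\n"
sql += "".join(f"    {col} IS NULL OR\n" for col in columns)
sql += "    FALSE"  # terminator for the last OR
print(sql)
```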

Custom Generic Tests

The contents of tests/generic/positive_values.sql

{% test positive_values(model, column_name) %}
SELECT * FROM {{ model }} WHERE {{ column_name }} <= 0
{% endtest %}

Setting Test Parameters

The contents of tests/generic/minimum_row_count.sql:

{% test minimum_row_count(model, min_row_count) %}
{{ config(severity = 'warn') }}

SELECT
    COUNT(*) as cnt
FROM
    {{ model }}
HAVING
    COUNT(*) < {{ min_row_count }}
{% endtest %}

Test Severity

Add this to models/schema.yml:

  - name: dim_listings_cleansed
    description: Cleansed table which contains Airbnb listings.
    data_tests:
      - minimum_row_count:
          arguments:
            min_row_count: 1000
          severity: error

Packages

The contents of packages.yml:

packages:
  - package: dbt-labs/dbt_utils
    version: 1.3.0

The contents of models/fct_reviews.sql:

{{
  config(
    materialized = 'incremental',
    on_schema_change='fail'
    )
}}
WITH src_reviews AS (
  SELECT * FROM {{ ref('src_reviews') }}
)
SELECT
  {{ dbt_utils.generate_surrogate_key(['listing_id', 'review_date', 'reviewer_name', 'review_text']) }}
    AS review_id,
  *
FROM src_reviews
WHERE review_text is not null
{% if is_incremental() %}
  AND review_date > (select max(review_date) from {{ this }})
{% endif %}
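Roughly speaking, generate_surrogate_key compiles to an MD5 hash of the string-cast, null-coalesced column values joined with a separator. A hedged Python approximation (the exact null placeholder and separator come from the dbt_utils source and may differ across versions):

```python
# Approximation of dbt_utils.generate_surrogate_key's output shape.
import hashlib

NULL_PLACEHOLDER = "_dbt_utils_surrogate_key_null_"  # placeholder for NULL values

def surrogate_key(*values) -> str:
    joined = "-".join(NULL_PLACEHOLDER if v is None else str(v) for v in values)
    return hashlib.md5(joined.encode()).hexdigest()

key = surrogate_key(3176, "2024-02-01", "Zoltan", "excellent stay!")
print(len(key))  # 32 hex characters, deterministic for the same inputs
```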

Documentation

The models/schema.yml after adding the documentation:


models:
  - name: dim_listings_cleansed
    description: Cleansed table which contains Airbnb listings.
    columns:

      - name: listing_id
        description: Primary key for the listing
        data_tests:
          - unique
          - not_null

      - name: host_id
        description: The host's id. References the host table.
        data_tests:
          - not_null
          - relationships:
              arguments:
                to: ref('dim_hosts_cleansed')
                field: host_id

      - name: room_type
        description: Type of the apartment / room
        data_tests:
          - accepted_values:
              arguments:
                values: ['Entire home/apt', 'Private room', 'Shared room', 'Hotel room']

      - name: minimum_nights
        description: '{{ doc("dim_listing_cleansed__minimum_nights") }}'
        data_tests:
          - positive_values

  - name: dim_hosts_cleansed
    columns:
      - name: host_id
        data_tests:
          - not_null
          - unique

      - name: host_name
        data_tests:
          - not_null

      - name: is_superhost
        data_tests:
          - accepted_values:
              arguments:
                values: ['t', 'f']

  - name: fct_reviews
    columns:
      - name: listing_id
        data_tests:
          - relationships:
              arguments:
                to: ref('dim_listings_cleansed')
                field: listing_id

      - name: reviewer_name
        data_tests:
          - not_null

      - name: review_sentiment
        data_tests:
          - accepted_values:
              arguments:
                values: ['positive', 'neutral', 'negative']

The contents of models/docs.md:

{% docs dim_listing_cleansed__minimum_nights %}
Minimum number of nights required to rent this property.

Keep in mind that old listings might have `minimum_nights` set
to 0 in the source tables. Our cleansing algorithm updates this to `1`.

{% enddocs %}

The contents of models/overview.md:

{% docs __overview__ %}
# Airbnb pipeline

Hey, welcome to our Airbnb pipeline documentation!

Here is the schema of our input data:
![input schema](https://dbt-datasets.s3.us-east-2.amazonaws.com/input_schema.png)

{% enddocs %}

Analyses, Hooks and Exposures

Analyses

The contents of analyses/full_moon_no_sleep.sql:

WITH fullmoon_reviews AS (
    SELECT * FROM {{ ref('mart_fullmoon_reviews') }}
)
SELECT
    is_full_moon,
    review_sentiment,
    COUNT(*) as reviews
FROM
    fullmoon_reviews
GROUP BY
    is_full_moon,
    review_sentiment
ORDER BY
    is_full_moon,
    review_sentiment

Hooks

Changes made to dbt_project.yml:

on-run-start:
  - "CREATE TABLE IF NOT EXISTS {{ target.schema }}.audit_log (
      model_name STRING,
      run_timestamp TIMESTAMP
    )"

models:
  airbnb:
    ...
    +post-hook:
      - "INSERT INTO {{ target.schema }}.audit_log VALUES ('{{ this }}', CURRENT_TIMESTAMP)"

Grants

Add grants to dbt_project.yml:

models:
  airbnb:
    grants:
      select: ["transform", "reporter"]

Creating a Dashboard in Preset

Getting the Snowflake credentials up to the screen:

  • Mac / Linux / Windows Powershell: cat ~/.dbt/profiles.yml
  • Windows (cmd): type %USERPROFILE%\.dbt\profiles.yml

Exposures

The contents of models/dashboard.yml:


exposures:
  - name: executive_dashboard
    label: Executive Dashboard
    type: dashboard
    maturity: low
    url: https://00d200da.us1a.app.preset.io/superset/dashboard/x/?edit=true&native_filters_key=fnn_HJZ0z42ZJtoX06x7gRbd9oBFgFLbnPlCW2o_aiBeZJi3bZuyfQuXE96xfgB
    description: Executive Dashboard about Airbnb listings and hosts

    depends_on:
      - ref('dim_listings_w_hosts')
      - ref('mart_fullmoon_reviews')

    owner:
      name: Zoltan C. Toth
      email: dbtstudent@gmail.com

Debugging Tests and Testing with dbt-expectations

For the final code in packages.yml, models/schema.yml and models/sources.yml, please refer to the course's Github repo: https://github.com/nordquant/complete-dbt-bootcamp-zero-to-hero

Testing a single model

dbt test --select dim_listings_w_hosts

Testing individual sources:

dbt test --select source:airbnb.listings

Debugging dbt

dbt --debug test --select dim_listings_w_hosts

Keep in mind that in the lecture we didn't end up using the --debug flag after all, as taking a look at the compiled SQL file is the better way to debug tests.

Logging

The contents of macros/logging.sql:

{% macro learn_logging() %}
    {{ log("Call your mom!") }}
    {{ log("Call your dad!", info=True) }} {# Logs to the screen, too #}
--  {{ log("Call your dad!", info=True) }} {# This will be logged to the screen #}
    {# log("Call your dad!", info=True) #} {# This won't be executed #}
{% endmacro %}

Executing the macro:

dbt run-operation learn_logging

Variables

The contents of macros/variables.sql:

{% macro learn_variables() %}

    {% set your_name_jinja = "Zoltan" %}
    {{ log("Hello " ~ your_name_jinja, info=True) }}

    {{ log("Hello dbt user " ~ var("user_name", "NO USERNAME IS SET!!") ~ "!", info=True) }}

    {% if var("in_test", False) %}
       {{ log("In test", info=True) }}
    {% else %}
       {{ log("NOT in test", info=True) }}
    {% endif %}

{% endmacro %}

We've added the following block to the end of dbt_project.yml:

vars:
  user_name: default_user_name_for_this_project

An example of passing variables:

dbt run-operation learn_variables --vars "{user_name: zoltanctoth}"

More information on variable passing: https://docs.getdbt.com/docs/build/project-variables

dbt Orchestration

Links to different orchestrators

Dagster

Set up your environment

Let's install the dagster-dbt and dagster-webserver packages, which are listed in requirements.txt.

pip install -r requirements.txt

Create a dagster project

dagster-dbt project scaffold --project-name dbt_dagster_project --dbt-project-dir=airbnb

At this point in the course, open schedules.py and uncomment the schedule logic.

Start dagster

Now that our project is created, start the Dagster server:

# On Windows - PowerShell (like the VS Code Terminal window):
cd dbt_dagster_project
dagster dev

We will continue our work in the Dagster UI at http://localhost:3000/.

Making incremental models compatible with orchestrators:

The updated contents of models/fct/fct_reviews.sql:

{{
  config(
    materialized = 'incremental',
    on_schema_change='fail'
    )
}}
WITH src_reviews AS (
  SELECT * FROM {{ ref('src_reviews') }}
)
SELECT
  {{ dbt_utils.generate_surrogate_key(['listing_id', 'review_date', 'reviewer_name', 'review_text']) }} as review_id,
  *
FROM src_reviews
WHERE review_text is not null

{% if is_incremental() %}
  {% if var("start_date", False) and var("end_date", False) %}
    {{ log('Loading ' ~ this ~ ' incrementally (start_date: ' ~ var("start_date") ~ ', end_date: ' ~ var("end_date") ~ ')', info=True) }}
    AND review_date >= '{{ var("start_date") }}'
    AND review_date < '{{ var("end_date") }}'
  {% else %}
    AND review_date > (select max(review_date) from {{ this }})
    {{ log('Loading ' ~ this ~ ' incrementally (all missing dates)', info=True)}}
  {% endif %}
{% endif %}

Passing a time range to our incremental model:

dbt run --select fct_reviews  --vars '{start_date: "2024-02-15 00:00:00", end_date: "2024-03-15 23:59:59"}'

Reference - Working with incremental strategies: https://docs.getdbt.com/docs/build/incremental-models#about-incremental_strategy


Data Pipeline Architecture

ETL vs ELT

[!NOTE] The choice between ETL and ELT often depends on the target storage capabilities and latency requirements.

ETL: Extract, Transform, Load

Transform data before loading into the warehouse.

  • Best for: Sensitive data masking, legacy systems, strict compliance needs.
  • Pros: Clean data in DW, less storage cost.
  • Cons: Slower ingestion, rigid pipeline.

ELT: Extract, Load, Transform

Load raw data first, transform in the warehouse.

  • Best for: Modern cloud warehouses (Snowflake, BigQuery), rapid ingestion.
  • Pros: Speed, flexibility (schema-on-read), raw data always available.
  • Cons: Higher storage cost, complex transformations in SQL.

The two flows side by side (Mermaid):

graph LR
    subgraph ETL
    S1[Source] --> T[Transform Engine]
    T --> W[Warehouse]
    end
    subgraph ELT
    S2[Source] --> W2[Warehouse Staging]
    W2 --> T2[Transform in DB]
    end

Batch vs Stream Processing

  • Batch: Process large volumes at scheduled intervals (e.g., nightly jobs).
    • Tools: Apache Spark (classic batch), dbt runs.
  • Stream: Process data in real-time as it arrives.
    • Tools: Kafka, Flink, Spark Structured Streaming.

Data Storage Systems

Data Warehouses

Structured, optimized for analytics.

  • Examples: Snowflake, Redshift, BigQuery.
  • Characteristics:
    • OLAP (Online Analytical Processing) workloads.
    • Columnar storage: Optimizes read heavy queries (aggregations).

Data Lakes

Store raw data in native format.

  • Examples: AWS S3, HDFS, Azure Data Lake.
  • Characteristics:
    • Schema-on-read: Structure is applied when data is read.
    • Supports structured (CSV, Parquet) & unstructured (JSON, Logs, Images) data.

Lakehouse Architecture

Combines warehouse performance with lake flexibility.

  • Core Tech: Open table formats like Delta Lake, Apache Iceberg, Apache Hudi.
  • Benefit: ACID transactions on Data Lake storage.

Processing Frameworks

Apache Spark

  • Distributed computing for large-scale data processing.
  • Key Feature: In-memory processing (much faster than MapReduce).
  • Supports both Batch and Streaming.

Apache Kafka

  • Distributed streaming platform.
  • Use Case: Event sourcing, real-time data pipelines, decoupling systems.

Apache Airflow

  • Workflow orchestration and scheduling.
  • DAGs (Directed Acyclic Graphs): Define tasks and dependencies as code (Python).

Data Modeling

Dimensional Modeling (Kimball)

  • Star Schema: Central Fact Table (metrics) linked to Dimension Tables (context - who, what, where).
  • Snowflake Schema: Normalized dimensions (dimensions split into sub-dimensions).

Data Vault

  • Hubs (Keys), Links (Relationships), Satellites (Attributes).
  • Goal: Flexible enterprise data warehousing handling historical data and source system changes auditably.

Data Quality & Governance

Data Quality Dimensions

  1. Completeness: Is all data present?
  2. Accuracy: Is it correct?
  3. Consistency: Does it match across systems?
  4. Timeliness: Is it fresh?

Implementation: Data profiling, validation rules (Great Expectations, dbt tests).
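The four dimensions above can be measured with very simple checks. A toy pass over a batch of records (the records and thresholds are illustrative, not from any real pipeline), scoring completeness and accuracy:

```python
# Toy data-quality scoring: share of non-null keys (completeness)
# and share of in-range prices (accuracy).
rows = [
    {"listing_id": 1, "price": 120.0},
    {"listing_id": None, "price": 80.0},  # completeness failure
    {"listing_id": 3, "price": -5.0},     # accuracy failure
]

completeness = sum(r["listing_id"] is not None for r in rows) / len(rows)
accuracy = sum(r["price"] > 0 for r in rows) / len(rows)
print(round(completeness, 2), round(accuracy, 2))  # 0.67 0.67
```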

Data Lineage

  • Track data flow from source to destination.
  • Critical for Impact Analysis (upstream changes) and Debugging (downstream errors).

Schema Evolution

  • Capabilities to handle changes in data structure over time without breaking pipelines.
  • Strategies: Backward compatibility (new code can read old data), Forward compatibility.
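A minimal sketch of backward compatibility (the records and the `read_host` helper are illustrative): new reader code supplies a default for a field that records written under the old schema do not carry, so neither side breaks.

```python
# New code reads both old- and new-schema records by defaulting missing fields.
old_record = {"id": 1, "name": "host"}                        # pre-change schema
new_record = {"id": 2, "name": "host2", "is_superhost": "t"}  # post-change schema

def read_host(rec: dict) -> dict:
    return {**rec, "is_superhost": rec.get("is_superhost", "f")}

print(read_host(old_record)["is_superhost"])  # f
print(read_host(new_record)["is_superhost"])  # t
```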

Modern Concepts

Change Data Capture (CDC)

  • Capture database changes (Insert, Update, Delete) in real-time from transaction logs.
  • Tools: Debezium, AWS DMS.

Data Mesh

  • Paradigm Shift: Decentralized data architecture.
  • Principles:
    • Domain-owned data products.
    • Data as a Product.
    • Self-serve data infrastructure.
    • Federated governance.

DataOps

  • Definition: DevOps principles applied to data pipelines.
  • Pillars:
    • CI/CD for Data pipelines.
    • Automated Testing (Data & Code).
    • Monitoring & Observability.
    • Collaboration.

Key Technologies

| Category | Tools |
| --- | --- |
| Cloud Platforms | AWS (Glue, EMR), GCP (Dataflow, BigQuery), Azure (Data Factory, Synapse) |
| Containerization | Docker, Kubernetes (scalable execution) |
| IaC | Terraform, CloudFormation (reproducible infra) |

Python Data Types and Their Sizes in Memory

ℹ️ Memory size of Python objects can vary depending on the implementation (CPython, PyPy, etc.) and the system architecture (32-bit vs 64-bit). Values below are based on CPython 3.10+ on a 64-bit system using the sys.getsizeof() function.

📦 Built-in Data Types

| Data Type | Example | Typical Size (Bytes) | Notes |
| --- | --- | --- | --- |
| int | 42 | 28 (base) | Grows with value size (arbitrary precision) |
| float | 3.14 | 24 | Fixed 64-bit double precision |
| complex | 1+2j | 32 | Two floats (real + imaginary) |
| bool | True | 28 | Subclass of int |
| str | "a" | 49 | Varies based on string length |
| bytes | b"a" | 33 | One byte + overhead |
| bytearray | bytearray(1) | 56 | Mutable bytes |
| list | [1, 2, 3] | 88 | Dynamic array; grows with elements |
| tuple | (1, 2, 3) | 72 | Fixed-size immutable sequence |
| set | {1, 2, 3} | 216 | Unordered, unique items |
| frozenset | frozenset() | 216 | Immutable version of set |
| dict | {"a": 1} | 232 | Hash map; grows with items |
| NoneType | None | 16 | Singleton object |

🔍 Measuring Size in Python

You can use the sys.getsizeof() function to check the size of an object in memory:

import sys

print(sys.getsizeof(0))           # int
print(sys.getsizeof(3.14))        # float
print(sys.getsizeof("Hello"))     # str
print(sys.getsizeof([1, 2, 3]))   # list

Downcasting Large DataFrames in Python

Downcasting in pandas refers to converting data types to smaller, more memory-efficient types to reduce memory usage and improve performance.

Why Downcast DataFrames?

Large datasets can consume significant memory if default types like int64 or float64 are used unnecessarily. Downcasting:

  • Reduces RAM usage
  • Speeds up data processing
  • Makes DataFrames more efficient to store or transmit

Example

Before downcasting:

import pandas as pd
import numpy as np

df = pd.DataFrame({
    'int_col': np.random.randint(0, 100, size=1_000_000),
    'float_col': np.random.rand(1_000_000)
})

print(df.info(memory_usage='deep'))
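After downcasting, the same DataFrame takes a fraction of the memory. A sketch of the downcast step (assuming pandas and numpy are installed; pd.to_numeric picks the smallest dtype that can still hold every value, so the exact result depends on your data's range):

```python
# Downcast both columns and compare memory usage with the original.
import numpy as np
import pandas as pd

df = pd.DataFrame({
    'int_col': np.random.randint(0, 100, size=1_000_000),
    'float_col': np.random.rand(1_000_000)
})

before = df.memory_usage(deep=True).sum()
df['int_col'] = pd.to_numeric(df['int_col'], downcast='integer')    # values 0-99 fit in int8
df['float_col'] = pd.to_numeric(df['float_col'], downcast='float')  # float64 -> float32
after = df.memory_usage(deep=True).sum()

print(df.dtypes)
print(f"{before / 1e6:.1f} MB -> {after / 1e6:.1f} MB")
```

Note that downcast='float' checks range, not precision, so float32 results can differ slightly from the original float64 values; verify this is acceptable before downcasting measurement data.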
