airflow + dbt

문주은 · May 10, 2024

1. dbt core basic

1-1. Install

A plain pip install dbt does not install the current dbt-core package, so dbt-core must be installed explicitly

pip install dbt-core

# check
> dbt --version
Core:
  - installed: 1.7.13
  - latest:    1.8.0  - Update available!

  Your version of dbt-core is out of date!
  You can find instructions for upgrading here:
  https://docs.getdbt.com/docs/installation
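
dbt-core alone does not connect to any warehouse; an adapter package is also needed. A minimal sketch, assuming Snowflake (the adapter that appears in the error log further down); swap in the adapter for your warehouse:

# install the adapter matching your warehouse (Snowflake assumed here)
pip install dbt-snowflake

# dbt --version then also lists the adapter under "Plugins"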

1-2. Create dbt project

> dbt init {project_name}

## output
> tree
.
├─dags
├─config
├─dbt
│   ├─logs
│   ├─{project_name}
│   ├─analyses
│   ├─macros
│   ├─models
│   │  └─example
│   ├─seeds
│   ├─snapshots
│   └─tests
├─docker-compose.yaml
├─Dockerfile
└─entrypoint.sh
  • profiles.yml
    profiles.yml location on Windows : C:\Users\Username\.dbt
    profiles.yml location on Ubuntu : /home/airflow/.dbt
    profiles.yml holds the warehouse connection settings dbt uses when running the project (a sketch follows below).
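
A minimal sketch of a profiles.yml, assuming a Snowflake connection (the adapter registered in the error log further down); every value is a placeholder to replace with your own:

{project_name}:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: <account_identifier>   # placeholder
      user: <username>                # placeholder
      password: <password>            # placeholder
      role: <role>                    # placeholder
      warehouse: <warehouse>          # placeholder
      database: <database>            # placeholder
      schema: <default_schema>        # read as target.schema inside macros
      threads: 4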

  • packages to install

> tree
.
├─dags
├─config
├─dbt
│   ├─logs
│   ├─{project_name}
│   ├─analyses
│   ├─macros
│   ├─models
│   │  └─example
│   ├─seeds
│   ├─snapshots
│   └─tests
├─dbt_project.yml
├─packages.yml
├─docker-compose.yaml
├─Dockerfile
└─entrypoint.sh
  • Create a packages.yml file at the same level as dbt_project.yml.

    packages:
      - package: fivetran/sap_source
        version: [">=0.1.0", "<0.2.0"]
    
      - package: fivetran/fivetran_utils
        version: [">=0.4.0", "<0.5.0"]
    
      - package: dbt-labs/dbt_utils
        version: [">=1.3.0", "<2.0.0"]
    
      - package: dbt-labs/spark_utils
        version: [">=0.3.0", "<0.4.0"]
  • Run dbt deps to install the packages (see the command below).
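
A minimal sketch of running it; the project path is the one used in the error log further down, so adjust it to your own layout. dbt deps downloads everything listed in packages.yml into dbt_packages/:

# run from the directory that contains dbt_project.yml and packages.yml
> cd /opt/airflow/dbt/{project_name}
> dbt deps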

1-3. Define model

  • dbt/project_name/models/customer_orders.sql
select
    customer_id,
    count(*) as order_count
from
    orders
group by
    customer_id
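
In a dbt project the orders table would normally be referenced through source() (or ref() when it is another model) instead of a bare table name, so dbt can build its dependency graph. A minimal sketch, assuming a hypothetical source named raw:

  • models/sources.yml (hypothetical file declaring the raw orders table)
version: 2
sources:
  - name: raw
    schema: RAW
    tables:
      - name: orders

  • customer_orders.sql, rewritten against the declared source
select
    customer_id,
    count(*) as order_count
from {{ source('raw', 'orders') }}
group by customer_id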

1-4. Run model

# Activate the environment where dbt is installed
> conda activate airflow

# Run all models
> dbt run

# Run specific model
> dbt run --models {model_name}
  • The run results are stored in the target directory (see the layout sketch below).
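
A rough sketch of what the target directory holds after a run (standard dbt artifacts):

target/
├─compiled/{project_name}/...   # Jinja rendered into plain SQL
├─run/{project_name}/...        # SQL actually sent to the warehouse
├─manifest.json
└─run_results.json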

2. DBT custom parameter setting

2-1. macros

  • macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}

        {{ default_schema }}

    {%- else -%}

        {{ custom_schema_name | trim }}

    {%- endif -%}

{%- endmacro %}
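
This overrides dbt's built-in generate_schema_name. The default macro appends the custom schema to the target schema, while this version uses the custom schema as-is; a small comparison, assuming target.schema in profiles.yml is DEV:

-- schema='L1' with dbt's default macro  ->  DEV_L1  (target schema + '_' + custom schema)
-- schema='L1' with the override above   ->  L1      (custom schema used as-is)
-- no custom schema in either case       ->  DEV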

2-2. models

  • models/L0/tmp_model.sql
-- example 
{{ 
  config(
    materialized='incremental',
    unique_key='id',
    incremental_strategy='delete+insert',
    schema=generate_schema_name('L1'),
    alias='new_table'
  )
}}

WITH TARGET_DATA AS (
    SELECT *
    FROM tablename
    {% if is_incremental() %}
        WHERE START_DATE < current_date AND END_DATE > current_date
    {% endif %}
),

...

SOURCE_DATA AS (...)
SELECT * 
FROM SOURCE_DATA

[Model explanation]

  • materialized='incremental': appends new data while keeping the existing data
  • unique_key: specifies the column used to identify duplicate rows
  • incremental_strategy='delete+insert': updates data by deleting the rows matching the unique_key ('id') and then inserting the new rows (see the sketch after this list)
  • schema=generate_schema_name('L1'): materialize into the L1 schema
    • default schema: the value stored as projectname.outputs.dev.schema in C:\Users\Username\.dbt\profiles.yml
  • alias='new_table': the result is saved under the table name new_table
    • default table name: without alias, the filename of filename.sql becomes the table name; alias sets a custom one
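
Roughly what the delete+insert strategy does on each incremental run (a conceptual sketch, not the exact SQL dbt generates):

-- 1) rows whose unique_key ('id') appears in the newly selected batch are deleted
delete from L1.new_table
where id in (select id from <new_batch>);

-- 2) the new batch is inserted
insert into L1.new_table
select * from <new_batch>;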

[Caution❗️]
{% if is_incremental() %} ~ {% endif %}

  • Command for the first load, when all data is stored regardless of START_DATE and END_DATE:
    dbt run --full-refresh --select {model_name.sql}
  • Command used afterwards, when only the data between START_DATE and END_DATE is stored:
    dbt run --select {model_name.sql}

Etc. Errors and solutions

Error name : KeyError: '://macros/generate_schema_name.sql'

1) Detail

airflow@:/opt/airflow/dbt/saleshub$ dbt run --models dim_project_info
03:45:43  Running with dbt=1.7.13
03:45:47  Registered adapter: snowflake=1.7.3
03:45:50  Encountered an error:
'project://macros/generate_schema_name.sql'
...
KeyError: 'project://macros/generate_schema_name.sql'

2) Cause

Because I had changed the location of the dbt profiles.yml file inside the airflow container, there was a stale dbt cache / compiled-artifacts problem.

3) Solution

Solution 1) Clean up the dbt cache

# in the airflow container
$ dbt clean
$ dbt compile
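
For reference, dbt clean removes the directories listed under clean-targets in dbt_project.yml (by default target and dbt_packages), so dbt deps may need to be run again before dbt compile. A minimal sketch of the setting:

# dbt_project.yml
clean-targets:
  - "target"
  - "dbt_packages"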