DBT는 데이터 가공에 있어 Transformation에 집중된 Workflow 를 제공해주는 도구.
{% test not_null(model, column_name) %}
select *
from {{ model }}
where {{ column_name }} is null
{% endtest %}
version: 2
models:
- name: orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: status
tests:
- accepted_values:
values: ['placed', 'shipped', 'completed', 'returned']
- name: customer_id
tests:
- relationships:
to: ref('customers')
field: id
DBT를 편리하게 사용하기 위해서 웹 IDE를 제공해주는 서비스이다. DBT CLI를 통해 할 수 있는 모든 기능을 할 수가 있고 job기능을 이용해서 Ingestion 파이프라인을 구축할 수도 있다.
하지만 무료플랜으로는 하나의 계정만 사용이 가능하다. 팀이 같이 사용하기 위해서는 인당 달에 50달러를 사용해야한다.
DBT 에서 제공하는 모든 기능을 사용할 수 있지만, 실제 production 운영을 하기위해서는 많은 부분들을 수동으로 세팅 해주어야하는 단점이 있다. 하지만 무료라는점…
모든 DA 들에게 Cloud 계정을 주기에는 금전적인 부담이 있기 때문에 CLI 셋팅을 하고 DA들의 진입장벽을 낮추기 위한 가이드라인을 잘 작성해주기로 결정.
pip 를 이용하여 dbt 를 설치할 수 있다. 사용하는 방식에 맞게 package를 설치할 수 있는데 우선 기본적으로 core package를 다운 받는다. 어떤 data warehouse를 사용하느냐에 따라서 추가 package를 받으면 된다. ( 버전은 상황에 맞게 사용하면 된다. )
pip install dbt-core==1.2.0
##
# dbt-postgres
##
RUN python -m pip install dbt-postgres==1.2.0
##
# dbt-bigquery
##
RUN python -m pip install dbt-bigquery==1.2.0
##
# dbt-databrick
##
RUN python -m pip install dbt-databrick==1.2.0
##
# dbt-snowflake
##
RUN python -m pip install dbt-snowflake==1.2.0
##
# dbt-redshift
##
RUN python -m pip install dbt-redshift==1.2.0
설치가 완료되고 나면 dbt cli를 사용할 수 있다.
dbt --version
Core:
- installed: 1.2.0
- latest: 1.2.0 - Up to date!
Plugins:
- bigquery: 1.2.0 - Up to date!
dbt 를 사용하기 위해서 우선 profile을 작성해주어야한다.
profile이란 어떤 warehouse에 connection을 맺을지에 대한 configuration 값을 정리해놓은 yml파일이다. 설정한 값은 default로 따로 sql 파일에서 jinja template 을 이용하여 정의해줄 수 있다. {{ config(database=”something”, schema=”something”) }}
dbt의 project를 처음 만들게 될 때 dbt init
cli 를 사용할 수 있는데 해당 명령어를 사용하면 기본 구조를 만들어준다.
dbt init 코드 예시
(env) (base) ➜ dbt-init-test dbt init
08:30:42 Running with dbt=1.2.0
08:30:42 Creating dbt configuration folder at /Users/mason/.dbt
Enter a name for your project (letters, digits, underscore): dbt_test
Which database would you like to use?
[1] bigquery
(Don't see the one you want? https://docs.getdbt.com/docs/available-adapters)
Enter a number: 1
[1] oauth
[2] service_account
Desired authentication method option (enter a number): 2
keyfile (/path/to/bigquery/keyfile.json): .
project (GCP project id): tech-dbt-test
dataset (the name of your dbt dataset): test_1
threads (1 or more): 1
job_execution_timeout_seconds [300]:
[1] US
[2] EU
Desired location option (enter a number): 1
08:31:58 Profile dbt_test written to /Users/mason/.dbt/profiles.yml using target's profile_template.yml and your supplied values. Run 'dbt debug' to validate the connection.
08:31:58
Your new dbt project "dbt_test" was created!
For more information on how to configure the profiles.yml file,
please consult the dbt documentation here:
https://docs.getdbt.com/docs/configure-your-profile
One more thing:
Need help? Don't hesitate to reach out to us via GitHub issues or on Slack:
https://community.getdbt.com/
Happy modeling!
모든 입력을 마치고 나면 현재 경로에 project directory가 형성되게 되고 profile 은
~/.dbt/profile.json
경로로 .json 파일이 형성되게 된다.
profile.json 예시
dbt_test: # profile 이름
outputs:
dev:
dataset: test_1
job_execution_timeout_seconds: 300
job_retries: 1
keyfile: .
location: US
method: service-account
priority: interactive
project: tech-dbt-test
threads: 1
type: bigquery
# 추가 될 수 있는 target 의 예시
# prod:
# dataset: test_1
# job_execution_timeout_seconds: 300
# job_retries: 1
# keyfile: "${{ env_var('GOOGLE_KEYFILE_PATH', ''}}" env_var를 이용하여 환경변수값을 사용할 수도 있다.
# location: US
# method: service-account
# priority: interactive
# project: tech-dbt-prod
# threads: 1
# type: bigquery
target: dev
dbt run --target dev
와 같은 방식으로 할 수 있다.version: 2
sources:
- name: source_name
database: |
{%- if target.name == "dev" -%} raw_dev
{%- elif target.name == "qa" -%} raw_qa
{%- elif target.name == "prod" -%} raw_prod
{%- else -%} invalid_database
{%- endif -%}
schema: source_schema
이외에도 다양한 config 값들을 정의할 수 있다.
Config 값 예시
config:
send_anonymous_usage_stats: <true | false>
use_colors: <true | false>
partial_parse: <true | false>
printer_width: <integer>
write_json: <true | false>
warn_error: <true | false>
log_format: <true | false>
debug: <true | false>
version_check: <true | false>
fail_fast: <true | false>
use_experimental_parser: <true | false>
static_parser: <true | false>
본인의 상황에 맞게 config 값들을 설정해주면 된다.
더 다양한 폴더 구조를 이용할 수 있지만 dbt init으로 생성된 default에 대해서만 정리해보면 아래 folder tree 로 나타낼 수 있다.
folder 구조 예시
├── analyses
├── models
| └── example
| ├── my_first_dbt_model.sql
| ├── my_second_dbt_model.sql
| └── schema.yml
├── seeds
├── data
├── macros
├── tests
├── snapshots
├── .gitignore
├── README.md
└── dbt_project.yml
create view as
, create table as
로 wrapping 되게 된다.{% macro cents_to_dollars(column_name, precision=2) %}
({{ column_name }} / 100)::numeric(16, {{ precision }})
{% endmacro %}
select
id as payment_id,
{{ cents_to_dollars('amount') }} as amount_usd,
...
from app_data.payments
{% test not_null(model, column_name) %}
select *
from {{ model }}
where {{ column_name }} is null
{% endtest %}
테스트를 진행했을 때 1개이상의 row가 나온다면 fail 처리가 된다.# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'test_dbt'
version: '1.0.0'
config-version: 2
# This setting configures which "profile" dbt uses for this project.
profile: 'test_dbt'
# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
models:
test_dbt:
# Config indicated by + and applies to all files under models/example/
example:
+materialized: view
기본적으로 주석이 달려있어 이해하기 어렵지 않다. 그중에서 models 에 +materialized: view로 정의된 부분을 볼 수 있는데 해당 파라미터를 통해서 해당 model folder에 있는 sql 파일을 만들 때 default로 view를 할지, table을 할지 선택할 수 있다. 추가적으로 incremental이란 옵션이 있는데 해당 옵션은 해당 table의 timestamp 값을 기준으로 append 하는 방식으로 row를 insert 하는 방식이다. name 은 내가 지정하고자 하는 프로젝트 폴더의 이름을 넣어주면 된다.아래 cli 를 사용할 때 따로 옵션을 주지 않으면 ~/.dbt/profile.json path로 profile을 찾게 된다. 또한 project도 현재 directory를 기준으로 dbt_project.yml
파일을 찾게된다. 혹시 따로 profile을 관리하거나 project dir을 따로 지정해주고 싶다면 --profiles-dir [path]
, —-project-dir [path]
옵션으로 설정이 가능하다.
packages:
- package: dbt-labs/dbt_utils
version: 0.7.1
- package: tailsdotcom/dbt_artifacts
version: 0.5.0-a1
install-prerelease: true
- package: dbt-labs/codegen
version: 0.4.0
- package: calogica/dbt_expectations
version: 0.4.1
- git: https://github.com/dbt-labs/dbt-audit-helper.git
revision: 0.4.0
- git: "https://github.com/dbt-labs/dbt-labs-experimental-features" # git URL
subdirectory: "materialized-views" # name of subdirectory containing `dbt_project.yml`
revision: 0.0.1
- package: dbt-labs/snowplow
version: 0.13.0
import 된 project의 model들도 사용할 수 있게 된다. cross project가 가능.
with source as (
{#-
Normally we would select from the table here, but we are using seeds to load
our data in this project
#}
select * from {{ ref('raw_customer') }}
),
renamed as (
select
id as customer_id,
first_name,
last_name
from source
)
select * from renamed
with source as (
select * from `tech-dbt-test`.`test_1`.`raw_customer`
),
renamed as (
select
id as customer_id,
first_name,
last_name
from source
)
select * from renamed
—-full-refresh
기능을 통해 incremantal model을 처음부터 다시 쌓을 수 있게 설정도 가능.해당 방식을 찾아본 이유는 Bigquery 에서의 과금문제 때문에 어떤 방식으로 이루어지는지 알아볼 필요가 있었다.
크게 두 가지 방식이 있다.
1. merge ( default )
2. insert_overwrite ( partition table에 사용하면 좋음 )
merge statement 방식은 DW가 merge statement를 지원한다면 default로 적용되는 방식이다. bigquery 는 merege statement를 지원하기 때문에 default로 사용이되고 있음. 실제 코드를 보면 아래와 같다.
merge into {{ destination_table }} DEST
using ({{ model_sql }}) SRC
on SRC.{{ unique_key }} = DEST.{{ unique_key }}
when matched then update ...
when not matched then insert ...
기존에 있을 시 update, 새로운 row는 insert하는 방식이다.
incremental model을 만들 때는 기본적으로 unique_key값을 등록해주고 unique_key를 기준으로 어떤 조건을 걸어서 incremental을 진행할지 정해주면 된다.
{{
config(
materialized='incremental',
unique_key='date_day'
)
}}
select
date_trunc('day', event_at) as date_day,
count(distinct user_id) as daily_active_users
from raw_app_data.events
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where date_day >= (select max(date_day) from {{ this }})
{% endif %}
group by 1
임시테이블을 만들어 새로 partiion을 적용하여 overwrite하고 목적 테이블에 merge 하는 방식으로 진행된다.
/*
Create a temporary table from the model SQL
*/
create temporary table {{ model_name }}__dbt_tmp as (
{{ model_sql }}
);
/*
If applicable, determine the partitions to overwrite by
querying the temp table.
*/
declare dbt_partitions_for_replacement array<date>;
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct date(max_tstamp))
from `my_project`.`my_dataset`.{{ model_name }}__dbt_tmp
);
/*
Overwrite partitions in the destination table which match
the partitions in the temporary table
*/
merge into {{ destination_table }} DEST
using {{ model_name }}__dbt_tmp SRC
on FALSE
when not matched by source and {{ partition_column }} in unnest(dbt_partitions_for_replacement)
then delete
when not matched then insert ...
incremental_strategy 는 yml, sql 에서 모두 설정이 가능하다
models:
+incremental_strategy: "insert_overwrite"
{{
config(
materialized='incremental',
unique_key='date_day',
incremental_strategy='insert_override',
...
)
}}
select ...