1. DBT란
1-1. DBT란?
- dbt는 Data Warehouse 에서 데이터의 변환(transformation)하는 것을 정의, 변환을 오케스트레이션 및 실행하는 라이브러리
- 모든 데이터 변환을 중앙 집중화
- 재사용 가능한 모듈로 구성 가능
- 데이터 모델링 파일을 관리하여 sql을 문서화해서 형상 관리 가능
1-2. DBT 주요 기능
- 데이터 모델링 코드 재사용
- 데이터 품질 품질 검사, 자동화된 품질 테스트
- sql 형상 관리를 통해 변경 사항 추적 및 개발 파이프라인과 연동 가능
- 데이터 웨어하우스와 함께 작동되어 대용량 데이터 처리를 쉽게 확장 가능
1-3. DBT resources
| resource | explanation |
|---|
| models | data model, 각 모델은 단일 파일에 있음. |
| snapshots | 변경 가능한 테이블의 상태를 캡쳐하여 데이터 변경 이력 추적 (주로 Fact Table) |
| seeds | DataPlatform(snowflake)에 로드할 수 있는 초기 csv 데이터 적재 |
| tests | 프로젝트 모델 테스트하는 sql query |
| macros | 여러번 재사용할 수 있는 코드 블록 |
| docs | build할 수 있는 문서 |
| sources | data load tool, data warehouse에 로드된 데이터 이름 지정 및 설명 |
| exposures | 프로젝트의 다운스트림 사용을 정의 및 설명 |
| metrics | 프로젝트에 대한 메트릭 정의 |
| analysis | 프로젝트에서 분석 sql query를 구성하는 방법 |
2. How to save dbt model?
2-1. Materialization
- 모델이 데이터베이스에 저장하는 방식을 지정
- 모델의 결과를 저장하고 쿼리할 수 있는 형식을 지정
- Transformation or Data cleansing 수행
2-1-1. DBT Materialization options
1) Table
- 모델 결과를 Database의 테이블에 저장
- 쿼리 실행시 캐싱 처리로 같은 쿼리라면 매번 계산하지 않아서 성능 향상
{{
config(
materialized='table',
sort='timestamp',
dist='user_id')
}}
select *
from ...
create or replace transient table {table_name} as (
...
)
2) View
- 모델 결과를 Database의 View로 저장
- 실제 데이터가 저장되지 않고 모델을 실행할 때마다 쿼리 실행
- 특정 데이터만 필요한 경우 결과를 저장하지 않기 때문에 저장 공간 절약 가능
- 데이터의 실시간 업데이트나 반영이 필요한 경우 사용
{{
config(
materialized='view',
indexes=[{'columns': ['col_a'], 'cluster': 'cluster_a'}]) }}
indexes=[{'columns': ['symbol']}])
}}
select ...
- Incremental
- insert 역할
- 전체 데이터를 다시 처리할 필요가 없어 성능을 향상 가능
- 주로 Fact 테이블 (과거 레코드 수정 필요 X)
{{
config(
materialized='incremental',
unique_key='date_day',
incremental_strategy='delete+insert',
...
)
}}
select ...
insert into {table_name} as (...)
(...)
- Seed
- 정적인 데이터 모델 제공하는데 사용
- 초기 자원 테이블 데이터 제공하는데 사용
- Snapshot
- 지정된 시점의 데이터를 스냅샷 형태로 저장
- 특정 시점의 데이터 상태를 캡쳐 및 추적 가능
2-1-2. materialized = 'incremental'
{{
config(
materialized='incremental',
unique_key=['id', 'current_date'],
incremental_strategy='delete+insert',
schema=generate_schema_name('L1'),
alias='new_table'
)
}}
WITH TARGET_DATA AS (
SELECT *
FROM tablename,
{% if is_incremental() %}
WHERE START_DATE < current_date AND END_DATE > current_date
{% endif %}
),
...
SOURCE_DATA AS (...)
SELECT *
FROM SOURCE_DATA
- materialized='incremental': 기존 데이터를 유지하면서 새로운 데이터를 추가
- unique_key : 중복된 행을 식별하는 데 사용되는 열을 지정
- incremental_strategy='delete+insert': 데이터를 업데이트하기 위해 unique_key에 해당하는 데이터를 삭제한 후 새로운 데이터를 삽입하는 방식