데이터 분석가가 BigQuery에서 데이터 변환을 수행하는 복잡한 SQL 워크플로를 개발, 테스트, 버전 관리 및 예약하는 서비스.
Dataform을 사용하면 ELT(추출, 로드, 변환) 프로세스에서 데이터 변환을 관리할 수 있다.
특징
목표: 기상 데이터와 관측소 데이터를 결합하고 거기에 항공기 정시 도착 데이터를 맞추어 날씨와 항공기의 지연 관계를 확인하는 데이터를 만드는 것.
사용할 데이터
기상 데이터
관측소 데이터
항공기 정시 도착 데이터
저장소 생성
저장소를 생성했을 때 자동으로 만들어지는 서비스 계정에 BigQuery에 대한 액세스 권한을 부여해줘야 한다.
Dataform에서 사용되는 서비스 계정에 필요한 BigQuery 역할 부여를 보면 BigQuery 작업 사용자 및 BigQuery 데이터 편집자 권한을 부여하면 될 듯 하다.
여기선 그냥 소유자 권한 줌...
(실제 production에 도입시에는 소유자 권한은 함부로 부여하지 않는 걸 권장한다.)
개발 작업공간 생성
개발 작업공간이란 Git으로 말하면 브랜치와 같은 위치 지정이라고 생각하면 된다.
공동 개발의 경우 개발자가 각각 Git에서 브랜치를 잘라 개발하는 것과 마찬가지로 Dataform에서는 작업 공간을 개발자가 각각 작성하여 작업 공간을 나눌 수 있어 다른 멤버에 영향 없이 변경 내용을 저장소에 commit/push 할 수 있게 된다.
작업공간 초기화
기본으로 나오는 2개의 SQLX 파일은 불필요하므로 삭제해도 된다.
데이터 소스를 저장하는 디렉토리와 실제 처리를 진행하는 디렉토리를 나눌것이다.
아래와 같은 방식으로 definition폴더 밑에 sources, tables 디렉토리를 각각 생성해준다.
sources 폴더에서 새 파일 만들기로 데이터 소스 3개를 불러올 것이다.
sources/ghcnd_2012.sqlx
config {
type: "declaration",
database: "bigquery-public-data",
schema: "ghcn_d",
name: "ghcnd_2012"
}
sources/ghcnd_stations.sqlx
config {
type: "declaration",
database: "bigquery-public-data",
schema: "ghcn_d",
name: "ghcnd_stations"
}
sources/airline_ontime_data_flight.sqlx
config {
type: "declaration",
database: "bigquery-samples",
schema: "airline_ontime_data",
name: "flights"
}
분석 비용 절감을 위해 테이블을 보통 많이 파티셔닝하기 때문에 여기서도 파티셔닝을 한 다음 실제 변환 작업을 해볼 것이다.
tables/ghcnd_2012_by_month.sqlx
(기상 데이터 파티셔닝, 미국 데이터로 범위를 좁혔다.)
config {
type: "table",
bigquery: {
partitionBy: "DATE_TRUNC(date, MONTH)"
}
}
SELECT
id,
date,
element,
value,
qflag
FROM
${ref("ghcnd_2012")}
WHERE
id like 'US%'
tables/flight_by_month.sqlx
(항공기 정시 도착 데이터 파티셔닝)
config {
type: "table",
bigquery: {
partitionBy: "DATE_TRUNC(date_formatted, MONTH)"
}
}
SELECT
PARSE_DATE("%F", date) AS date_formatted,
departure_delay,
departure_airport,
arrival_airport,
FROM
${ref("flights")}
데이터 소스를 만들 때와 다른 점은 config 블록의 설정 내용, body 블록에 SQL이 정이되어 있는데 중요한 것은 ${ref("ghcnd_2012")} 이 부분!
이 ref 함수는 SQL 워크플로에서 모든 테이블, 데이터 소스 선언 또는 커스텀 SQL 작업을 참조하고 자동으로 종속되도록 하는 Dataform Core에서 기본 제공되는 함수. 이 형식으로 테이블을 참조하면 Dataform이 종속성을 정의해준다.
종속성이 정의되면 DAG에서도 그래프로 확인할 수 있다!
어떤 관측소에서 측정된 기상 데이터인지 알 수 있도록 관측소 데이터를 조인하고 관측소 이름에 애틀랜타를 포함하는 데이터에서 강수량을 얻어볼 것이다.
(데이터량이 많기 때문에, 반 년 마다의(2012년 1월~6월, 7월~12월) 테이블을 작성)
tables/atlanta_weather_1to6.sqlx
config {
type: "table",
schema: "dataform",
name: "atlanta_weather_1to6",
description: "ATALANTA 1월 ~ 6월 강수",
columns: {
id: "id",
name: "관측소명",
date: "일",
qflag: "qflag",
prcp: "강수량",
}
}
SELECT
id,
name,
date,
MAX(prcp) AS prcp,
qflag
FROM (
SELECT
wd.id,
s.name,
wd.date,
wd.qflag,
IF( wd.element = 'PRCP', wd.value / 10, NULL ) AS prcp
FROM
${ref("ghcnd_2012_by_month")} AS wd
JOIN
${ref("ghcnd_stations")} AS s
ON
wd.id = s.id
WHERE
wd.date BETWEEN "2012-01-01" AND "2012-06-30"
AND s.name LIKE "%ATLANTA%")
GROUP BY
id,name,date,qflag
ORDER BY
date
tables/atlanta_weather_7to12.sqlx
config {
type: "table",
schema: "dataform",
name: "atlanta_weather_7to12",
description: "ATALANTA 7월 ~ 12월 강수",
columns: {
id: "id",
name: "관측소명",
date: "일",
qflag: "qflag",
prcp: "강수량"
}
}
SELECT
id,
name,
date,
qflag,
MAX(prcp) AS prcp
FROM (
SELECT
wd.id,
s.name,
wd.date,
wd.qflag,
IF( wd.element = 'PRCP', wd.value / 10, NULL ) AS prcp
FROM
${ref("ghcnd_2012_by_month")} AS wd
JOIN
${ref("ghcnd_stations")} AS s
ON
wd.id = s.id
WHERE
wd.date BETWEEN "2012-07-01" AND "2012-12-31"
AND s.name LIKE "%ATLANTA%")
GROUP BY
id,name,date,qflag
ORDER BY
date
위에서 생성한 애틀랜타의 강수량 데이터와 항공기 정시 도착 데이터의 파티션 테이블을 조인하여 강수량과 항공기 지연에 관계가 있는지 확인하는 뷰를 작성해볼 것이다.
해당 뷰에선 다음의 데이터를 확인할 수 있다.
tables/relation_flight_weather_1to6.sqlx
config {
type: "view",
schema: "dataform",
name: "relation_flight_weather_1to6",
description: "항공기 지연과 날씨의 관계를 보는 View(1월~6월)",
columns: {
date: "일",
name: "관측소명",
prcp: "강수량",
departure_delay: "출발 지연",
arrival_airport: "도착 공항"
}
}
SELECT
wx.date,
wx.name,
wx.prcp,
f.departure_delay,
f.arrival_airport
FROM (
SELECT
STRING(date) AS date,
name,
prcp
FROM
${ref("atlanta_weather_1to6")}
WHERE
name like 'ATLANTA HARTSFIELD%'
AND qflag IS NULL) AS wx
JOIN
${ref("flight_by_month")} AS f
ON
STRING(f.date_formatted) = wx.date
WHERE
f.departure_airport = 'ATL' and
f.date_formatted BETWEEN "2012-01-01" AND "2012-06-30"
tables/relation_flight_weather_7to12.sqlx
config {
type: "view",
schema: "dataform",
name: "relation_flight_weather_7to12",
description: "항공기 지연과 날씨의 관계를 보는 View(7월~12월)",
columns: {
date: "일",
name: "관측소명",
prcp: "강수량",
departure_delay: "출발 지연",
arrival_airport: "도착 공항"
}
}
SELECT
wx.date,
wx.name,
wx.prcp,
f.departure_delay,
f.arrival_airport
FROM (
SELECT
STRING(date) AS date,
name,
prcp
FROM
${ref("atlanta_weather_7to12")}
WHERE
name like 'ATLANTA HARTSFIELD%'
AND qflag IS NULL) AS wx
JOIN
${ref("flight_by_month")} AS f
ON
STRING(f.date_formatted) = wx.date
WHERE
f.departure_airport = 'ATL' and
f.date_formatted BETWEEN "2012-07-01" AND "2012-12-31"
최종 DAG 확인
작업 실행
각각의 작업들이 실행되는 것을 확인할 수 있다.
BigQuery Studio에서 생성된 테이블 및 뷰 확인
추가로 맨 앞에서 말했듯이 Dataform은 Git으로 버전관리를 할 수 있다. Git과의 통합은 아래처럼 세팅에서 손쉽게 설정이 가능하다.
[Dataform을 사용한 ELT 파이프라인 구축 참고]