https://docs.astronomer.io/astro/
저번 챕터에서 일관된 airflow 환경을 제공하지 못해서, 디버깅에 대해 매우 한정적이여서 벽을 느끼고 위와 같은 data orchestration platform 을 도입하게 되었다.
전반적인 아키텍쳐
https://docs.astronomer.io/software/cli-quickstart/
0.27.0을 권장한다. 0.28.0은 써봤는데 아직 오류가 많이 존재하는 듯.
https://docs.docker.com/get-docker/
git clone https://github.com/tuanchris/cloud-data-lake
cd cloud-data-lake
위에는 dag와 dockerfile 등 astro로 버젼관리와 dag를 효과적으로 관리할 수 있게 이미 전반적인 환경을 세팅해 두었다.
astro d start
어라라... 이게 아닌데...?
를 진행하면, 이상한 문구가 보이면서 작동이 되지 않을 것이다.
이러한 방법을 해결하는 방법은 2가지만 진행하면 된다.
1. Docker 접속 > preference(설정) > Docker Engine
{
"builder": {
"gc": {
"enabled": true,
"defaultKeepStorage": "20GB"
}
},
"experimental": false,
"features": {
"buildkit": false
}
빌드킷을 false로 바꿔준다.
다시 이제 터미널에서 아래의 명령어를 실행하면 정상적으로 작동한다.
DOCKER_BUILDKIT=0 astro dev start
도커에서 각 오퍼레이션이 수행되고 있는 모습
도커환경에서 각기 스케쥴러와 서버가 실행되고 있으니 우리는 localhost:8080 으로 접속한다.
admin / admin 입력하여 로그인 한다.
해당 화면에서 dag가 정상적으로 업로드 되어있는 것을 보여준다.
service account 등록 및 키 생성 > 권한을 잘 체크해야함.
버킷 설정 및 빅쿼리 생성 > 참고: 빅쿼리랑 같은 리젼이여야 데이터가 흐를 수 있다. (이전 포스팅 참고)
빅쿼리 데이터 세트를 2개 만드는데, datawarehouse랑 staging dataset을 만들어서 준비작업을 해준다.
그 후 admin > connection 이동 후 connection을 2개 등록해준다.
bigquery service account를 등록하여 api를 정상적으로 활용할 수 있도록 한다.
keyfile을 위치 지정해주단가, json file의 내용을 직접 삽입할 수 있다.
이제 dag를 조정해야 한다.
# Import packages
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
# Define default arguments
default_args = {
'owner': 'HyunWoo Oh',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5),
}
# Define dag variables
project_id = 'theta-cider-344811'
staging_dataset = 'IMMIGRATION_DWH_STAGING'
dwh_dataset = 'IMMIGRATION_DWH'
gs_bucket = 'hyunwoo_airflow_example'
# Define dag
dag = DAG('cloud-data-lake-pipeline',
start_date=datetime.now(),
schedule_interval='@once',
concurrency=5,
max_active_runs=1,
default_args=default_args)
start_pipeline = DummyOperator(
task_id = 'start_pipeline',
dag = dag
)
# {project_id}:
# Load data from GCS to BQ
load_us_cities_demo = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_us_cities_demo',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/cities/us-cities-demographics.csv'],
destination_project_dataset_table = f'{project_id}:{staging_dataset}.us_cities_demo',
schema_object = 'cloud-data-lake-gcp/cities/us_cities_demo.json',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
field_delimiter=';',
skip_leading_rows = 1
)
load_airports = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_airports',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/airports/airport-codes_csv.csv'],
destination_project_dataset_table = f'{project_id}:{staging_dataset}.airport_codes',
schema_object = 'cloud-data-lake-gcp/airports/airport_codes.json',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
skip_leading_rows = 1
)
load_weather = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_weather',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/weather/GlobalLandTemperaturesByCity.csv'],
destination_project_dataset_table = f'{project_id}:{staging_dataset}.temperature_by_city',
schema_object = 'cloud-data-lake-gcp/weather/temperature_by_city.json',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
skip_leading_rows = 1
)
load_immigration_data = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_immigration_data',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/immigration_data/*.parquet'],
destination_project_dataset_table = f'{project_id}:{staging_dataset}.immigration_data',
source_format = 'parquet',
write_disposition='WRITE_TRUNCATE',
skip_leading_rows = 1,
autodetect = True
)
# Check loaded data not null
check_us_cities_demo = BigQueryCheckOperator(
task_id = 'check_us_cities_demo',
use_legacy_sql=False,
sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.us_cities_demo`'
)
check_airports = BigQueryCheckOperator(
task_id = 'check_airports',
use_legacy_sql=False,
sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.airport_codes`'
)
check_weather = BigQueryCheckOperator(
task_id = 'check_weather',
use_legacy_sql=False,
sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.temperature_by_city`'
)
check_immigration_data = BigQueryCheckOperator(
task_id = 'check_immigration_data',
use_legacy_sql=False,
sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.immigration_data`'
)
loaded_data_to_staging = DummyOperator(
task_id = 'loaded_data_to_staging'
)
# Load dimensions data from files directly to DWH table
load_country = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_country',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/master_data/I94CIT_I94RES.csv'],
destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_COUNTRY',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
skip_leading_rows = 1,
schema_fields=[
{'name': 'COUNTRY_ID', 'type': 'NUMERIC', 'mode': 'NULLABLE'},
{'name': 'COUNTRY_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
]
)
load_port = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_port',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/master_data/I94PORT.csv'],
destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_PORT',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
skip_leading_rows = 1,
schema_fields=[
{'name': 'PORT_ID', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'PORT_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
]
)
load_state = GoogleCloudStorageToBigQueryOperator(
task_id = 'load_state',
bucket = gs_bucket,
source_objects = ['cloud-data-lake-gcp/master_data/I94ADDR.csv'],
destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_STATE',
write_disposition='WRITE_TRUNCATE',
source_format = 'csv',
skip_leading_rows = 1,
schema_fields=[
{'name': 'STATE_ID', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'STATE_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
]
)
# Transform, load, and check fact data
create_immigration_data = BigQueryOperator(
task_id = 'create_immigration_data',
use_legacy_sql = False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = './sql/F_IMMIGRATION_DATA.sql'
)
check_f_immigration_data = BigQueryCheckOperator(
task_id = 'check_f_immigration_data',
use_legacy_sql=False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = f'SELECT count(*) = count(distinct cicid) FROM `{project_id}.{dwh_dataset}.F_IMMIGRATION_DATA`'
)
# Create remaining dimensions data
create_d_time = BigQueryOperator(
task_id = 'create_d_time',
use_legacy_sql = False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = './sql/D_TIME.sql'
)
create_d_weather = BigQueryOperator(
task_id = 'create_d_weather',
use_legacy_sql = False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = './sql/D_WEATHER.sql'
)
create_d_airport = BigQueryOperator(
task_id = 'create_d_airport',
use_legacy_sql = False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = './sql/D_AIRPORT.sql'
)
create_d_city_demo = BigQueryOperator(
task_id = 'create_d_city_demo',
use_legacy_sql = False,
params = {
'project_id': project_id,
'staging_dataset': staging_dataset,
'dwh_dataset': dwh_dataset
},
sql = './sql/D_CITY_DEMO.sql'
)
finish_pipeline = DummyOperator(
task_id = 'finish_pipeline'
)
# Define task dependencies
dag >> start_pipeline >> [load_us_cities_demo, load_airports, load_weather, load_immigration_data]
load_us_cities_demo >> check_us_cities_demo
load_airports >> check_airports
load_weather >> check_weather
load_immigration_data >> check_immigration_data
[check_us_cities_demo, check_airports, check_weather,check_immigration_data] >> loaded_data_to_staging
loaded_data_to_staging >> [load_country, load_port, load_state] >> create_immigration_data >> check_f_immigration_data
check_f_immigration_data >> [create_d_time, create_d_weather, create_d_airport, create_d_city_demo] >> finish_pipeline
결과 화면
단순하지는 않은 복잡한 워크플로우를 구성해 보았다.
이렇게 결과만 보면, 복잡하지 않을 수 있지만 필자는 이러한 워크플로우를 따라가려고 해도 전체적인 흐름을 가끔씩 놓쳤으며 이렇게 진행될 수 있게 환경을 세팅하기란 정말 어려웠다.
이러한 상황에서 astro cli 환경에서 airflow 구축은 내가 어느정도 생각해야할 짐을 덜어줬고, 도커 환경을 통해 각기 다른 터미널들이 동작하여 좀 더 편하게 작업하였다.
이런 프로젝트를 하며 느낀 것은 이렇게 통합된 환경을 제공하고, 다른 플랫폼의 api를 활용하여 소통하는 것은 정말 어렵지만 한번 잘 구축해 둔다면 추후에 코드 재사용성이 많이 클 것으로 생각된다.