airflow를 astronomer로 효과적으로 관리하기

오현우·2022년 3월 26일
0

airflow

목록 보기
4/20
post-thumbnail

https://docs.astronomer.io/astro/

저번 챕터에서 일관된 airflow 환경을 제공하지 못해서, 디버깅에 대해 매우 한정적이여서 벽을 느끼고 위와 같은 data orchestration platform 을 도입하게 되었다.

전반적인 아키텍쳐

astro 설치

https://docs.astronomer.io/software/cli-quickstart/
0.27.0을 권장한다. 0.28.0은 써봤는데 아직 오류가 많이 존재하는 듯.

docker 환경 구축

https://docs.docker.com/get-docker/

astro를 이용해 버젼과 환경을 세팅해준다.

git clone https://github.com/tuanchris/cloud-data-lake
cd cloud-data-lake

위에는 dag와 dockerfile 등 astro로 버젼관리와 dag를 효과적으로 관리할 수 있게 이미 전반적인 환경을 세팅해 두었다.

astro d start

어라라... 이게 아닌데...?

를 진행하면, 이상한 문구가 보이면서 작동이 되지 않을 것이다.

이러한 방법을 해결하는 방법은 2가지만 진행하면 된다.
1. Docker 접속 > preference(설정) > Docker Engine

{
  "builder": {
    "gc": {
      "enabled": true,
      "defaultKeepStorage": "20GB"
    }
  },
  "experimental": false,
  "features": {
    "buildkit": false
  }

빌드킷을 false로 바꿔준다.
다시 이제 터미널에서 아래의 명령어를 실행하면 정상적으로 작동한다.

DOCKER_BUILDKIT=0 astro dev start

도커에서 각 오퍼레이션이 수행되고 있는 모습

airflow 실행

도커환경에서 각기 스케쥴러와 서버가 실행되고 있으니 우리는 localhost:8080 으로 접속한다.

admin / admin 입력하여 로그인 한다.

해당 화면에서 dag가 정상적으로 업로드 되어있는 것을 보여준다.

구글 클라우드 플랫폼 환경 설정

  1. service account 등록 및 키 생성 > 권한을 잘 체크해야함.

  2. 버킷 설정 및 빅쿼리 생성 > 참고: 빅쿼리랑 같은 리젼이여야 데이터가 흐를 수 있다. (이전 포스팅 참고)

    빅쿼리 데이터 세트를 2개 만드는데, datawarehouse랑 staging dataset을 만들어서 준비작업을 해준다.

그 후 admin > connection 이동 후 connection을 2개 등록해준다.

bigquery service account를 등록하여 api를 정상적으로 활용할 수 있도록 한다.

keyfile을 위치 지정해주단가, json file의 내용을 직접 삽입할 수 있다.

이제 dag를 조정해야 한다.

set dag code

# Import packages
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

# Define default arguments
default_args = {
    'owner': 'HyunWoo Oh',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}

# Define dag variables
project_id = 'theta-cider-344811'
staging_dataset = 'IMMIGRATION_DWH_STAGING'
dwh_dataset = 'IMMIGRATION_DWH'
gs_bucket = 'hyunwoo_airflow_example'

# Define dag
dag = DAG('cloud-data-lake-pipeline',
          start_date=datetime.now(),
          schedule_interval='@once',
          concurrency=5,
          max_active_runs=1,
          default_args=default_args)

start_pipeline = DummyOperator(
    task_id = 'start_pipeline',
    dag = dag
)
# {project_id}:
# Load data from GCS to BQ
load_us_cities_demo = GoogleCloudStorageToBigQueryOperator(
    task_id = 'load_us_cities_demo',
    bucket = gs_bucket,
    source_objects = ['cloud-data-lake-gcp/cities/us-cities-demographics.csv'],
    destination_project_dataset_table = f'{project_id}:{staging_dataset}.us_cities_demo',
    schema_object = 'cloud-data-lake-gcp/cities/us_cities_demo.json',
    write_disposition='WRITE_TRUNCATE',
    source_format = 'csv',
    field_delimiter=';',
    skip_leading_rows = 1
)

load_airports = GoogleCloudStorageToBigQueryOperator(
    task_id = 'load_airports',
    bucket = gs_bucket,
    source_objects = ['cloud-data-lake-gcp/airports/airport-codes_csv.csv'],
    destination_project_dataset_table = f'{project_id}:{staging_dataset}.airport_codes',
    schema_object = 'cloud-data-lake-gcp/airports/airport_codes.json',
    write_disposition='WRITE_TRUNCATE',
    source_format = 'csv',
    skip_leading_rows = 1
)

load_weather = GoogleCloudStorageToBigQueryOperator(
    task_id = 'load_weather',
    bucket = gs_bucket,
    source_objects = ['cloud-data-lake-gcp/weather/GlobalLandTemperaturesByCity.csv'],
    destination_project_dataset_table = f'{project_id}:{staging_dataset}.temperature_by_city',
    schema_object = 'cloud-data-lake-gcp/weather/temperature_by_city.json',
    write_disposition='WRITE_TRUNCATE',
    source_format = 'csv',
    skip_leading_rows = 1
)

load_immigration_data = GoogleCloudStorageToBigQueryOperator(
    task_id = 'load_immigration_data',
    bucket = gs_bucket,
    source_objects = ['cloud-data-lake-gcp/immigration_data/*.parquet'],
    destination_project_dataset_table = f'{project_id}:{staging_dataset}.immigration_data',
    source_format = 'parquet',
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows = 1,
    autodetect = True
)

# Check loaded data not null
check_us_cities_demo = BigQueryCheckOperator(
    task_id = 'check_us_cities_demo',
    use_legacy_sql=False,
    sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.us_cities_demo`'

)

check_airports = BigQueryCheckOperator(
    task_id = 'check_airports',
    use_legacy_sql=False,
    sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.airport_codes`'
)

check_weather = BigQueryCheckOperator(
    task_id = 'check_weather',
    use_legacy_sql=False,
    sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.temperature_by_city`'
)


check_immigration_data = BigQueryCheckOperator(
    task_id = 'check_immigration_data',
    use_legacy_sql=False,
    sql = f'SELECT count(*) FROM `{project_id}.{staging_dataset}.immigration_data`'
)

loaded_data_to_staging = DummyOperator(
    task_id = 'loaded_data_to_staging'
)

# Load dimensions data from files directly to DWH table
load_country = GoogleCloudStorageToBigQueryOperator(
    task_id = 'load_country',
    bucket = gs_bucket,
    source_objects = ['cloud-data-lake-gcp/master_data/I94CIT_I94RES.csv'],
    destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_COUNTRY',
    write_disposition='WRITE_TRUNCATE',
    source_format = 'csv',
    skip_leading_rows = 1,
    schema_fields=[
        {'name': 'COUNTRY_ID', 'type': 'NUMERIC', 'mode': 'NULLABLE'},
        {'name': 'COUNTRY_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
    ]
)

load_port = GoogleCloudStorageToBigQueryOperator(
    task_id = 'load_port',
    bucket = gs_bucket,
    source_objects = ['cloud-data-lake-gcp/master_data/I94PORT.csv'],
    destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_PORT',
    write_disposition='WRITE_TRUNCATE',
    source_format = 'csv',
    skip_leading_rows = 1,
    schema_fields=[
        {'name': 'PORT_ID', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'PORT_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
    ]
)

load_state = GoogleCloudStorageToBigQueryOperator(
    task_id = 'load_state',
    bucket = gs_bucket,
    source_objects = ['cloud-data-lake-gcp/master_data/I94ADDR.csv'],
    destination_project_dataset_table = f'{project_id}:{dwh_dataset}.D_STATE',
    write_disposition='WRITE_TRUNCATE',
    source_format = 'csv',
    skip_leading_rows = 1,
    schema_fields=[
        {'name': 'STATE_ID', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'STATE_NAME', 'type': 'STRING', 'mode': 'NULLABLE'},
    ]
)

# Transform, load, and check fact data
create_immigration_data = BigQueryOperator(
    task_id = 'create_immigration_data',
    use_legacy_sql = False,
    params = {
        'project_id': project_id,
        'staging_dataset': staging_dataset,
        'dwh_dataset': dwh_dataset
    },
    sql = './sql/F_IMMIGRATION_DATA.sql'
)

check_f_immigration_data = BigQueryCheckOperator(
    task_id = 'check_f_immigration_data',
    use_legacy_sql=False,
    params = {
        'project_id': project_id,
        'staging_dataset': staging_dataset,
        'dwh_dataset': dwh_dataset
    },
    sql = f'SELECT count(*) = count(distinct cicid) FROM `{project_id}.{dwh_dataset}.F_IMMIGRATION_DATA`'
)

# Create remaining dimensions data
create_d_time = BigQueryOperator(
    task_id = 'create_d_time',
    use_legacy_sql = False,
    params = {
        'project_id': project_id,
        'staging_dataset': staging_dataset,
        'dwh_dataset': dwh_dataset
    },
    sql = './sql/D_TIME.sql'
)

create_d_weather = BigQueryOperator(
    task_id = 'create_d_weather',
    use_legacy_sql = False,
    params = {
        'project_id': project_id,
        'staging_dataset': staging_dataset,
        'dwh_dataset': dwh_dataset
    },
    sql = './sql/D_WEATHER.sql'
)

create_d_airport = BigQueryOperator(
    task_id = 'create_d_airport',
    use_legacy_sql = False,
    params = {
        'project_id': project_id,
        'staging_dataset': staging_dataset,
        'dwh_dataset': dwh_dataset
    },
    sql = './sql/D_AIRPORT.sql'
)

create_d_city_demo = BigQueryOperator(
    task_id = 'create_d_city_demo',
    use_legacy_sql = False,
    params = {
        'project_id': project_id,
        'staging_dataset': staging_dataset,
        'dwh_dataset': dwh_dataset
    },
    sql = './sql/D_CITY_DEMO.sql'
)

finish_pipeline = DummyOperator(
    task_id = 'finish_pipeline'
)

# Define task dependencies
dag >> start_pipeline >> [load_us_cities_demo, load_airports, load_weather, load_immigration_data]

load_us_cities_demo >> check_us_cities_demo
load_airports >> check_airports
load_weather >> check_weather
load_immigration_data >> check_immigration_data


[check_us_cities_demo, check_airports, check_weather,check_immigration_data] >> loaded_data_to_staging

loaded_data_to_staging >> [load_country, load_port, load_state] >> create_immigration_data >> check_f_immigration_data

check_f_immigration_data >> [create_d_time, create_d_weather, create_d_airport, create_d_city_demo] >> finish_pipeline

결과 화면

결론

단순하지는 않은 복잡한 워크플로우를 구성해 보았다.
이렇게 결과만 보면, 복잡하지 않을 수 있지만 필자는 이러한 워크플로우를 따라가려고 해도 전체적인 흐름을 가끔씩 놓쳤으며 이렇게 진행될 수 있게 환경을 세팅하기란 정말 어려웠다.

이러한 상황에서 astro cli 환경에서 airflow 구축은 내가 어느정도 생각해야할 짐을 덜어줬고, 도커 환경을 통해 각기 다른 터미널들이 동작하여 좀 더 편하게 작업하였다.

이런 프로젝트를 하며 느낀 것은 이렇게 통합된 환경을 제공하고, 다른 플랫폼의 api를 활용하여 소통하는 것은 정말 어렵지만 한번 잘 구축해 둔다면 추후에 코드 재사용성이 많이 클 것으로 생각된다.

참고 자료 https://github.com/tuanchris

profile
핵심은 같게, 생각은 다르게

0개의 댓글