[airflow] Covid-19 ETL by bigquery #3

오현우·2022년 6월 18일
0

airflow

목록 보기
18/20

지난 시간 리뷰

우리는 airflow docker 환경에서 각종 프로바이더들과 환경들을 세팅했다.
오늘부터는 직접적으로 dag를 만들어 진행하려고 한다.

나의 폴더 구조

위는 airflow라는 폴더안에 들어있는 파일들이다.

dag 폴더 구조를 위와 같이 만들어서 각종 정보가 담긴 config.py를 임포트해서 사용하도록 하자.

내 config file 내용이다.

또한 .gitinore 파일을 만들어서 쓸데없는 내용이 들어가지 않게 세팅한다.

DAG 만들기전에 알아두면 좋을 내용

  1. pipeline의 각각의 task들을 test 하면서 진행하기

  2. gcs 버킷에서 덮어쓰기가 가능하나 개체에 대한 이름 변경등을 할 수 없다.

    https://techblog-history-younghunjo1.tistory.com/27
    위의 링크에서 자세한 내용이 확인 가능하다.

  3. 아무리 테스트를 진행하면서 하더라도 sensor를 이용해서 pipeline 핸들링 하기

  4. bash command의 경우 scheduler 에서 진행이 된다.
    때문에 본인도 많이 찾아보다가 겨우 해결했다... ㅠ 도커이미지만들고 도커 컴포즈할껄

DAG first step!! Extract data.

해당 파이프라인을 아래와 같이 구성하려고 하고 있다.

start_pipeline

해당 오퍼레이터는 DAG의 시작을 알리는 더미 오퍼레이터이다.

    start_pipeline = DummyOperator(
    task_id = 'start_pipeline'
    )

위와 같이 구성해주자.

Download_csv

해당 task는 bash operator를 사용하여 컨테이너 내부에 저장하려고 한다.

    download_csv = BashOperator(
        do_xcom_push=False,
        task_id="download_csv",
        bash_command="wget https://covid.ourworldindata.org/data/owid-covid-data.csv -O /opt/airflow/plugins/owid-covid-data.csv"
    )

위와 같이 task를 구성해주자.

사전작업은 scheduler에 wget 을 반드시 설치해야 한다.

docker exec --user="root" -it scheduler_container_id apt-get update && apt-get install wget

위의 명령어를 통해 해당 스케줄러 컨테이너에 wget을 설치할 수 있다.

docker exec webserver_container_id airflow tasks test Covid-19_ETL download_csv 2022-01-01 를 통해 해당 프로세스가 잘 동작하는지 체크해주자.

local_to_gcs

우리는 이제 로컬에 저장된 csv파일을 gcs에 올려야 한다.

우리는 gcp와 소통하기 위해서 몇가지 준비를 해야한다.

  1. 서비스 계정 생성
  2. admin에서 gcp와 소통하기 위한 connection 만들기

해당 내용은 본인이 이전에 포스팅한 내용을 참고하길 바란다.

https://velog.io/@hyunwoozz/airflow-bigquery-%EC%97%B0%EB%8F%99%EC%8B%9C%ED%82%A4%EA%B8%B0

이제 준비가 되었다. 해당 방법은 이미 구글에서 전부 만들어 놓았고 우리는 operator를 사용하기만 하면 된다.

에어 플로우는 버전 및 디펜던시에 아주 민감하다. 때문에 항상 독스에서 디펜던시를 체크하면서 진행하자.

우리는 아래 링크에 있는 오퍼레이터를 사용할 예정이다. 파라미터들은 아래의 링크에 전부 나와 있다.

https://airflow.apache.org/docs/apache-airflow-providers-google/1.0.0/operators/transfer/local_to_gcs.html

    local_to_gcs = LocalFilesystemToGCSOperator(
        task_id='local_to_gcs',
        gcp_conn_id="gcs_conn_id",
        src='/opt/airflow/plugins/owid-covid-data.csv',
        dst='covid-19.csv',
        bucket=gs_bucket
    )

위의 오퍼레이터를 사용해서 로컬 파일을 gcs로 옮길 수 있다.

위의 오퍼레이터를 test 해보자.

docker exec webserver_container_id airflow tasks test Covid-19_ETL local_to_gcs 2022-01-01

정상적으로 수행이 되는 모습이다.

하지만 현재 테스트만 진행해서 된다고 중간에 장치를 마련하지 않으면 과거의 데이터를 기반으로 반복해서 데이터 교착상태가 발생할 수 있다.

따라서 우리는 sensor를 통해 데이터 파이프라인에 안전장치를 부착하려 한다.

check_gcs_file

우리는 버킷에 파일이 정상적으로 올라갔는지 궁금하다. 해당 오퍼레이터는 아래의 링크에서 확인 가능하다. sensor를 적극적으로 활용하자.

https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/gcs/index.html

    check_gcs_file = GCSObjectExistenceSensor(
        task_id='check_gcs_file',
        bucket=gs_bucket,
        google_cloud_conn_id="gcs_conn_id",
        object='covid-19.csv'
    )

이제 위의 task를 테스트해보자.

docker exec webserver_container_id airflow tasks test Covid-19_ETL check_gcs_file 2022-01-01

정상적으로 작동한다.

이제 gcs에서 google bigquery로 데이터 immiragration을 진행시켜 보자.

profile
핵심은 같게, 생각은 다르게

0개의 댓글