우리는 airflow docker 환경에서 각종 프로바이더들과 환경들을 세팅했다.
오늘부터는 직접적으로 dag를 만들어 진행하려고 한다.
위는 airflow라는 폴더안에 들어있는 파일들이다.
dag 폴더 구조를 위와 같이 만들어서 각종 정보가 담긴 config.py를 임포트해서 사용하도록 하자.
내 config file 내용이다.
또한 .gitinore 파일을 만들어서 쓸데없는 내용이 들어가지 않게 세팅한다.
pipeline의 각각의 task들을 test 하면서 진행하기
gcs 버킷에서 덮어쓰기가 가능하나 개체에 대한 이름 변경등을 할 수 없다.
https://techblog-history-younghunjo1.tistory.com/27
위의 링크에서 자세한 내용이 확인 가능하다.
아무리 테스트를 진행하면서 하더라도 sensor를 이용해서 pipeline 핸들링 하기
bash command의 경우 scheduler 에서 진행이 된다.
때문에 본인도 많이 찾아보다가 겨우 해결했다... ㅠ 도커이미지만들고 도커 컴포즈할껄
해당 파이프라인을 아래와 같이 구성하려고 하고 있다.
해당 오퍼레이터는 DAG의 시작을 알리는 더미 오퍼레이터이다.
start_pipeline = DummyOperator(
task_id = 'start_pipeline'
)
위와 같이 구성해주자.
해당 task는 bash operator를 사용하여 컨테이너 내부에 저장하려고 한다.
download_csv = BashOperator(
do_xcom_push=False,
task_id="download_csv",
bash_command="wget https://covid.ourworldindata.org/data/owid-covid-data.csv -O /opt/airflow/plugins/owid-covid-data.csv"
)
위와 같이 task를 구성해주자.
사전작업은 scheduler에 wget 을 반드시 설치해야 한다.
docker exec --user="root" -it scheduler_container_id apt-get update && apt-get install wget
위의 명령어를 통해 해당 스케줄러 컨테이너에 wget을 설치할 수 있다.
docker exec webserver_container_id airflow tasks test Covid-19_ETL download_csv 2022-01-01
를 통해 해당 프로세스가 잘 동작하는지 체크해주자.
우리는 이제 로컬에 저장된 csv파일을 gcs에 올려야 한다.
우리는 gcp와 소통하기 위해서 몇가지 준비를 해야한다.
해당 내용은 본인이 이전에 포스팅한 내용을 참고하길 바란다.
https://velog.io/@hyunwoozz/airflow-bigquery-%EC%97%B0%EB%8F%99%EC%8B%9C%ED%82%A4%EA%B8%B0
이제 준비가 되었다. 해당 방법은 이미 구글에서 전부 만들어 놓았고 우리는 operator를 사용하기만 하면 된다.
에어 플로우는 버전 및 디펜던시에 아주 민감하다. 때문에 항상 독스에서 디펜던시를 체크하면서 진행하자.
우리는 아래 링크에 있는 오퍼레이터를 사용할 예정이다. 파라미터들은 아래의 링크에 전부 나와 있다.
local_to_gcs = LocalFilesystemToGCSOperator(
task_id='local_to_gcs',
gcp_conn_id="gcs_conn_id",
src='/opt/airflow/plugins/owid-covid-data.csv',
dst='covid-19.csv',
bucket=gs_bucket
)
위의 오퍼레이터를 사용해서 로컬 파일을 gcs로 옮길 수 있다.
위의 오퍼레이터를 test 해보자.
docker exec webserver_container_id airflow tasks test Covid-19_ETL local_to_gcs 2022-01-01
정상적으로 수행이 되는 모습이다.
하지만 현재 테스트만 진행해서 된다고 중간에 장치를 마련하지 않으면 과거의 데이터를 기반으로 반복해서 데이터 교착상태가 발생할 수 있다.
따라서 우리는 sensor를 통해 데이터 파이프라인에 안전장치를 부착하려 한다.
우리는 버킷에 파일이 정상적으로 올라갔는지 궁금하다. 해당 오퍼레이터는 아래의 링크에서 확인 가능하다. sensor를 적극적으로 활용하자.
check_gcs_file = GCSObjectExistenceSensor(
task_id='check_gcs_file',
bucket=gs_bucket,
google_cloud_conn_id="gcs_conn_id",
object='covid-19.csv'
)
이제 위의 task를 테스트해보자.
docker exec webserver_container_id airflow tasks test Covid-19_ETL check_gcs_file 2022-01-01
정상적으로 작동한다.
이제 gcs에서 google bigquery로 데이터 immiragration을 진행시켜 보자.