Apache Airflow 기반의 데이터 파이프라인 - Chaper04: Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기(작성중)

김재민·2023년 7월 3일
1

1. Chapter04 들어가기 전


: Airflow 책을 학습하는 방법 을 참고하여 Github 에서 clone 받은 소스 중 Chapter04 경로로 이동한 후 아래와 같이 Docker Compose 명령어를 수행하여 Airflow 환경을 세팅한다.

Airflow 컴포넌트 컨테이너 실행

# 만약 chapter03 의 도커 컨테이너가 실행 중이라면 먼저  
# cd data-pipelines-with-apache-airflow/chapter03/ && docker-compose down 
# 명령어를 실행한다.

$ cd data-pipelines-with-apache-airflow/chapter04/
$ docker-compose up -d

참고: chapter04-init-1 컨테이너는 airflow db(postgres) 를 init 한 후 Exit 되므로, 실습 중 컨테이너 목록에 없어져도 신경쓰지 않아도 된다.


Chapter04 DB 연결하기

: ./data-pipelines-with-apache-airflow/chapter08chatper04 경로에서 docker compose 명령어로 실습에 필요한 컨테이너를 모두 실행 하였다면, chapter04-postgres-1 컨테이에서 실행 중인 postgres 데이터베이스에 아래 연결 정보를 이용하여 연결할 수 있다. 해당 DB 에는 실습 과정에서 추출한 데이터를 적재할 예정이다.
(DBeaver, Datgrip 등 자신이 사용하는 DB 연결 툴을 이용하면 된다.)

- Host: localhost
- Port: 5432
- Username: airflow
- Password: airflow
- Database: airflow


2. Chapter04 예제 Task 흐름 미리보기


: Chapter04 에서는 아래 이미지와 같이 'Wikipedia' 의 페이지 뷰 데이터를 .zip 파일로 다운로드 받은 후 압축 해제 한다. 그리고 페이지 뷰 수 데이터를 추출하여 Postgres 데이터베이스에 적재하여 Airflow 에서 데이터를 증분처리하고 외부 시스템에 연동하여 데이터를 적재하는 방법을 배운다.

위키피디아 페이지 뷰 데이터 다운로드 방법

: 우리는 이미 Chapter03 에서 데이터 증분처리 하는 방법에 대해 학습했다. 위키피디아의 페이지 뷰 데이터를 증분처리 하여 다운로드 받기 위해서는 아래 URL 양식에 맞춰 년, 월, 일, 시간 정보를 증분하여 넣어주면 된다.

https://dumps.wikimedia.org/other/pageviews/{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz

ex) # 2023년 06월 01일 15시 00분 00초 페이지 뷰 데이터 다운로드(.zip)
https://dumps.wikimedia.org/other/pageviews/2023/2023-06/pageviews-20230601-150000.gz
# pageviews-20230601-150000.gz 압축해제 후 내용

도메인코드	  	페이지제목				조회수		응답크기(byte)
aa 			Main_Page 			7 			0
aa 			Special:Book 		1 			0
aa 			Special:Log 		1 			0
aa 			Special:UserLogin 	1 			0
aa 			User:Alexbot 		1 			0
aa 			User:BotMultichill 	1 			0
...

위 데이터는 2023-06-01 14:00:00 ~ 2023-06-01 15:00:00 까지의 집계 데이터를 의미한다.
만약, 각 필드 정보가 en.m / American_Bobtail / 6 / 0 이라면, 2023년 06월 01일 14시 ~ 15시 사이에 https://en.m.wikipedia.org/wiki/American_Bobtail 라는 주소의 페이지 뷰가 6 이라는 뜻이다.


3. 위키피디아 URL 증분처리하기(예제: listing_4_1.py)


: ./data-pipelines-with-apache-airflow/chapter08/dags/listing_4_1.py 예제를 테스트하여 위키피디아 URL 이 어떻게 증분처리 되는지 확인해보자.

listing_4_1.py 코드 분석

Airflow DAG 정의를 위한 라이브러리 호출

# data-pipelines-with-apache-airflow/chapter08/dags/listing_4_1.py

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator

DAG 객체 정의

# 3일 전부터 매시간 정각에 DAG 가 스케줄링 된다.
dag = DAG(
    dag_id="listing_4_01",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@hourly",
)

BashOperator - get_data task 정의

: BashOperator 를 이용하여 입력 된 , , , 시간에 맞는 페이지뷰 압축 파일이 /tmp 경로에 wikipageviews.gz 라는 이름으로 다운로드 되도록 하였다.

# data-pipelines-with-apache-airflow/chapter08/dags/listing_4_1.py
# BashOperator 를 이용하여 위키피디아 페이지 뷰 데이터를 /tmp/wikipageviews.gz 경로에 저장하도록 한다.
get_data = BashOperator(
    task_id="get_data",
    bash_command=(
        "curl -o /tmp/wikipageviews.gz "
        "https://dumps.wikimedia.org/other/pageviews/"
        "{{ execution_date.year }}/"
        "{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
        "pageviews-{{ execution_date.year }}"
        "{{ '{:02}'.format(execution_date.month) }}"
        "{{ '{:02}'.format(execution_date.day) }}-"
        "{{ '{:02}'.format(execution_date.hour) }}0000.gz"
    ),
    dag=dag,
)

위에서 bash_command 에 있는 템플릿을 간단히 살펴보자

  • {{ execution_date }} : start_date - schedule_intervalYYYYMMDD
  • {{ execution_date.year }} : execution_date 의 년도 정보
  • {{ execution_date.month }} : execution_date 의 월 정보
  • {{ '{:02}'.format(execution_date.month) }} : execution_date 의 월 정보이면서, 빈자리를 0 으로 채운다. (예: 7월 -> 07)
  • {{ '{:02}'.format(execution_date.day) }} : execution_date 의 일 정보이면서, 빈자리를 0 으로 채운다. (예: 8일 -> 08)

listing_4_1 DAG 실행 및 로그 확인

: 아래 이미지와 같이 3일 전부터 1시간 간격으로 배치가 수행된 것을 확인할 수 있고, 작성일 기준 가장 최신 배치 작업인 2023년 07월 03일 15시(UTC) 작업의 log 를 확인해보면 아래와 같다.

로그에 보이는 것처럼 BashOperator 에서 수행할 명령어 양식에 템플릿으로 지정한 시간이 정상적으로 증분처리 된 것을 확인할 수 있다.
또한, chapter04-scheduler-1 컨테이너 쉘에 들어간 후 /tmp 경로로 이동해보면 wikipageviews.gz 압축파일이 정상적으로 다운로드 되어 있는 것도 확인할 수 있다.

# chapter04-scheduler-1 컨테이너로 접속하여 파일 확인하기

# 컨테이너 접속
$ docker exec -it chapter04-scheduler-1 /bin/bash

# 파일 확인
$ cd /tmp && ls
pymp-5ihi2zak  pymp-dj8tqp5_  tmp4szip8kz  wikipageviews.gz

예제 1번으로 위키피디아 URL 을 어떻게 증분처리 하는지 확인해 보았다. 그렇다면, execution_date 외에 어떤 템플릿이 있는지도 살펴보자. airflow 에서는 템플릿을 사용하려면 {{ }} 와 같이 중괄호 2개로 감싼 인수를 문자열에 넣으면 사용할 수 있다. 이를 Jinja 템플릿이라고 한다. 단, 모든 인수를 중괄호 2개로 감싼다고 템플릿으로 사용가능한 것은 아니다. 그렇다면, airflow 에서 사용가능한 템플릿 인수는 어떤 것들이 있을지 살펴보자.

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

위에 독스에도 있고
BashOperator 에서 상속받고 있는 BaseOperator 클래스에 가보면 from airflow.utils.context import Context 를 확장하고 있다. 여기서 Context 를 살펴보면 아래와 같이 템플릿에 사용되는 인수들을 정의하고 있다. 맞나..? 일단 더 확인해보자

class Context(MutableMapping[str, Any]):
    """Jinja2 template context for task rendering.

    This is a mapping (dict-like) class that can lazily emit warnings when
    (and only when) deprecated context keys are accessed.
    """

    _DEPRECATION_REPLACEMENTS: dict[str, list[str]] = {
        "execution_date": ["data_interval_start", "logical_date"],
        "next_ds": ["{{ data_interval_end | ds }}"],
        "next_ds_nodash": ["{{ data_interval_end | ds_nodash }}"],
        "next_execution_date": ["data_interval_end"],
        "prev_ds": [],
        "prev_ds_nodash": [],
        "prev_execution_date": [],
        "prev_execution_date_success": ["prev_data_interval_start_success"],
        "tomorrow_ds": [],
        "tomorrow_ds_nodash": [],
        "yesterday_ds": [],
        "yesterday_ds_nodash": [],
    }



N. Airflow 의 다양한 템플릿( 이걸 차라리 마지막에 설명 )

: Airflow 에는 Chapter03 에서 사용하였던 execution_date 외에도 여러가지 시간 정보를 지원하는 템플릿이 있다. 위키피디아 URL

profile
안녕하세요. 데이터 엔지니어 김재민 입니다.

0개의 댓글