: Airflow 책을 학습하는 방법 을 참고하여 Github 에서 clone 받은 소스 중 Chapter04 경로로 이동한 후 아래와 같이 Docker Compose 명령어를 수행하여 Airflow 환경을 세팅한다.
# 만약 chapter03 의 도커 컨테이너가 실행 중이라면 먼저
# cd data-pipelines-with-apache-airflow/chapter03/ && docker-compose down
# 명령어를 실행한다.
$ cd data-pipelines-with-apache-airflow/chapter04/
$ docker-compose up -d
참고: chapter04-init-1 컨테이너는 airflow db(postgres) 를 init 한 후 Exit 되므로, 실습 중 컨테이너 목록에 없어져도 신경쓰지 않아도 된다.
: ./data-pipelines-with-apache-airflow/chapter08chatper04
경로에서 docker compose 명령어로 실습에 필요한 컨테이너를 모두 실행 하였다면, chapter04-postgres-1
컨테이에서 실행 중인 postgres 데이터베이스에 아래 연결 정보를 이용하여 연결할 수 있다. 해당 DB 에는 실습 과정에서 추출한 데이터를 적재할 예정이다.
(DBeaver, Datgrip 등 자신이 사용하는 DB 연결 툴을 이용하면 된다.)
- Host: localhost
- Port: 5432
- Username: airflow
- Password: airflow
- Database: airflow
: Chapter04 에서는 아래 이미지와 같이 'Wikipedia' 의 페이지 뷰 데이터를 .zip 파일로 다운로드 받은 후 압축 해제 한다. 그리고 페이지 뷰 수 데이터를 추출하여 Postgres 데이터베이스에 적재하여 Airflow 에서 데이터를 증분처리하고 외부 시스템에 연동하여 데이터를 적재하는 방법을 배운다.
: 우리는 이미 Chapter03 에서 데이터 증분처리 하는 방법에 대해 학습했다. 위키피디아의 페이지 뷰 데이터를 증분처리 하여 다운로드 받기 위해서는 아래 URL 양식에 맞춰 년, 월, 일, 시간
정보를 증분하여 넣어주면 된다.
https://dumps.wikimedia.org/other/pageviews/{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz
ex) # 2023년 06월 01일 15시 00분 00초 페이지 뷰 데이터 다운로드(.zip)
https://dumps.wikimedia.org/other/pageviews/2023/2023-06/pageviews-20230601-150000.gz
# pageviews-20230601-150000.gz 압축해제 후 내용
도메인코드 페이지제목 조회수 응답크기(byte)
aa Main_Page 7 0
aa Special:Book 1 0
aa Special:Log 1 0
aa Special:UserLogin 1 0
aa User:Alexbot 1 0
aa User:BotMultichill 1 0
...
위 데이터는 2023-06-01 14:00:00 ~ 2023-06-01 15:00:00
까지의 집계 데이터를 의미한다.
만약, 각 필드 정보가 en.m
/ American_Bobtail
/ 6
/ 0
이라면, 2023년 06월 01일 14시 ~ 15시 사이에 https://en.m.wikipedia.org/wiki/American_Bobtail
라는 주소의 페이지 뷰가 6 이라는 뜻이다.
: ./data-pipelines-with-apache-airflow/chapter08/dags/listing_4_1.py
예제를 테스트하여 위키피디아 URL 이 어떻게 증분처리 되는지 확인해보자.
# data-pipelines-with-apache-airflow/chapter08/dags/listing_4_1.py
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
# 3일 전부터 매시간 정각에 DAG 가 스케줄링 된다.
dag = DAG(
dag_id="listing_4_01",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@hourly",
)
: BashOperator
를 이용하여 입력 된 년
, 월
, 일
, 시간
에 맞는 페이지뷰 압축 파일이 /tmp
경로에 wikipageviews.gz
라는 이름으로 다운로드 되도록 하였다.
# data-pipelines-with-apache-airflow/chapter08/dags/listing_4_1.py
# BashOperator 를 이용하여 위키피디아 페이지 뷰 데이터를 /tmp/wikipageviews.gz 경로에 저장하도록 한다.
get_data = BashOperator(
task_id="get_data",
bash_command=(
"curl -o /tmp/wikipageviews.gz "
"https://dumps.wikimedia.org/other/pageviews/"
"{{ execution_date.year }}/"
"{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
"pageviews-{{ execution_date.year }}"
"{{ '{:02}'.format(execution_date.month) }}"
"{{ '{:02}'.format(execution_date.day) }}-"
"{{ '{:02}'.format(execution_date.hour) }}0000.gz"
),
dag=dag,
)
위에서 bash_command 에 있는 템플릿을 간단히 살펴보자
{{ execution_date }}
: start_date - schedule_interval
의 YYYYMMDD
값{{ execution_date.year }}
: execution_date 의 년도 정보{{ execution_date.month }}
: execution_date 의 월 정보{{ '{:02}'.format(execution_date.month) }}
: execution_date 의 월 정보이면서, 빈자리를 0 으로 채운다. (예: 7월 -> 07){{ '{:02}'.format(execution_date.day) }}
: execution_date 의 일 정보이면서, 빈자리를 0 으로 채운다. (예: 8일 -> 08): 아래 이미지와 같이 3일 전부터 1시간 간격으로 배치가 수행된 것을 확인할 수 있고, 작성일 기준 가장 최신 배치 작업인 2023년 07월 03일 15시(UTC) 작업의 log 를 확인해보면 아래와 같다.
로그에 보이는 것처럼 BashOperator 에서 수행할 명령어 양식에 템플릿으로 지정한 시간이 정상적으로 증분처리 된 것을 확인할 수 있다.
또한, chapter04-scheduler-1
컨테이너 쉘에 들어간 후 /tmp
경로로 이동해보면 wikipageviews.gz
압축파일이 정상적으로 다운로드 되어 있는 것도 확인할 수 있다.
# chapter04-scheduler-1 컨테이너로 접속하여 파일 확인하기
# 컨테이너 접속
$ docker exec -it chapter04-scheduler-1 /bin/bash
# 파일 확인
$ cd /tmp && ls
pymp-5ihi2zak pymp-dj8tqp5_ tmp4szip8kz wikipageviews.gz
예제 1번으로 위키피디아 URL 을 어떻게 증분처리 하는지 확인해 보았다. 그렇다면, execution_date 외에 어떤 템플릿이 있는지도 살펴보자. airflow 에서는 템플릿을 사용하려면 {{ }} 와 같이 중괄호 2개로 감싼 인수를 문자열에 넣으면 사용할 수 있다. 이를 Jinja 템플릿이라고 한다. 단, 모든 인수를 중괄호 2개로 감싼다고 템플릿으로 사용가능한 것은 아니다. 그렇다면, airflow 에서 사용가능한 템플릿 인수는 어떤 것들이 있을지 살펴보자.
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
위에 독스에도 있고
BashOperator 에서 상속받고 있는 BaseOperator 클래스에 가보면 from airflow.utils.context import Context 를 확장하고 있다. 여기서 Context 를 살펴보면 아래와 같이 템플릿에 사용되는 인수들을 정의하고 있다. 맞나..? 일단 더 확인해보자
class Context(MutableMapping[str, Any]):
"""Jinja2 template context for task rendering.
This is a mapping (dict-like) class that can lazily emit warnings when
(and only when) deprecated context keys are accessed.
"""
_DEPRECATION_REPLACEMENTS: dict[str, list[str]] = {
"execution_date": ["data_interval_start", "logical_date"],
"next_ds": ["{{ data_interval_end | ds }}"],
"next_ds_nodash": ["{{ data_interval_end | ds_nodash }}"],
"next_execution_date": ["data_interval_end"],
"prev_ds": [],
"prev_ds_nodash": [],
"prev_execution_date": [],
"prev_execution_date_success": ["prev_data_interval_start_success"],
"tomorrow_ds": [],
"tomorrow_ds_nodash": [],
"yesterday_ds": [],
"yesterday_ds_nodash": [],
}
: Airflow 에는 Chapter03 에서 사용하였던 execution_date
외에도 여러가지 시간 정보를 지원하는 템플릿이 있다. 위키피디아 URL