get_data
를 통해 가지고 온 후 extract_gz
로 gz 파일의 압축을 풀어 준다. 이후 fech_page_views
를 통해 시간 단위의 페이지 뷰 수(조회 수)를 추출해 준다.postgres
를 통해 SQL 쿼리문
으로 페이지별로 페이지 뷰 수를 INSERT 즉, 적재해 준다.어떤 종류의 데이터로 작업을 하든 다음 과정은 필수적이다.
파이프라인을 구축하기 전 접근 방식에 대한 기술적 계획을 세운다
이 질문에 대한 답을 정하면 기술적 세부 사항에 대한 문제를 해결할 수 있다.
http://dumps.wikimedia.org/other/pageviews/{year}/{year}-{month}/pageviews-{year}{month}{day}{hour}0000.gz
위키미디어의 덤프 url은 해당 링크에서 자세하게 확인할 수 있다. 🔗위키미디어 덤프
gz 압축 파일은 파일의 이름과 동일한 하나의 텍스트 파일을 포함하고 있다.
bash에 압축 파일의 내용을 확인하면 다음과 같은 목록이 추출된다.
gz 압축 파일 내부 텍스트 파일 구조
이렇게 직접 데이터를 분석해 보게 되면 우리가 사용하기 위해 가지고 와야 하는 데이터가 무엇인지를 파악할 수 있다.
페이지 제목별로 페이지의 조회 수를 데이터베이스에 저장해 주는 작업이기 때문에 날짜, 페이지 제목, 조회 수 정보가 필요한 것을 알 수 있다.
get_data
를 진행해 준다.http://dumps.wikimedia.org/other/pageviews/{year}/{year}-{month}/pageviews-{year}{month}{day}{hour}0000.gz
BashOperator
와 PythonOperator
두 가지 방법으로 진행하였다. bash_command
를 사용해 주어야 한다.import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
dag = DAG(
dag_id="bash_test",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@hourly",
)
get_data = BashOperator(
task_id="get_data",
bash_command=(
"curl -o /tmp/wikipageviews.gz "
"https://dumps.wikimedia.org/other/pageviews/"
"{{ execution_date.year }}/"
"{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
"pageviews-{{ execution_date.year }}"
"{{ '{:02}'.format(execution_date.month) }}"
"{{ '{:02}'.format(execution_date.day) }}-"
"{{ '{:02}'.format(execution_date.hour) }}0000.gz"
),
dag=dag,
)
bash_command
를 첫 번째 태스크인 get_data
에 넣어 준다.
런타임 시작 시 변수를 삽입하고 이중 중괄호가 끝내야 한다. 이때 이중 중괄호는 Jinja 템플릿 문자열
이고, Jinja
는 런타임 시에 템플릿 문자열의 변수와 and, or 표현식을 대체하는 템플릿 엔진이다.
프로그래밍 시에는 알 수 없는 사용자로부터 입력받은 값들을 런타임 시에 할당하고 삽입할 때 사용한다.
"{{ execution_date.year }}/"
execution_date
는 실행한 날짜인데 이 값은 프로그래밍 시에는 알 수 없고 DAG의 태스크가 실행되면서 알 수 있게 된다. 이런 경우 Jinja 템플릿
을 써 준다고 생각하면 된다."{{ '{:02}'.format(execution_date.hour) }}0000.gz"
이때 Airflow에서는 날짜 타입을 Datetime의 Datetime을 쓸까?
Airflow
에서는 Pendulum
의 datetime
객체를 사용한다.datetime
을 썼을 때와 동일한 결과를 얻을 수 있다. Pendulum
의 datetime
은 네이티브 파이썬의 datetime
의 호환 객체이기 때문에 네이티브 파이썬의 datetime
에서 사용하는 모든 메소드를 사용할 수 있다.{{ '{:02}'.format(execution_date.hour) }}
Jinja 템플릿 문자열
내에서 이런 패딩 문자열 형식을 적용 가능하다.✍ curl (Client URL)로 지원하는 프로토콜들을 이용해 서버에 데이터를 보내거나 가져올 때 사용하는 도구이다.
📌 어떤 인수가 템플릿으로 지정될까? 또 템플릿화를 위해 사용할 수 있는 변수는 무엇일까?
- 오퍼레이터의 인수 중 템플릿으로 만들 수 있는 인수가 정해져 있고, 템플릿으로 만들 수 있는 속성의 허용 리스트가 존재한다.
- 만약 속성 리스트에 포함되지 않는 인수라면 문자열 그대로 해석이 된다. 예를 들어
name
이라는 속성이 템플릿으로 허용되지 않는 인수라면{{ name }}
을 작성해도 입력 값이 들어가는 게 아닌 그대로{{ name }}
으로 출력이 되는 것이다.
🔗 template_fields 목록
- 그렇다면 템플릿화를 위해 사용할 수 있는 변수는 무엇이 있을까. 이 변수들을
태스크 콘텍스크(Task context)
라고 부르게 된다.- 다음과 같이
PythonOperater
를 사용하면 전체태스크 콘텍스크(Task context)
를 출력할 수 있다.import airflow.utils.dates from airflow import DAG from airflow.operators.python import PythonOperator dag = DAG( dag_id="print_task_context_test", start_date=airflow.utils.dates.days_ago(3), schedule_interval="@daily", ) def _print_context(**kwargs): print(kwargs) print_context = PythonOperator( task_id="print_context", python_callable=_print_context, dag=dag )
PythonOperator
는 BashOperator
와 달리 pyhton_callable
인수를 사용하게 된다.Python
은 함수가 주요 요소이고, pyhton_callable
인수에 callable
을 제공한다. 이때 callable
이란 대상이 호출 가능한 객체인지 아닌지 확인해 주는 함수이다.a. 태스크 컨텍스트 변수를 통해 execution_date 추출
from urllib import request
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="python_test",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
)
def _get_data(execution_date):
year, month, day, hour, *_ = execution_date.timetuple()
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
output_path = "/tmp/wikipageviews.gz"
request.urlretrieve(url, output_path)
get_data = PythonOperator(task_id="get_data", python_callable=_get_data, dag=dag)
b. 키워드 인수를 통해 execution_date 추출
context
라고 작성해 준다. 이렇게 작성을 해 주게 되면 모든 콘텍스트 변수가 키워드 인수로 들어가게 된다.execution_date
와 next_execution_date
를 출력하는 코드를 작성해 본다면 다음과 같다.import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="python_context_test",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)
def _print_context(**context):
start = context["execution_date"]
end = context["next_execution_date"]
print(f"Start: {start}, end: {end}")
#Start: 2019-07-13T14:00:00+00:00, end: 2019-07-13T15:00:00+00:00
print_context = PythonOperator(
task_id="print_context", python_callable=_print_context, dag=dag
)
c. 태스크 컨텍스트 변수와 키워드 인수를 통해 execution_date 추출
execution_date
변수를 알려 준다. 이때 키워드 인수보다 변수를 앞에 작성해야 하며 execution_date
변수에 값이 들어가게 되면 키워드 인수인 context
내부에는 execution_date
가 전달되지 않는다. def _get_data(execution_date, **context):
year, month, day, hour, *_ = execution_date.timetuple()
✍ 개인적으로 만약 두 개의 키워드 인수가 넘어간다면 어떻게 동작하게 될지 궁금했는데 다른 분이 해당 내용을 직접 airflow로 돌려 본 포스팅이 있었다. 만약 두 개의 키워드 인수 파라미터를 주게 되면 어떻게 될까?
get_data
에서 execution_date
를 따로 추출해 주지 않아도 output_path
를 구성할 수 있다. output_path
를 넣어 주는 것이다.def _get_data(output_path, **context):
year, month, day, hour, *_ = context["execution_date"].timetuple()
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
op_args
를 사용하는 방법과 op_kwargs
를 사용하는 방법이다. 전자는 변수로 제공이 되고, 후자는 키워드 인수로 제공이 된다.op_args = ["/tmp/wikipageviews.gz"]
, op_kwargs = {"output_path": "/tmp/wikipageviews.gz"}
다음과 같이 각각 콜러블 함수에서 전달된다. datetime
구성 요소를 따로 추출하지 않고 넘겨 줄 수도 있다. from urllib import request
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="output_path_test",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
)
def _get_data(year, month, day, hour, output_path, **_):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)
fetch_page_views
태스크에 해당한다. GOOGLE, AMAZON, APPLE, MICROSOFT, FACEBOOK
를 추출해 주었다.from urllib import request
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="fetch_test",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
max_active_runs=1,
)
def _get_data(year, month, day, hour, output_path, **_):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)
extract_gz = BashOperator(
task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag
)
def _fetch_pageviews(pagenames):
result = dict.fromkeys(pagenames, 0)
with open("/tmp/wikipageviews", "r") as f: #이전 태스크에서 작성한 파일 열기
for line in f:
domain_code, page_title, view_counts, _ = line.split(" ") #줄에서 필요한 요소 추출
if domain_code == "en" and page_title in pagenames: #페이지명이 정해진 페이지 이름인지 확인하고 view_counts를 value로 설정
result[page_title] = view_counts
print(result)
#output: "{'Facebook': '778', 'Apple': '20', 'Google': '451', 'Amazon': '9', 'Microsoft': '119'}"
fetch_pageviews = PythonOperator(
task_id="fetch_pageviews",
python_callable=_fetch_pageviews,
op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
dag=dag,
)
get_data >> extract_gz >> fetch_pageviews
postgres DB
에 넣어 주어야 된다라고 한다면 먼저 테이블을 생성해 주고, 해당 테이블에 추출한 값을 INSERT 쿼리를 통해 삽입해 주어야 한다.INSERT
구문 작성은 PythonOperator
가 제공해 주지만 결과를 데이터베이스에 입력하기 위해서는 XCom
을 사용해 주거나 영구적인 위치(디스크 또는 데이터베이스)
에 저장해 주어야 한다. (XCom
에 대한 설명은 5 장에 나온다고 함.)postgres DB
를 사용할 것이므로 PostgresOperator
를 이용해 데이터를 입력해 준다. PostgresOperator
클래스를 가지고 오기 위해서는 다음 공급자 패키지를 설치해 주어야 한다.pip install apache-airflow-providers-postgres
✍ 왜 공급자 패키지를 설치해야 하는가?
- Airflow는 Airflow 생태계를 위해 다양한 오퍼레이터를 통해 광범위한 외부 시스템 연결을 지원한다.
- 외부 시스템과 연결하기 위해서는 종종 부가적인 구성 요소를 설치해야 연결 후 통신할 수 있다.
- 그래서 postgres의 경우 추가적인 의존성 해결을 위해 앞서 말한 공급자 패키지를 설치해 주어야 하고, 다른 외부 시스템 역시 연계하기 위해서 여러 의존성 패키지를 설치해야 한다.
- 오케스트레이션 시스템의 특성 중 하나이다.
PostgreOperator
에 공급할 INSERT문을 작성한다.def _fetch_pageviews(pagenames, execution_date, **_):
result = dict.fromkeys(pagenames, 0) #페이지 뷰에 대한 결과를 0으로 초기화
with open("/tmp/wikipageviews", "r") as f:
for line in f:
domain_code, page_title, view_counts, _ = line.split(" ")
if domain_code == "en" and page_title in pagenames:
result[page_title] = view_counts #페이지 뷰 저장
with open("/tmp/postgres_query.sql", "w") as f:
for pagename, pageviewcount in result.items(): #각 결과에 대한 SQL 쿼리 작성
f.write(
"INSERT INTO pageview_counts VALUES ("
f"'{pagename}', {pageviewcount}, '{execution_date}'"
");\n"
)
fetch_pageviews = PythonOperator(
task_id="fetch_pageviews",
python_callable=_fetch_pageviews,
op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
dag=dag,
)
get_data >> extract_gz >> fetch_pageviews
SQL 쿼리
를 생성하는 부분이지 데이터를 직접적으로 INSERT
해 주는 부분이 아니다.from airflow.providers.postgres.operators.postgres import PostgresOperator
write_to_postgres = PostgresOperator(
task_id="write_to_postgres",
postgres_conn_id="my_postgres",
sql="postgres_query.sql",
dag=dag,
)
Jinja
를 사용할 때는 템플릿화할 수 있는 파일을 검색할 경로를 제공해야 하는데 /tmp
에 저장되면 Jinja
가 스스로 파일을 찾을 수 없다. 그래서 검색 경로를 추가하기 위해 DAG
에서 template_searchpath
의 인수를 주어 기본 경로와 추가 경로를 같이 탐색할 수 있게 해 준다. PostgresOperator
을 호출해 주어야 하고, 두 개의 인수를 넘겨 주어야 한다.postgres_conn_id
는 Postgres 데이터베이스에 대한 자격 증명 식별자이고, sql
은 실행할 sql 쿼리의 경로를 넣어 주면 된다.✍ 내부에서는 어떻게 자체적으로 처리해 주게 될까?
PostgresOperator
은 Postgres와 통신하기 위해훅(hook)
을 인스턴스화한다.- 이
훅(hook)
이 연결 생성, Postgres에 쿼리 전송 및 연결에 대한 종료 작업을 처리한다.- 오퍼레이터는 훅으로 사용자 요청을 전달하는 일만 한다.
- 오퍼레이터는 무엇을 해야 할지 결정하고 정한 무엇을 어떻게 해야 할지 결정한다.
- 그림에서 보는 것과 같이 훅은 Operator 내부에서 동작한다.