데이터 파이프라인의 기능
ETL이란 데이터 파이프라인의 기능 중 추출, 변환, 적재 과정을 말한다. ETL은 데이터 파이프라인의 Subset으로 하위 개념에 해당한다.
Airflow에서 ETL을 부르는 명칭이다.
DAG 프로그래밍이라는 것은 할 일에 해당하는 오퍼레이터를 가져다 쓰거나, 오퍼레이터가 없다면 오퍼레이터 코드를 직접 구현하는 것을 말한다. 모든 일은 파이썬으로 진행된다.
default_args = {
'owner': 'keeyong',
'start_date': datetime(2020, 8, 7, hour=0, minute=00),
'end_date': datetime(2020, 8, 31, hour=23, minute=00),
'email': ['keeyonghan@hotmail.com'], #에러 발생 시 누구에게 이메일을 보낼지 결정
'retries': 1, #에러 발생 시 몇 번 재시도할 것인가
'retry_delay': timedelta(minutes=3), #재시도 간 기다릴 시간
}
from airflow import DAG
test_dag = DAG(
"HelloWorld", #DAG이름
schedule="0 9 * * *", #매일 9시 0분에 실행되는 DAG (시간은 UTC기준)
tags=['test'] #Airflow에서 관리가 쉽도록 Tag를 단다.
default_args=default_args #지정한 속성을 default_args로 전달한다.
)
schedule은 크론탭 문법을 따른다. (UTC 기준)
Bash는 Unix의 Command Line Shell이다. Bash Operator는 Bash라는 Shell Script를 실행하고, 명령어를 Command Line에 입력해 실행해주는 오퍼레이터이다. 실행할 명령어(Commnad)를 bash_command라는 파라미터에 적어준다. 이를 이용해 Task를 생성해보자.
from airflow.operators.bash import BashOperator #BashOperator 사용을 위해 호출
t1 = BashOperator(
task_id='print_date', #Task 이름
bash_command='date', #날짜를 출력한다.
dag=test_dag) #속한 DAG이름
t2 = BashOperator(
task_id='sleep', #Task 이름
bash_command='sleep 5', #5초 동안 아무 동작을 하지 않는다.
retries=3, #실패할 경우 재시도 횟수
dag=test_dag) #속한 DAG이름
t3 = BashOperator(
task_id='ls', #Task 이름
bash_command='ls /tmp', #tmp 디렉터리의 내용을 출력한다.
dag=test_dag) #속한 DAG이름
t1, t2, t3 총 3개의 Task를 생성했으며 모두 test_dag라는 DAG에 속한다. 인자는 다음과 같다.
각 bash_commnad에 전달한 명령어는 다음과 같다.
Task의 실행 순서를 다음과 같이 설정한다.
위의 관계를 아래와 같이 표현할 수 있다. upstream을 사용할 수도 있으나 일반적으로 사용하지 않는 표현이다.
# 방법 1
t1 >> t2
t1 >> t3
# 방법 2
t1 >> [t2, t3]
# 방법 3 : 일반적으로 사용하지 않는 표현
t2.set_upstream(t1) # t2의 상류로 t1을 세팅
t3.set_upstream(t1) # t3의 상류로 t1을 세팅
DummyOperator는 아무 기능을 하지 않는다. 많은 수의 Task를 동시에 실행하면 가장 먼저 수행되는 Task가 무엇인지 알기 어렵기 때문에 시작 시간과 마지막 시간을 마킹하는 영도로 쓰인다. 이번 실습에서는 start, end를 통해 dag의 시작, 마지막 시간을 정확히 알 수 있도록 DAG를 구성해본다. 이때 시작 시간과 마지막 시간은 생략이 가능하다.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
start = DummyOperator(dag=dag, task_id="start", *args, **kwargs)
t1 = BashOperator(
task_id='ls1', #Task 이름
bash_command='ls /tmp/downloaded', #download 폴더의 내용을 출력한다.
retries=3, #실패 시 재시도할 횟수
dag=dag) #Task가 속한 DAG
t2 = BashOperator(
task_id='ls2', #Task 이름
bash_command='ls /tmp/downloaded', #download 폴더의 내용을 출력한다.
dag=dag) #Task가 속한 DAG
end = DummyOperator(dag=dag, task_id='end', *args, **kwargs)
Task의 실행 순서를 다음과 같이 설정한다.
위의 관계를 아래와 같이 표현할 수 있다.
# 방법 1
start >> t1 >> end
start >> t2 >> end
# 방법 2
start >> [t1, t2] >> end
Airflow는 Python 코드로 워크플로우(Workflow)를 작성하고, 스케쥴링, 모니터링 하는 플랫폼이다. Airflow를 통해서 데이터엔지니어링의 ETL 작업을 자동화하고, DAG 형태의 워크플로우 작성이 가능하다.
Docker / Airflow 설치
위의 문서를 참고해 설치한다.
📍 Airflow 설치 과정 개요
1. Airflow 설치를 위해 Docker 엔진을 먼저 설치한다.
2. Airflow 도커 이미지를 다운로드한다.
3. Airflow 도커 컨테이너를 실행한다.
4. Airflow를 웹 UI로 접근해 사용한다.
cf) 용어
도커 : 소프트웨어를 하나의 분리된 패키지로 만들어주는 것, 소프트웨어를 다른 환경과 완전히 분리된 가상의 컴퓨터처럼 분리된 환경을 만들어주는 것.
도커 이미지 : 소프트웨어 패키지
도커 컨테이너 : 도커 이미지를 실행하는 환경
7개의 컨테이너가 실행 중임을 볼 수 있다. init 컨테이너는 초기화 기능을 하며 한 번 실행 후 exit된 상태이다.
airflow-webserver 컨테이너 옆의 Port가 8080으로 포트 번호가 설정되어 있다. 해당 주소로 접속하면 Airflow를 웹 UI로 사용할 수 있다.
airflow docker 이미지는 자신이 실행된 폴더에 있는 dags 폴더를 찾아서 그 안의 dag들을 자동으로 웹 UI에 올려주는 형태로 동작한다.
dags 폴더 내에 있던 HelloWorld, HelloWorld_v2 대그를 볼 수 있다.
-> 개발할 때 dags 폴더 내에서 개발하면 airflow 인터페이스에서 바로 반영된다.
HelloWorld 대그를 실행하고 싶을 경우 활성화한다.
DAG를 클릭해서 세부단계로 들어간다.
이 DAG는 2개의 Task (print_hello, print_goodbye)가 있다.
print_hello 구성 살펴보기
print_hello 클릭 -> Log 클릭
중간 INFO - hello! 부분 -> 해당 Task의 기능은 hello!를 print하는 것
execution date는 incremental update시 읽어와야 할 데이터의 날짜 정보이다.
DAG 내부의 실행관계를 알고싶다면 상단 Graph를 클릭한다.
print_hello가 끝나면 print_goodbye를 실행하는 DAG 실행 순서를 갖고 있다.
상단 메뉴를 통해 Code를 볼 수 있다.
dag 코드의 schedule은 0 2 * * * 이다.
-> 매일 1번 UTC 기준 2시 0분에 실행된다.
2개의 PythonOperator로 만든 Task가 2개 있다.
Task가 실행될 때 python_collable 인자로 지정해준 함수인 print_hello, print_goodbye가 실행된다.
함수의 내용은 원하는 것을 넣어준다.
실행순서는 다음의 코드로 지정해주었다.
#Assign the order of the tasks in our DAG
print_hello >> print_goodbye
docker compose down
터미널에서 ctrl + z 입력
docker 엔진 프로그램에서 모두 선택 후
이번 글에서는 DAG의 구조를 보기 위한 기초 실습과 Airflow를 이용한 DAG 관리 방법을 정리했습니다. 다음 글에서는 ETL을 DAG로 구현해 조금 더 깊이 알아보도록 하겠습니다.
📌 Source
💡 질문과 피드백은 댓글에 남겨주시기 바랍니다.
❤️ 도움이 되셨다면 공감 부탁드립니다.
너무 잘 읽었습니다, 많은 것을 배웠습니다.