DAG 개념, 실습 / Airflow로 DAG 관리하기

아리엘 (Ariel)·2023년 7월 19일
2
post-thumbnail

1. ETL

데이터 파이프라인의 기능

  • 추출 (Extracting)
  • 변환 (Transforming)
  • 결합 (Combining)
  • 검증 (Validating)
  • 적재 (Loading)

ETL이란 데이터 파이프라인의 기능 중 추출, 변환, 적재 과정을 말한다. ETL은 데이터 파이프라인의 Subset으로 하위 개념에 해당한다.



2. DAG

2-1. DAG 개념

Airflow에서 ETL을 부르는 명칭이다.

  • 방향성이 있는 그래프라는 뜻으로 루프가 불가능하다.
  • DAG는 Task로 구성된다.
    • Task : DAG를 구성하는 작업 단위
    • ex) ETL DAG는 Extract, Transform, Load 3개의 Task로 구성될 수 있다.
    • Task는 Airflow의 Operator, Sensor, Hook으로 만들어진다. Operator를 주로 사용하며 Task를 어떻게 실행할지를 나타낸다.
      • Airflow에서 이미 다양한 종류의 오퍼레이터를 제공한다.
      • 경우에 맞게 사용 오퍼레이터를 결정하거나 필요하다면 직접 개발한다.
      • e.g., Redshift writing, Postgres query, S3 Read/Write, Hive query, Spark job, shell script

DAG 프로그래밍이라는 것은 할 일에 해당하는 오퍼레이터를 가져다 쓰거나, 오퍼레이터가 없다면 오퍼레이터 코드를 직접 구현하는 것을 말한다. 모든 일은 파이썬으로 진행된다.


2-2. DAG 예제


2-3. Task에 필요한 기본 정보

default_args = {
 'owner': 'keeyong',
 'start_date': datetime(2020, 8, 7, hour=0, minute=00),
 'end_date': datetime(2020, 8, 31, hour=23, minute=00),
 'email': ['keeyonghan@hotmail.com'], #에러 발생 시 누구에게 이메일을 보낼지 결정
 'retries': 1, #에러 발생 시 몇 번 재시도할 것인가
 'retry_delay': timedelta(minutes=3), #재시도 간 기다릴 시간
}


2-4. DAG에 필요한 기본 정보

from airflow import DAG

test_dag = DAG(
 "HelloWorld", #DAG이름
 schedule="0 9 * * *", #매일 9시 0분에 실행되는 DAG (시간은 UTC기준)
 tags=['test'] #Airflow에서 관리가 쉽도록 Tag를 단다.
 default_args=default_args #지정한 속성을 default_args로 전달한다.
)

schedule은 크론탭 문법을 따른다. (UTC 기준)

  • “0 * * * *”의 의미는? -> 0분에 실행한다. 즉, 0분은 매 시간마다 1번만 생기므로 1시간마다 실행되는 Hourly DAG이다.
  • “0 12 * * *”의 의미는? -> 0분 12시에 실행한다. 즉, 하루에 1번 실행하는 Daily DAG이다.
  • “30 6 * * 0”의 의미는? -> 30분 6시 매주 일요일에 실행한다. 즉, 일주일에 1번 실행하는 Weekly DAG이다.


3. DAG 실습

3-1. Operators 생성 예제 1

Bash는 Unix의 Command Line Shell이다. Bash Operator는 Bash라는 Shell Script를 실행하고, 명령어를 Command Line에 입력해 실행해주는 오퍼레이터이다. 실행할 명령어(Commnad)를 bash_command라는 파라미터에 적어준다. 이를 이용해 Task를 생성해보자.

from airflow.operators.bash import BashOperator #BashOperator 사용을 위해 호출

t1 = BashOperator(
 task_id='print_date', #Task 이름
 bash_command='date', #날짜를 출력한다.
 dag=test_dag) #속한 DAG이름
 
t2 = BashOperator(
 task_id='sleep', #Task 이름
 bash_command='sleep 5', #5초 동안 아무 동작을 하지 않는다.
 retries=3, #실패할 경우 재시도 횟수 
 dag=test_dag) #속한 DAG이름
 
t3 = BashOperator(
 task_id='ls', #Task 이름
 bash_command='ls /tmp', #tmp 디렉터리의 내용을 출력한다.
 dag=test_dag) #속한 DAG이름

t1, t2, t3 총 3개의 Task를 생성했으며 모두 test_dag라는 DAG에 속한다. 인자는 다음과 같다.

  • task_id #Task 이름
  • bash_command #Command Line에 입력할 명령문
  • retries #실패할 경우 재시도 횟수
  • dag #속한 DAG이름

각 bash_commnad에 전달한 명령어는 다음과 같다.

  • bash_command='date' : 날짜를 출력한다.
  • bash_command='sleep 5' : 5초 동안 아무 동작을 하지 않는다.
  • bash_command='ls /tmp' : tmp 디렉터리의 내용을 출력한다.

Task의 실행 순서를 다음과 같이 설정한다.

위의 관계를 아래와 같이 표현할 수 있다. upstream을 사용할 수도 있으나 일반적으로 사용하지 않는 표현이다.

# 방법 1
t1 >> t2
t1 >> t3

# 방법 2
t1 >> [t2, t3]

# 방법 3 : 일반적으로 사용하지 않는 표현
t2.set_upstream(t1) # t2의 상류로 t1을 세팅
t3.set_upstream(t1) # t3의 상류로 t1을 세팅


3-2. Operators 생성 예제 2

DummyOperator는 아무 기능을 하지 않는다. 많은 수의 Task를 동시에 실행하면 가장 먼저 수행되는 Task가 무엇인지 알기 어렵기 때문에 시작 시간과 마지막 시간을 마킹하는 영도로 쓰인다. 이번 실습에서는 start, end를 통해 dag의 시작, 마지막 시간을 정확히 알 수 있도록 DAG를 구성해본다. 이때 시작 시간과 마지막 시간은 생략이 가능하다.

from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator 

start = DummyOperator(dag=dag, task_id="start", *args, **kwargs)

t1 = BashOperator(
 task_id='ls1', #Task 이름
 bash_command='ls /tmp/downloaded', #download 폴더의 내용을 출력한다.
 retries=3, #실패 시 재시도할 횟수
 dag=dag) #Task가 속한 DAG
 
t2 = BashOperator(
 task_id='ls2', #Task 이름
 bash_command='ls /tmp/downloaded', #download 폴더의 내용을 출력한다.
 dag=dag) #Task가 속한 DAG
 
end = DummyOperator(dag=dag, task_id='end', *args, **kwargs)

Task의 실행 순서를 다음과 같이 설정한다.

위의 관계를 아래와 같이 표현할 수 있다.

# 방법 1
start >> t1 >> end
start >> t2 >> end

# 방법 2
start >> [t1, t2] >> end


4. Aiflow로 DAG 관리하기

4-1. Airflow 개념

Airflow는 Python 코드로 워크플로우(Workflow)를 작성하고, 스케쥴링, 모니터링 하는 플랫폼이다. Airflow를 통해서 데이터엔지니어링의 ETL 작업을 자동화하고, DAG 형태의 워크플로우 작성이 가능하다.


4-2. Airflow 설치

Docker / Airflow 설치
위의 문서를 참고해 설치한다.

📍 Airflow 설치 과정 개요
1. Airflow 설치를 위해 Docker 엔진을 먼저 설치한다.
2. Airflow 도커 이미지를 다운로드한다.
3. Airflow 도커 컨테이너를 실행한다.
4. Airflow를 웹 UI로 접근해 사용한다.

cf) 용어
도커 : 소프트웨어를 하나의 분리된 패키지로 만들어주는 것, 소프트웨어를 다른 환경과 완전히 분리된 가상의 컴퓨터처럼 분리된 환경을 만들어주는 것.
도커 이미지 : 소프트웨어 패키지
도커 컨테이너 : 도커 이미지를 실행하는 환경



4-3. Airflow 실행


7개의 컨테이너가 실행 중임을 볼 수 있다. init 컨테이너는 초기화 기능을 하며 한 번 실행 후 exit된 상태이다.

airflow-webserver 컨테이너 옆의 Port가 8080으로 포트 번호가 설정되어 있다. 해당 주소로 접속하면 Airflow를 웹 UI로 사용할 수 있다.

airflow docker 이미지는 자신이 실행된 폴더에 있는 dags 폴더를 찾아서 그 안의 dag들을 자동으로 웹 UI에 올려주는 형태로 동작한다.

dags 폴더 내에 있던 HelloWorld, HelloWorld_v2 대그를 볼 수 있다.
-> 개발할 때 dags 폴더 내에서 개발하면 airflow 인터페이스에서 바로 반영된다.


4-4. Airflow에서 DAG 알아보기

  1. HelloWorld 대그를 실행하고 싶을 경우 활성화한다.

  2. DAG를 클릭해서 세부단계로 들어간다.

    이 DAG는 2개의 Task (print_hello, print_goodbye)가 있다.

  3. print_hello 구성 살펴보기
    print_hello 클릭 -> Log 클릭

    중간 INFO - hello! 부분 -> 해당 Task의 기능은 hello!를 print하는 것
    execution date는 incremental update시 읽어와야 할 데이터의 날짜 정보이다.

  4. DAG 내부의 실행관계를 알고싶다면 상단 Graph를 클릭한다.

    print_hello가 끝나면 print_goodbye를 실행하는 DAG 실행 순서를 갖고 있다.

  5. 상단 메뉴를 통해 Code를 볼 수 있다.

dag 코드의 schedule은 0 2 * * * 이다.
-> 매일 1번 UTC 기준 2시 0분에 실행된다.

2개의 PythonOperator로 만든 Task가 2개 있다.
Task가 실행될 때 python_collable 인자로 지정해준 함수인 print_hello, print_goodbye가 실행된다.
함수의 내용은 원하는 것을 넣어준다.

실행순서는 다음의 코드로 지정해주었다.

#Assign the order of the tasks in our DAG
print_hello >> print_goodbye


4-5. Docker 컨테이너 실행 중지

  1. 터미널에 명령어 입력
docker compose down

  1. 터미널에서 ctrl + z 입력

  2. docker 엔진 프로그램에서 모두 선택 후

  • pause : 잠깐 중단, 이후 지금 상황 그대로 실행할 수 있다.
  • stop : 재실행 시 초기 상태로 시작된다.


이번 글에서는 DAG의 구조를 보기 위한 기초 실습과 Airflow를 이용한 DAG 관리 방법을 정리했습니다. 다음 글에서는 ETL을 DAG로 구현해 조금 더 깊이 알아보도록 하겠습니다.

ETL DAG 구현하기


📌 Source



💡 질문과 피드백은 댓글에 남겨주시기 바랍니다.
❤️ 도움이 되셨다면 공감 부탁드립니다.

profile
Data Analyst / Engineer

2개의 댓글

comment-user-thumbnail
2023년 7월 19일

너무 잘 읽었습니다, 많은 것을 배웠습니다.

1개의 답글