Apache Airflow 기반의 데이터 파이프라인 - Chapter02: Airflow DAG의 구조

김재민·2023년 6월 25일
1
post-thumbnail

1. Chapter02를 시작하기 전에


: Airflow 책을 학습하는 방법 을 참고하여 Github 에서 clone 받은 소스 중 Chapter02 경로로 이동한 후 아래와 같이 Docker Compose 명령어를 수행하여 Airflow 환경을 세팅한다.

Airflow 컴포넌트 컨테이너 실행

# 만약 chapter01 의 도커 컨테이너가 실행 중이라면 먼저  
# cd data-pipelines-with-apache-airflow/chapter01/ && docker-compose down 
# 명령어를 실행한다.

$ cd data-pipelines-with-apache-airflow/chapter02/
$ docker-compose up -d

Airflow WEB UI 접속

: 이 때 접속이 되지 않는다면, chapter02-init-1 컨테이너가 init 되어 종료된 후 다시 접속해 보면 될 것이다.

$ open http://localhost:8080


2. 로켓 발사 데이터 수집용 DAG 예제


Launch Library 2 오픈 API 데이터 확인

: chapter02 에서는 Launch Library 2 오픈 API 를 이용하여 로켓 발사 데이터를 수집 후 처리하는 DAG 를 예시로 설명한다. 아래 명령어를 터미널에서 실행하여 데이터를 확인해보자.

$ curl -L "https://ll.thespacedevs.com/2.0.0/launch/upcoming"

# -L, --location 옵션: 웹페이지의 정보를 가져올 때 주소가 바뀌어도 계속해서 정보를 찾아가게 해주는 명령어(Follow redirects)

# 반환 주요 정보(로켓 발사 정보)
## 로켓 ID, 로켓발사 시작 및 종료 시간 등
## 로켓 이미지 URL

DAG 의 TASK 흐름

: Chapter02 에서는 Launch Library 2 오픈 API 를 이용하여 아래 이미지와 같이 동작하는 DAG 예제를 소개 하고 있고, 작업은 3개의 Task 로 구성 된다. Task 의 흐름 순서는 화살표 방향과 동일하다.

Chapter02 - download_rocket_launches.py

: Chapter02/download_rocket_launches.py 예제에서는 위 그림의 과정을 수행 하기 위한 DAG 정의, Task 정의, Task 의존성(진행 방향) 정의를 어떻게 하는지 보여주고 있다. 나는 코드의 단락을 나누어 정리할 예정이다. 전체 코드를 한 번에 보기 위해서는 소제목에 링크를 걸어 두었으니 참고하길 바란다.

Airflow DAG 정의를 위한 라이브러리 호출

''' Airflow 를 정의하기 위한 라이브러리 호출 '''
import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

DAG 객체 정의

: 아래와 같이 dag_id 에 정의한 이름이 Airflow WEB UI 에서 DAG 이름으로 노출 된다. 또한 schedule_interval 에 정의한 실행 간격도 표시 된다.

''' Airflow DAG 객체 정의 '''
dag = DAG( # DAG 객체 정의 - 모든 워크플로우의 시작점이다.
    dag_id="download_rocket_launches", # Airflow WEB UI에 표시될 DAG의 이름이다.
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14), # 해당 DAG가 처음 실행을 시작하는 날짜이다.
    schedule_interval="@daily", # 해당 DAG가 주기적으로 실행 되는 간격을 정의한다. 여기서는 매일 00시(@daily) 로 지정하였다.
)

# 위 dag 와 같이 정의하고 실행 시키면 현재 시점의 14일 전부터 매일 dag 가 실행하여 총 14개의 dag 가 실행 될 것이다.(지금은 참고만 하면 된다.)

BashOperator - download_launches task 정의

: 해당 예제는 Launch Library 2 오픈 API 를 호출하여 결과값을 저장하는 작업을 download_launches 라는 Task 로 정의하고 있다. 이 때, airflow 의 BashOperator 라는 객체를 사용하여 Bash(Linux) 명령어로 작업을 수행할 수 있도록 정의 한다. 당장은 Operator 가 Task 를 수행하는 객체 정도라고만 생각하면 된다.

download_launches = BashOperator( # BashOperator 객체 선언
    task_id="download_launches", # 해당 작업(Task)의 아이디(이름) 정의한다. 또한, Airflow WEB UI의 Graph View 에서 task id 를 확인할 수 있다.
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501 # 해당 작업(Task) 가 수행 될 때 bash_command 에 선언해둔 명령어가 실행 된다.
    dag=dag, # 어떤 DAG 에 포함 되는 것인지 지정할 수 있다. 이런식으로 위에서 정의한 dag 객체를 선언해 주면 된다.
)

PythonOperator - get_pictures task 정의

: 로컬에 저장한 launches.json 파일에 있는 로켓 이미지 URL을 파싱 하여 이미지를 다운로드 받는 작업을 get_pictures 라는 Task 로 정의하고 있다. 이 때, airflow 의 PythonOperator 를 사용하는데, PythonOperator 를 선언하고 해당 오퍼레이터가 실행 될 때 실제로 수행 할 Python 함수 _get_pictures() 를 인자로 넘겨주어 사용할 수 있다.

def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True) # 경로가 없으면 디렉토리 생성

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]] # 이 전 Task 에서 다운로드 받은 launches.json 파일에서 로켓 이미지 URL 을 추출
        for image_url in image_urls:
            try: # 로켓 이미지 다운로드 시작
                response = requests.get(image_url) 
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator( # PythonOperator 선언
    task_id="get_pictures",  # Task 아이디(이름) 정의
    python_callable=_get_pictures, # PythonOperator 가 실행 될 떄 실제로 수행할 Python 함수를 인자로 넘긴다.
    dag=dag # BashOperator 와 동일하게 어떤 DAG 에 포함 되는지 지정할 수 있다.
)

BashOperator - notify task 정의

: 로켓 이미지를 로컬에 다운로드 한 후 다운로드 결과를 알려주는 작업을 notify 라는 Task 로 정의하고 있다. 이번에도 BashOperator 를 사용하여 /tmp/images 경로에 존재하는 자료 개수를 알려준다.

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

Task 실행 순서 정의

: 위에서 정의한 3개의 Task 를 사용자 지정 순서에 맞게 동작하도록 shift 연산자를 이용하여 선언하고 있다. 이는 DAG 스크립트 맨 하단에 지정해 두면 된다.

download_launches >> get_pictures >> notify

위와 같이 선언하면, download_launches task 가 실행 완료 된 후 get_pictures task 가 실행 되고, 그 다음 get_pictures task 가 완료 되어야 notify task 가 실행 된다. Airflow 에서는 이를 의존성(dependency) 이라고 얘기한다. 즉, download_launches task 가 아직 실행되지 않았거나, 실패 하였다면 get_pictures task 는 실행 되지 않는다. 이는 Airflow WEB UI 의 Tree View 탭에서 확인할 수 있다.

DAG 실행 결과 확인

: 아래 이미지와 같이 download_rocket_launches DAG 를 활성화 시켜주면 자동으로 스케줄링 되어 실행 된다.
(만약, Dag 정의 부분에서 schedule_interval 값을 None 으로 설정할 경우 Dag 를 활성화 하여도 실행 되지 않는다. 이것은 Chapter03 에서 자세히 다룬다.)

: 실행 결과를 확인해보니 DAG 객체 정의 부분에서 주석으로 설명한 내용처럼 14 번 DAG 가 실행 되었고 중간에 download_launches task 가 실패한 부분이 보이는데, 의존성에서 언급한 바와 같이 앞 단계 task 가 실패하면 뒤에 task 는 실행 되지 않는 것을 확인할 수 있다.

📍 참고: download_rocket_launches DAG 를 실행 하였을 때 나 처럼 Task 실패 지점이 있을 수도 있고, 모두 Success 로 처리 되었을 수도 있다. 한 번에 14일 치 Dag 가 수행하면서 API 호출 제한이 걸려서 실패할 가능성이 있으니 크게 신경쓰지 않아도 된다.


3. Task 와 Operator 의 차이점


: 사실, Airflow 공식 문서나 지금 학습 중인 도서에서는 오퍼레이터와 태스크를 같은 개념으로 취급하고 두 단어를 혼용해서 사용한다. 때문에, 깊이 있게 차이점을 구분 하는 것에 시간을 많이 쏟지 않길 바란다.
(나처럼...)

Operator

: 단일 작업 수행 지점 또는 역할

Task

: Operator 의 래퍼(Wrapper) 또는 매니저(Manager) 역할

👨🏻: 'DAG 실행 시 각 작업을 수행하기 위해서 Task 라는 작업 매니저가 할당 되고, Task 안에서 사용자가 지정한 Operator 가 수행 된다' 정도의 개념으로 이해하면 될 것 같다. Task 가 매니저라고 언급 되는 이유는 Operator 의 상태를 체크하고 관리하기 때문이다. 또한 DAG 는 Operator 집합에 대한 실행, 정지, 예약, 의존성 보장 등 포함하는 '오케스트레이션' 역할이다.


4. Operator(Task) 사용 방법 요약


: 지금까지 코드를 보면서 하나의 DAG 안에서 기능적으로 각 작업 단계를 정의하기 위해서는 airflow 에서 제공하는 Operator 를 사용하면 된다는 것을 알았을 것이다. 실제로, Airflow 는 BashOperator, PythonOperator 외에도 다양한 Operator 객체를 제공한다. 따라서, 구현하려는 기능에 맞게 Operator 를 찾아서 사용하면 된다.
이 때, 사용하고 싶은 Operator 를 정의하면서 task_id 를 지정해야 한다는 것도 자연스럽게 알 수 있을 것이다. Operator 에 설정 된 task_id 값이 Airflow WEB UI 그래프에서 task 이름으로 노출 되는 것이다.


참고


profile
안녕하세요. 데이터 엔지니어 김재민 입니다.

0개의 댓글