[내일 출근인데 어떡하지] python 코드에 Airflow 적용하기

Judy·2023년 6월 4일
0

데이터 수집 파이프라인을 Airflow 로 구축해 주세요

지금까지는 데이터 수집 파이프라인을 간단히 쉘 스크립트로 만들어 이용했던 Judy,
이번에는 Airflow 를 이용해 보라는 퀘스트를 받았습니다 😵‍💫
Airflow 란 무엇이고, 어떻게 코드를 작성하고 사용하면 되는지
오늘도 출근 전날에 빠르게 읽어보면 바로 실전에 뛰어들 수 있도록 정리해 보았습니다!

(이 글에서 설명하는 python 코드와 airflow DAG 코드는 모두 제 GitHub 에 올려 두었습니다.)

1. Airflow?

Airflow는 데이터 파이프라인을 관리하기 위한 오픈 소스 플랫폼입니다. 데이터 파이프라인은 데이터의 흐름을 관리하고 데이터 처리 작업을 자동화하기 위해 사용됩니다. Airflow는 작업들 간의 종속성을 정의하고 예약된 작업을 실행하며, 작업의 상태와 진행 상황을 모니터링할 수 있습니다.

Airflow의 핵심 개념은 작업(task)과 작업 간의 종속성(dag dependency)입니다. 작업은 데이터 처리 또는 작업 단위로 볼 수 있으며, Python으로 작성된 작업들을 포함할 수 있습니다. 작업들은 작업 간의 종속성을 정의하여 실행 순서를 결정할 수 있습니다. 이러한 종속성은 방향성 비순환 그래프(Directed Acyclic Graph, DAG)로 표현됩니다.


https://zzsza.github.io/data/2018/01/04/airflow-1/

Airflow 구성 요소

  • Airflow Webserver - 웹 UI를 표현하고, workflow 상태 표시하고 실행, 재시작, 수동 조작, 로그 확인 등 가능
  • Airflow Scheduler
    • 작업 기준이 충족되는지 여부를 확인
    • 종속 작업이 성공적으로 완료되었고, 예약 간격이 주어지면 실행할 수 있는 작업인지, 실행 조건이 충족되는지 등
    • 위 충족 여부가 DB에 기록되면, task들이 worker에게 선택되서 작업을 실행함
  • Airflow worker
    • 실제로 작업이 이루어지는 서버

DAG

  • Directed Acyclic Graph의 약자
  • Airflow에선 workflow라고 설명함
  • Task의 집합체

이 글에서 Airflow 설치 방법은 생략하겠습니다.
(저는 Docker 컨테이너를 만들어 설치했는데, 지난 '내출어' 를 참고하시면 저와 똑같이 설치하실 수 있어요!)

2. Practice!

Airflow 공부를 하는 동안 여러 가지 튜토리얼과 블로그, 책을 참고했는데요,
개인적으로 저는 우선 원하는 코드를 python 으로 짜 보고,
이 코드를 Airflow DAG 로 변환하는 것이 가장 스스로가 이해하기에 쉬웠습니다.

제 나름대로의 튜토리얼을 만들기 위해 세운 기준은 다음 4가지였는데요,

  1. 최소 3개 이상의 함수 사용
  • 각 작업이 순차적으로 실행되는 것을 충분히 검증하기 위해
  1. 한 개 이상의 함수에서 한 개 이상의 인자 받기
  • 함수 인자 전달 로직을 검증하기 위해
  1. 하나 이상의 함수에서 리턴값 존재
  • 함수 리턴값 검증을 위해
  1. 하나 이상의 함수에서 선행 함수의 리턴값을 인자로 받아 이용하기
  • 함수 간 정보 전달이 원활하게 이루어짐을 검증하기 위해

이 기준들을 만족해야 작업이 꼬리에 꼬리를 무는 파이프라인으로서 완성도를 보장할 수 있다고 생각했기 때문입니다.
(개인적인 생각입니다. ^^;;)

2-1. Pipeline 정의

저는 다음과 같은 간단한 작업을 수행하는 pipeline 을 정의했습니다.

  1. 숫자가 담긴 리스트 정의
  2. 리스트에 임의의 숫자 추가
  3. 리스트의 합계 계산
  4. 계산된 합계를 output.txt 파일에 저장

2-2. Python code

각 작업을 함수로 만들고, main 함수에서 순차적으로 함수를 호출하여 실행합니다.
각 함수는 지정된 작업을 수행하고, 적절하게 값을 리턴하고 인자를 받아 작업에 활용합니다.

# 1. Get 'num_list' as an argument of function
# 2. Insert 10 to 'num_list'
# 3. Return list
def func_insert_list(num_list):
    num_list.append(10)
    return num_list

# 1. Get num_list (Return value of 'func_get_list')
# 2. Get sum value of the list
# 3. Return sum value
def func_sum_list(response_num_list):
    sum_val = sum(response_num_list)
    return sum_val

# Store data to output.txt
def func_save(sum_val):
    # Get return value of function 'func_sum_list' using task id 'id_sum_list'
    with open("output.txt", "w") as file:
        file.write(f"Sum of list: {sum_val}")


if __name__ == "__main__":
    num_list = [1, 2, 3]
    response_num_list = func_insert_list(num_list)
    sum_val = func_sum_list(response_num_list)
    func_save(sum_val)

2-3. Airflow DAG

2-2 에서 만든 python 코드를 약간 수정하여 Airflow DAG 로 만들어 보겠습니다.

(1) 함수 정의

선행 함수의 return 값을 이용하기 위해 xcom 코드를 함수에 추가합니다.
(선행 함수의 task_id 는 (3) Task 정의 - DAG 의 PythonOperator 에 명시합니다.)
context['task_instance'].xcom_pull( task_ids='[선행 함수의 task_id]')

Airflow는 task 간에 작업 결과를 공유하기 위해(= 데이터를 전달하고 저장하기 위해) XCom이라는 데이터 교환 메커니즘을 제공합니다.

context는 현재 작업의 실행 컨텍스트를 나타내는 객체입니다. context['task_instance']는 현재 작업 인스턴스에 대한 참조를 제공합니다. xcom_pull 함수는 이 작업 인스턴스를 사용하여 다른 작업의 결과를 가져옵니다.

위의 코드에서는 'id_insert_number_list' 작업의 결과를 가져오기 위해 xcom_pull 함수를 사용하고 있습니다. 이를 통해 'id_insert_number_list' 작업에서 생성된 데이터를 현재 작업에서 활용할 수 있습니다. 결과를 가져올 때는 xcom_pull 함수의 task_ids 인자에 해당 작업의 식별자를 지정합니다.

# 1. Get 'num_list' as an argument of function
# 2. Insert 10 to 'num_list'
# 3. Return list
def func_insert_list(num_list):
    num_list.append(10)
    return num_list

# 1. Get num_list (Return value of 'func_get_list')
# 2. Get sum value of the list
# 3. Return sum value
def func_sum_list(**context):
    # Get return value of funtion 'func_insert_list' using task id 'id_insert_number_list'
    response_num_list = context['task_instance'].xcom_pull(
        task_ids='id_insert_number_list')
    logging.info(response_num_list)
    sum_val = sum(response_num_list)
    return sum_val

# Store data to output.txt
def func_save(**context):
    # Get return value of function 'func_sum_list' using task id 'id_sum_list'
    sum_val = context['task_instance'].xcom_pull(task_ids='id_sum_list')
    logging.info(f"Sum of list: {sum_val}")
    with open("output.txt", "w") as file:
        file.write(f"Sum of list: {sum_val}")

(2) DAG 정의

Pipeline 을 Airflow DAG 로 만들어 주기 위해서 가장 먼저 DAG 를 정의해야 합니다.

with DAG(...) as dag: 안에는 기본적인 DAG 수행 조건을 정의해 줍니다.

  • DAG 이름
  • 시작 시간
  • 반복 조건 (몇 분마다 실행할지, 한 번만 실행할지 등등...)
  • 함수 리턴값을 이용하는 경우 context 사용 여부 설정
default_args = {
    'owner': 'airflow',
    "start_date": datetime(2023, 1, 1),
    # Receive the context information as kwargs in the function
    "provide_context": True
}

with DAG(
    # DAG name
    'sum_of_list',
    default_args=default_args,
    # Run every 10 minutes
    schedule_interval=timedelta(minutes=10),
) as dag:

(3) Task 정의

이어서 각각의 python task 를 pythonOperator 를 이용해 정의해 줍니다.

  • insert_list는 Airflow에서 정의된 작업(Operator)을 나타내는 객체입니다. 이 작업은 Python 함수인 func_insert_list를 실행하는 역할을 담당합니다.
	# Define task
    # Airflow에서 정의된 작업(Operator)을 나타내는 객체
    # Python 함수인 func_insert_list를 실행하는 역할
    # task 실행 순서를 지정할 때 이용함
    insert_list = PythonOperator(
    
        # Define task ID
        # task 의 고유 식별자
        # 함수 리턴값이 있는 경우 함수 정의에서 사용됨
        task_id='id_insert_number_list',
        
        # Function to run
        # 실행할 함수
        python_callable=func_insert_list,
        # Specify the arguments for the function
        
        # 함수 인자가 있는 경우 인자 명시
        op_args=[num_list],
        
        # dag
        dag=dag
    )

    # Define task
    sum_list = PythonOperator(
        # Define task ID
        task_id='id_sum_list',
        # Function to run
        python_callable=func_sum_list,
        dag=dag
    )

    # Define task
    save = PythonOperator(
        # Define task ID
        task_id='id_saving_data',
        # Function to run
        python_callable=func_save,
        dag=dag
    )

함수 인자 및 리턴값을 사용하는 경우 어디에 뭘 추가할지 헷갈리는데요,

  • 함수가 인자를 받는 경우
    • task 를 정의할 때 op_args 를 추가하고
  • 함수가 값을 리턴하는 경우
    • 함수를 정의할 때 xcom 코드를 추가해 줍니다.

(4) Task 실행 순서 정의

모든 정의가 끝나면 >> 연산자를 이용해 task 실행 순서를 정의해 줍니다.

	# Task relationship
    insert_list >> sum_list >> save

(5) 전체 코드

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import logging


# 1. Get 'num_list' as an argument of function
# 2. Insert 10 to 'num_list'
# 3. Return list
def func_insert_list(num_list):
    num_list.append(10)
    return num_list

# 1. Get num_list (Return value of 'func_get_list')
# 2. Get sum value of the list
# 3. Return sum value
def func_sum_list(**context):
    # Get return value of funtion 'func_insert_list' using task id 'id_insert_number_list'
    response_num_list = context['task_instance'].xcom_pull(
        task_ids='id_insert_number_list')
    logging.info(response_num_list)
    sum_val = sum(response_num_list)
    return sum_val

# Store data to output.txt
def func_save(**context):
    # Get return value of function 'func_sum_list' using task id 'id_sum_list'
    sum_val = context['task_instance'].xcom_pull(task_ids='id_sum_list')
    logging.info(f"Sum of list: {sum_val}")
    with open("output.txt", "w") as file:
        file.write(f"Sum of list: {sum_val}")

#########################################################
# Define DAG

default_args = {
    'owner': 'airflow',
    "start_date": datetime(2023, 1, 1),
    # Receive the context information as kwargs in the function
    "provide_context": True
}

with DAG(
    # DAG name
    'sum_of_list',
    default_args=default_args,
    # Run every 10 minutes
    schedule_interval=timedelta(minutes=10),
) as dag:

    # Define number list
    num_list = [1, 2, 3]

    # Define task
    insert_list = PythonOperator(
        # Define task ID
        task_id='id_insert_number_list',
        # Function to run
        python_callable=func_insert_list,
        # Specify the arguments for the function
        op_args=[num_list],
        dag=dag
    )

    # Define task
    sum_list = PythonOperator(
        # Define task ID
        task_id='id_sum_list',
        # Function to run
        python_callable=func_sum_list,
        dag=dag
    )

    # Define task
    save = PythonOperator(
        # Define task ID
        task_id='id_saving_data',
        # Function to run
        python_callable=func_save,
        dag=dag
    )

    # Task relationship
    insert_list >> sum_list >> save

Reference

공식문서
https://airflow.apache.org/docs/apache-airflow/stable/index.html

Apache Airflow - Workflow 관리 도구(1)
https://zzsza.github.io/data/2018/01/04/airflow-1/

AirFlow DAG 소개와 기본 구조
https://www.bearpooh.com/151

Airflow Xcom 인터페이스
https://velog.io/@gnlenfn/Airflow-Xcom-%EC%9D%B8%ED%84%B0%ED%8E%98%EC%9D%B4%EC%8A%A4

profile
NLP Researcher

0개의 댓글