Airflow

오민석·2022년 7월 9일
0

Airflow란

Scheduler - 가장 중요한 역할을 수행하며 모든 DAG와 태스크를 모니터링하고 관리한다. 주기적으로 실행해야 할 태스크를 찾고 해당 태스크를 실행 가능한 상태로 변경한다.

Webserver(Flask) - Airflow 웹 UI 서버

MetaDB - Airflow 메타데이터 저장소입니다. 어떤 DAG가 존재하고 어떤 태스크로 구성되었는지, 어떤 태스크가 실행 중이고, 또 실행 가능한 상태인지 등의 많은 정보가 있다.

Executor - 태스크 인스턴스를 실행하는 주체. Local, Celery, 쿠버네티스 등 종류가 다양하다.

Worker - 실제 작업을 수행하는 주체.

Nifi 비교해서 Airflow 장단점

장점

  1. 운영환경이 커지고 로직이 복잡해질 수록 Nifi에서 UI기반으로 트래킹하는게 쉽지 않다. 반면 Airflow에서는 Dag 및 Sub Dag으로 구성되어있어고 python code기반이라 확인하기 쉽다.

  2. ETL외에도 ML 등 다양하게 사용할 수 있다. 예를들어, test 데이터를 A,B,C M/L 모델에 학습시키고 10점이 넘는 모델만 선택한다고 할 때도, branching operation을 통해서 해당 작업 파이프라인을 구축할 수 있다.

  3. NIFI는 플로우(DAG)아닌 프로세서 단위로 동작해서 공통 workflow 공유 못한다.

  4. 처리량 증가 시 scale out 해야하는데 NIFI는 각 processor 들이 disable 하지 않는 이상 resource을 잡고 있어서 효과가 없음

  5. 성공/실패 탐지 가능하긴 하나 구현로직에 의존함.

단점

  1. Nifi는 Data를 A->B로 옮기는 이러한 작업에 특화되어있어서 publishKafka, GetHDFS, splitText 등 다양한 Processor들이 있다. 반면, Airflow는 ETL 툴이라기 보다 workflow를 코드로 관리하고, task 별 및 DAG 별로 dependency를 설정할 수 있다.

  2. Nifi는 Processor 사이에서 List queue를 보면 data가 어떻게 변환되었는지를 시각적으로 잘 확인할 수 있는 반면 Airflow는 task별 log에서 확인가능하다.

DAG 성능 포인트

Airflow는 Scale-out할 수 있는 방안들 중에서 executor의 개수를 늘리는 방법이 있다. Celery의 예시로 보면, 메세지 전달을 받아 비동기 작업을 하는 것을 worker라고 하는데 Celery가 Broker(RabbitMQ, redis 등)에서 메세지 받아 작업을 수행하게 된다. Airflow에서는 parallelism(32개), concurrency(16) 등 Airflow 클러스 전체에서 동시 수행 가능한 task 인스스턴스 개수, 혹은 하나의 DAG 내에서 동시 수행 가능한 task 개수 제한 등 병렬작업 개수 제한 관련 옵션 값들이 존재한다. 이러한 값들을 더 높이고, 더 나은 성능을 위해서는 Celery의 worker node의 개수를 늘리는 방안을 선택해야 한다.

다음과 같이 특정 task 별로 특정 worker node에 지정할 수 있다.

1번 node -> airflow worker -q q1
2번 node -> airflow worker -q q1

t1 = BashOperator(
    task_id='t1',
    bash_command="sleep 1",
    queue="q1",
    dag=dag)

t2 = BashOperator(
    task_id='t2',
    bash_command="sleep 1",
    queue="q2",
    dag=dag)

Celery Executor 아키텍처

Workers는 DAG File, Meta DB 통해서 실행 관련 정보 읽어서 작업을 수행하고 결과를 Result backend에 저장.

Scheduler는 workers가 수행한 결과를 Result backend에서 읽고 DAG와 task를 스케쥴링한다.

Multi-node Cluster 구축 예제

https://skwrites.in/setting-up-apache-airflow-cluster/

작업 실패 시

기본적으로 DAG전체 및 Task 별로 retry를 지정할 수 있지만 에러가 나서 코드 수정 후 재처리 시 DAG 전체 / Task 별 재실행 하는 방법이다.

Re-run DAG

1. Catchup, Backfill

  • Catchup=False
    DAG 구성 시 파라미터로 Catchup=True 하면 정해진 시간에 실행되지 못한 DAG를 실행한다.

  • Backfill
    Catch=True로 지정되어 있으면 start_date, end_date로 조정하여 실행범위를 조절할 수 있다.

ex. 지정한 기간 동안 backfill수행(수행하지 않은 날짜만)
airflow dags backfill --start-date START_DATE --end-date END_DATE dag_id
ex. 지정한 기간 동안 backfill수행(모든 날짜)
airflow dags backfill --start-date START_DATE --end-date END_DATE --reset-dagruns dag_id

2. UI(Trigger DAG)

3. CLI

airflow tasks clear

Re-run Task

1. UI

2. CLI

airflow tasks clear

task상태 알림 받기(slack)

DAG의 deafult_args에 on_failure_callback 혹은 Operator에서 on_failure_callback에서 작업 fail 시 slack, email 등으로 보내는 로직을 작성하면 된다. callback 함수에서 dag_id, task_id, execution_date 등 값을 확인할 수 있다.

from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator

alert.py

class SlackAlert:
    def __init__(self, channel):
        self.slack_channel = channel
        self.slack_token = BaseHook.get_connection('slack').password

    def slack_fail_alert(self, context):
        alert = SlackAPIPostOperator(
            task_id='slack_failed',
            channel=self.slack_channel,
            token=self.slack_token,
            text="""
                :red_circle: Task Failed.
                *Task*: {task}  
                *Dag*: {dag}
                *Execution Time*: {exec_date}  
                *Log Url*: {log_url}
                """.format(
                    task=context.get('task_instance').task_id,
                    dag=context.get('task_instance').dag_id,
                    exec_date=context.get('execution_date'),
                    log_url=context.get('task_instance').log_url,
                    )
                  )
        return alert.execute(context=context)

test.py

alert = SlackAlert('#slack_test') # 슬랙 채널명

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'on_failure_callback': alert.slack_fail_alert
}

dag = DAG(
   'slack',
    default_args=default_args,
    schedule_interval='0 10 * * *',
)

t1 = BashOperator(
    task_id='test_bash',
    bash_command='date',
    dag=dag,
)

trigger_rules

기본적으로 upstream의 task가 모두 성공일 때 다음 task가 실행된다. 하지만 상황에 따라서 해당 rule를 변경할 수 있다. 예를 들어, 이전 작업의 성공 유무 없이 email알림을 받아야 할 때가 있다. all_success, all_done등 다양한 옵션이 있으니 자세한 내용은 공식 다큐먼트 참고

task4 = DummyOperator(task_id='task4', dag=dag,
                      trigger_rule=TriggerRule.ALL_DONE)

Scheduler SPOF

Scheduler가 하나의 Node에만 있으면 SPOF가 되기 때문에
https://github.com/teamclairvoyant/airflow-scheduler-failover-controller 해당 문서 참고하여 ASFC를 이용하면 된다. 각 master node에 ASFC(daemon)를 설치하고, master node가 ssh 연결을 설정하면 master node끼리 지속적으로 heartbeat를 주고 받아서 master node가 죽으면 standby node를 실행하게 되는 메커니즘을 갖는다.

추가사항

Execution Date

run,test,backfill,trigger등 CLI환경에서 설정할 수 있는 유일한 parmaeter로 execution_date를 명시해야한다. 스케쥴러로 실행할 때는 자동으로 설정된다. 해당 parameter는 pipeline 실패 혹은 재처리 시 사용된다. start_date, end_date가 execution_date 기반이기 때문에 해당 변수값을 사용하게 된다. 마치 transaction_id 처럼 고유 id라고 볼 수 있다.

예제.
usage: airflow test dag_id task_id execution_date
airflow test client-log-batch RevenueMonthly 2019-06-10

10분 단위로 batch인데 Execution Date는 logical 상의 시간이고, UI에서의 start date와 end date는 실제 작업의 시작/종료 시간이다. script에서의 start_date와는 다른 개념이다. Airflow의 스케쥴링 컨셉은 일배치면 하루 전 기준으로 돌고, 시간배치면 시간 전 기준으로 돈다. 예를 들어 하루에 한 번 12시에 도는 컨셉이면 2021-03-08 12:00 에 2021-03-07 12:00기준으로 실행된다.

Reference

https://velog.io/@hyunwoozz/airflow%EC%9D%98-%EC%8B%B1%EA%B8%80-%EB%85%B8%EB%93%9C-%EB%A9%80%ED%8B%B0-%EB%85%B8%EB%93%9C-%EC%95%84%ED%82%A4%ED%85%8D%EC%B3%90

https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/

https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html

https://dydwnsekd.tistory.com/98

https://velog.io/@jjongbumeee/Airflow4

(airflow 재처리)
https://sheerheart.tistory.com/entry/Apache-Airflow-%EC%9E%AC%EC%88%98%ED%96%89-%EB%B0%A9%EB%B2%95-%EC%A0%95%EB%A6%AC

(task상태 알림 받기)
https://jaeyung1001.tistory.com/241

https://medium.com/@knoldus/apache-airflow-automate-email-alerts-for-task-status-c1b7e6b19c7e

https://velog.io/@hamdoe/Airflow-%EC%A1%B0%EA%B7%B8%EB%A7%A3%EA%B2%8C-%EC%8B%9C%EC%9E%91%ED%95%98%EA%B8%B0-dag-%EC%99%84%EB%A3%8C-%ED%9B%84-%EC%95%8C%EB%A6%BC-%EB%B0%9B%EA%B8%B0

0개의 댓글