Airflow 조그맣게 시작하기 - dag 완료 후 알림 받기

햄도·2021년 6월 15일
1

Airflow slowstart

목록 보기
5/6

알림의 스팸화

원래 dag의 on_success_callback과 on_failure_callback만 이용하여 알림을 받고 있었는데, 한 dag당 여러 task를 수행하다 보니 당장 대응하지 않아도 되는 task에도 알림이 계속 와서 스팸처럼 느껴졌다.

task가 아닌 dag에 콜백을 설정할 수는 없는지 찾아보니, 그런 방법은 아직 없고 알림을 발송하는 task를 가장 마지막 task로 추가하면 될 것 같았다.

(참고한 글 - dag 시작 시 알림을 보내는 케이스)

요구사항은 다음과 같다.

  • 모든 task가 끝나면 성공하든 실패하든 알림을 보낸다.
  • 알림에는 각 task의 상태를 보여준다. (success/failed)
    • failed task가 하나라도 있는 경우 수행 실패, 그 외 수행 완료를 표기한다.
  • 다른 dag에서도 편하게 사용할 수 있도록 만든다.

구현 과정

모든 task가 끝나면 성공하든 실패하든 알림을 보낸다.

테스트를 위해 test dag에 task 세 개를 생성하고, 셋 중 하나는 실패하도록 했다.

import pendulum
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from utils.noti.hangouts import send_card_message
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator

ssh_hook = SSHHook(ssh_conn_id="my_connection")
KST = pendulum.timezone("Asia/Seoul")
args = {
    "owner": "hamdoe",
}

def send_execute_message(**kwargs):
    # 메세지 전송
    pass

dag = DAG(
    dag_id="test",
    default_args=args,
    start_date=datetime(2021, 6, 10, tzinfo=KST),
    schedule_interval=None,
)
# 병렬 수행 task를 묶어주는 역할
t0 = DummyOperator(
    task_id="dummy_task",
    dag=dag,
)

t1 = SSHOperator(
    task_id="task1",
    command="df",
    ssh_hook=ssh_hook,
    dag=dag,
)
t2 = SSHOperator(
    task_id="task2",
    command="echo $ZONE",
    ssh_hook=ssh_hook,
    dag=dag,
)
# test가 존재하지 않는 command이기 때문에 실패한다.
t3 = SSHOperator(
    task_id="task3",
    command="test",
    ssh_hook=ssh_hook,
    dag=dag,
)
tasks = [t1, t2, t3]

send_message = PythonOperator(
    task_id="send_message",
    python_callable=send_execute_message,
)

t0 >> tasks >> send_message

위와 같이 task1~3이 모두 수행된 후 send_message가 수행되도록 했는데, 메세지가 전송되지 않았다.

찾아보니 send_message의 trigger_rule을 바꿔줘야 했다. 이 값을 설정해주지 않으면 해당 task의 모든 upstream task가 성공한 후 수행되게 된다. (airflow trigger rules)

내가 하려는 것은 성공하든 실패하든 모든 upstream task가 끝난 후 알림을 보내는 것이기 때문에, PythonOperator에 trigger_rule을 'all_done'으로 넘겨주었더니 정상적으로 수행됐다.

send_message = PythonOperator(
    task_id="send_message",
    python_callable=send_execute_message,
    trigger_rule="all_done",
)

알림에는 각 task의 상태를 보여준다.

task 상태는 어디에서 가져와야 할까?

구글링한 결과 아래와 같은 예시가 있었다.

from airflow.models import TaskInstance

dag_instance = kwargs['dag']
operator_instance = dag_instance.get_task("task_id")
task_status = TaskInstance(operator_instance, execution_date).current_state()

kwargs는 대체 뭔가.. PythonOperator를 수행할 때 callable에 kwargs로 다음과 같은 것들이 전달된다.

  • conf: 사용중인 ConfigParser 객체
  • dag: 어떤 dag를 수행중인지
  • dag_run: 수행중인 dag 인스턴스
  • ds: 수행일, '2021-06-15' 형태
  • ds_nodash: nodash 수행일, '20210615' 형태
  • execution_date: 주문일, DateTime(2021, 6, 15, 1, 44, 43,146969, tzinfo=Timezone('+00:00')) 형태
  • task: 현재 task
  • task_instance: 현재 task 인스턴스
  • 자세한 내용

원래는 위 정보들이 dag 실행시 context라는 변수에 전달되고, PythonOperator의 provide_context 인자를 True로 전달해야 kwargs에 전달되었던 것 같다. (구버전 참고)

context가 뭔지, 어디에서 넘겨주는건지는 모르겠다. jinja template에서 사용할 수 있는 데이터들과 동일한 것 같긴 한데 공식 문서에는 정의되어있는 곳을 찾을 수가 없다..

airflow 2부터는 provide_context가 deprecated되었고, PythonOperator에 전달되는 callable에는 context와 사용자가 op_kwargs로 입력한 값들이 같이 들어간다.

# PythonOperator의 execute 메소드
def execute(self, context: Dict):
  context.update(self.op_kwargs)
  context['templates_dict'] = self.templates_dict

  self.op_kwargs = determine_kwargs(self.python_callable, self.op_args, context)

  return_value = self.execute_callable()
  self.log.info("Done. Returned value was: %s", return_value)
  return return_value

callable에서 다음과 같이 받아서 사용할 수 있다.

def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
)

문제는 위 예시와 같이 구현해도 TaskInstance(operator_instance, execution_date).current_state() 의 결과가 None 이었다.

몇 차례 바꿔서 시도한 끝에, 현재 Dag에서 수행한 task와는 관련이 없는, 새 TaskInstance의 state를 가지고 오기 때문에 None이 되는 것 같다는 결론을 냈다.

PythonOperator, DagRun, TaskInstance 코드까지 까보고 현재 Dag에서 수행한 task들의 상태를 가져올 수 있었다.

def send_execute_message(**kwargs):
    title = f"{kwargs['dag'].dag_id} 수행 결과"
    message = ""
    # dag_run 객체를 가져온다.
    dag_instance = kwargs["dag_run"]
    # dag_run에서 task instances를 가져온다.
    task_instances = dag_instance.get_task_instances()
    completed = True
    for ti in task_instances:
        # 각 task instance의 id와 state를 확인한다.
        task_id = ti.task_id
        state = ti.current_state()
        if state == "failed":
            completed = False
        message += f"task {task_id} state ::: {state}\n"
    if completed:
        status = "수행 완료"
    else:
        status = "수행 실패"
    send_message(title=title, status=status, message=message)

다른 dag에서도 편하게 사용할 수 있도록 만든다.

위와 같이 알림을 보내는 모듈을 다른 dag에서도 사용하도록 하고 싶은데, Custom operator를 만들거나 subdag를 만드는 방법 외에 딱히 좋은 방법이 없어보였다.

일단은 send_execute_message를 다른 모듈로 분리하고 각 dag에서 import해 PythonOpertor만 만드는 방식으로 사용하려 한다.

참고

profile
developer hamdoe

3개의 댓글

comment-user-thumbnail
2021년 11월 5일

오 좋은 글 감사합니다!

답글 달기
comment-user-thumbnail
2021년 11월 28일

멋지네요!! 많이 배워갑니다

답글 달기
comment-user-thumbnail
2021년 12월 23일

오 좋은정보 감샇삽니다!
email이랑 slack이랑 동시에 실행결과 메시지를 보내고 싶엇는데, 이걸로 해보려고 합니다!

답글 달기