Airflow의 Sensor는 특정 조건이나 상태를 감지하고, 그에 따라 작업을 수행하는 Airflow에서 제공하는 Operator입니다. DAG에서 특정 이벤트가 수행되었는지 확인하고 그 다음 Downstream task를 진행하게 됩니다.
Airflow에서는 다양한 이벤트에 대응할 수 있는 여러가지 Sensor들을 제공하고 있습니다.
주로 의존성에 대한 관리가 필요할 때 사용합니다.
데이터를 수집하는 task를 가지고 있는 DAG A가 있고, 수집한 데이터를 처리하는 task를 가지고 있는 DAG B가 있다고 가정해봅시다. 이때 DAG B에서 ExternalTaskSensor를 사용하면 DAG A가 완료될 때 까지 대기할 수 있습니다.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def collect_data():
print('done')
default_args = {
'owner': 'airflow',
'start_date': datetime(2025, 3, 1),
}
dag_a = DAG('dag_a', default_args=default_args, schedule_interval='@daily')
start_task = DummyOperator(task_id='start', dag=dag_a)
collect_data_task = PythonOperator(
task_id='collect_data',
python_callable=collect_data,
dag=dag_a,
)
end_task = DummyOperator(task_id='end', dag=dag_a)
start_task >> collect_data_task >> end_task
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def process_data():
print("done")
default_args = {
'owner': 'airflow',
'start_date': datetime(2025, 3, 1),
}
dag_b = DAG('dag_b', default_args=default_args, schedule_interval='@daily')
start_task = DummyOperator(task_id='start', dag=dag_b)
# ExternalTaskSensor를 이용하여 dag_a가 완료될 때 까지 대기
wait_for_dag_a = ExternalTaskSensor(
task_id='wait_for_dag_a',
external_dag_id='dag_a',
external_task_id='collect_data',
dag=dag_b,
mode='poke',
timeout=600,
poke_interval=30,
)
process_data_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
dag=dag_b,
)
end_task = DummyOperator(task_id='end', dag=dag_b)
start_task >> wait_for_dag_a >> process_data_task >> end_task
코드는 여기서 확인할 수 있습니다.
모든 센서들이 그렇듯 기본적으로 ExternalTaskSensor 역시 BaseSensorOperator를 상속합니다.
Basics
external_dag_id : Upstream DAG의 ID를 의미합니다. 즉 Sensor가 기다려야하는 task를 포함한 DAG 입니다.
external_task_id : Upstream DAG에서 기다려야하는 task의 ID입니다.
external_task_ids : 기다려야하는 Task들의 ID 리스트 입니다. Default로는 Sensor는 DAG를 기다리게 됩니다.
external_task_group_id : 기다려야하는 Task의 task_group_id 입니다.
Mode : 모드에는 기본적으로 Poke와 Reschedule 모드가 있습니다.
아무런 설정을 하지 않는다면 Poke로 동작하게 됩니다. Poke의 경우 Worker slot을 점유하고 Reschedule 모드 설정 시 매 확인 시 마다 slot을 점유 해제하기 때문에 긴 실행시간을 예측하는 경우에 적합한 모드입니다. Reschedule의 경우 확인되지 않는 경우에 대하여 UP_FOR_RETRY 상태로 남게 되어 다시 스케줄링 됩니다.
poke_interval : 초로 정의해준 간격대로 상태를 poking 합니다.
스케줄 시간 관련
execution_delta과 execution_date_fn를 통해 현재 DAG와 외부 DAG의 실행 차이 시간을 정의할 수 있습니다. 두 가지를 모두 지정할 경우 에러가 발생합니다. (한가지만 지정 가능)
실패 처리 및 Skipped 상태
기본적으로 soft_fail==True 을 정의해주지 않는다면 외부에 체킹해야하는 Task가 Skipped 상태가 되더라도 ExternalTaskSensor는 Skipped 상태가 되지 않습니다. soft_fail==True 은 failed_status에 지정해준 상태에 대하여 관여하게 됩니다.
Airflow의 코드 상에서는 기본적으로 Failed 된 상태의 Task를 카운트하며, soft_fail 설정 여부를 통해 Sensor의 Task를 Failed 상태로 떨어뜨릴지 혹은 Skipped 상태로 둘지 결정하게 됩니다.
soft_fail==True 만을 사용하게 될 경우 Target task가 failed 혹은 timeout 되었을 때 Sensor Task를 Skip하게 됩니다.
skipped_states를 사용하게 될 경우 지정한 리스트 내의 Task에 대하여 상태를 체크하고, 만약 여러개의 타깃 Task 중 하나라도 Failed 상태를 가지고 있을 경우 Sensor 역시 Skip하게 됩니다. 여기까지는 soft_fail==True의 동작과 동일하지만 만약 타깃이 Timeout 될 경우 Sensor가 Failed 상태로 접어들게 되는 점에서 차이가 있습니다.