[Airflow] 7. Implementing

Denver·2022년 9월 11일
0

Airflow

목록 보기
9/11
post-thumbnail

Xcom으로 task간 데이터 공유하기
SQLite : 2GB
Postgres : 1GB
MySQL : 64 KB

Operator가 작동하면 task 인스턴스가 생성됨

ti : task 인스턴스
ti 파라미터를 이용하는데 task 인스턴스 오브젝트 xcom 메서드로 접근하게해줌

def _t1(ti):
    ti.xcom_push(key='my_key', value=1234)

def _t2(ti):
    ti.xcom_pull(key='my_key', task_id='t1')
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime

def _t1(ti):
    ti.xcom_push(key='my_key', value=1234)

def _t2(ti):
    print('value : ', ti.xcom_pull(key='my_key', task_id='t1'))

with DAG("xcom_dag", start_date=datetime(2022, 9, 1),
    schedule_interval='@daily', catchup=False) as dag:

    t1 = PythonOperator(
        task_id='t1',
        python_callable=_t1
    )

    t2 = PythonOperator(
        task_id='t2',
        python_callable=_t2
    )


    t1 >> t2

Dag를 실행시키고 상단 메뉴 Admin - XComs에가면 t1에 my_key : 1234를 확인할 수 있다.

t2의 로그를 보면

value가 출력되었다

task 상태에 따라 다음 task 정하기

value가 짝수이면 t2로, 아니면 t3이 실행되도록하였다.

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime

def _t1(ti):
    ti.xcom_push(key='my_key', value=1234)

def _t2(ti):
    ti.xcom_pull(key='my_key', task_ids='t1')

def _branch(ti):
    value = ti.xcom_pull(key='my_key', task_ids='t1')
    if (value % 2) == 0:
        return 't2'
    return 't3'

with DAG("xcom_dag", start_date=datetime(2022, 1, 1),
    schedule_interval='@daily', catchup=False) as dag:

    t1 = PythonOperator(
        task_id='t1',
        python_callable=_t1
    )

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=_branch
    )

    t2 = PythonOperator(
        task_id='t2',
        python_callable=_t2
    )

    t3 = BashOperator(
        task_id='t3',
        bash_command="echo ''"
    )

    t4 = BashOperator(
        task_id='t4',
        bash_command="echo ''",
    )


선택되지 않은 task t3는 skip 된다(분홍)
t4는 t2, t3이 선행되어야 실행되기때문에 t3이 skip되었으므로 t4도 skip되었다.
t4를 t2,t3상태에 따라 실행시킬 수 있는데
trigger_rule 옵션을 사용하면 된다.

trigger_rule 옵션갑
all_success: (default) 모든 부모 노드 성공
all_failed: 모든 부모 노드 failed 혹은 upstream_failed
all_done: 모든 부모 노드 실행 완료
all_skipped : 모든 부모 노드 skipped 상태
one_failed: 부모 최소 1개 failed. 모든 부모 노드 완료시까지 가디리지 않음
one_success: 부모 최소 1개 성공. 모든 부모 노드 완료시까지 가디리지 않음
none_failed: 모든 부모 노드 실패(failed 혹은 upstream_failed) 아닌 경우. == 모든 부모 노드 성공 혹은 skip
none_failed_min_one_success : 모든 upstream tasks
failed 혹은 upstream_failed 가 아니고, 그리고 최소 1 upstream task 성공
none_skipped: 모든 부모 노드 skip 아닌 경우 == 모든 부모 노드 성공 / failed / upstream_failed state
always : 의존성 x. 항상

t4에 트리거 룰을 설정하면 t3이 스킵되었는데도 실행되었다.

    t4 = BashOperator(
        task_id='t4',
        bash_command="echo ''",
        trigger_rule='none_failed_min_one_success'
    )

profile
까먹었을 미래의 나를 위해

0개의 댓글