1) all_done
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.exceptions import AirflowException
import pendulum
with DAG(
dag_id='dags_python_with_trigger_rule_eg1',
start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
bash_upstream_1 = BashOperator(
task_id='bash_upstream_1',
bash_command='echo upstream1'
)
@task(task_id='python_upstream_1')
def python_upstream_1():
raise AirflowException('downstream_1 Exception!')
@task(task_id='python_upstream_2')
def python_upstream_2():
print('정상 처리')
# all_done : 성공이건 실패건 돈다
@task(task_id='python_downstream_1', trigger_rule='all_done')
def python_downstream_1():
print('정상 처리')
[bash_upstream_1, python_upstream_1(), python_upstream_2()] >> python_downstream_1()
2) none_skipped
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
import pendulum
with DAG(
dag_id='dags_python_with_trigger_rule_eg2',
start_date=pendulum.datetime(2024, 6, 17, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
@task.branch(task_id='branching')
def random_branch():
import random
item_lst = ['A', 'B', 'C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item == 'B':
return 'task_b'
elif selected_item == 'C':
return 'task_c'
task_a = BashOperator(
task_id='task_a',
bash_command='echo upstream1'
)
@task(task_id='task_b')
def task_b():
print('정상 처리')
@task(task_id='task_c')
def task_c():
print('정상 처리')
@task(task_id='task_d', trigger_rule='none_skipped')
def task_d():
print('정상 처리')
random_branch() >> [task_a, task_b(), task_c()] >> task_d()