{{ }}
<h1> 제 이름은 {{name}} 입니다. </h1>
{% %}
<ul>
{% for item in items %}
<li> {{ item }} </li>
{% endfor %}
</ul>
(templated)
라고 붙어있는 인자만 가능# jinja 사용한 예시
task = BashOperator(
task_id="task1",
dag = dag,
bash_command = 'echo "Hi, {{ params.name }}!"',
params = {'name' : 'John'}
)
{{ ds }}
: execution_date를 YYYY-MM-DD 형태로 반환{{ ds_nodash }}
: execution_date를 YYYYMMDD 헝태{{ ts }}
: execution_date를 timestamp 형태로 반환{{ dag }}
: dag 이름 ( . 을 통해 그 안의 정보에 접근할 수 있다.){{ task }}
: task 이름 ( . 을 통해 그 안의 정보에 접근할 수 있다.){{ dag_run }}
: 넘어온 'conf'의 값을 불러오는 변수{{ var.value }}
: variable 값{{ var.value.get("변수key", "변수가없을떄default") }}
{{ var.json}}
: variable의 json 형태일 때{{ var.json.my_dict_var.key1 }}
{{ conn }}
: connection의 정보{{ conn.my_conn_id.host }}
mode=reschedule
: worker를 잡아서 조건 완료를 확인하고 실패라면 release 했다가 일정 주기후 다시 worker를 잡는 방식mode=poke
: worker 하나를 계속 잡고 조건 완료를 확인하고 실패라면 sleep 했다가 다시 확인하는 방식poke
, 느슨하게 확인해도 된다면 reschedule
[ Dag 실행 방법 ]
TriggerDagOperator
)ExternalTaskSensor
)BranchPythonOperator
)LatestOnlyOperator
)TriggerRules
)airflow.cfg
의 설정이 필요dag_run_conf_overrides_params=true
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_B = TriggerDagRunOperator(
task_id = "trigger_B",
trigger_dag_id = '트리거하려는 DAG 이름',
conf = {'path' : '/opt/ml/conf'}, # DAG B에 넘겨주고 싶은 정보
execution_date = "{{ ds }}", # DAG A의 execution_date를 그대로 넘겨줘도 되고, 특정 날짜로 계산해도 되고, 새로운 값을 넘겨줘도 된다.
reset_dag_run = True, # DAG B가 끝날 때까지 기다릴지 여부 (default : False)
wait_for_completion = True # True일 경우, DAG B가 이미 주어진 execution_date에 대해 실행되었더라도 다시 재실행
)
schedule_interval
, execution_date
사용해야 한다.schedule_interval
이 안 맞으면 execution_delta
, execution_date_fun
을 사용하여 맞춰줘야 한다.execution_date
이 맞지 않으면 매칭이 되지 않는다.from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id = "task_id",
external_dag_id = "DAG A 이름",
external_task_id = "기다리는 task 이름",
timeout = 5*60, # poke할 주기(단위 : 초)
mode='rescheule'/'poke'
)
from airflow.operators.python import BranchPythonOperator
def choose_task():
if 조건:
return ['task2']
else:
return ['task3']
branching = BranchPythonOperator(
task_id = "task1",
python_collable = choose_task
)
operator2 = ~~Operator(task_id = 'task2')
operator3 = ~~Operator(task_id = 'task3')
branching >> [operator2, operator3]
from airflow.operators.latest_only import LatestOnlyOperator
with DAG(
...,
start_date=datetime(~),
catchup=True
) as day:
t1 = ...
t2 = LatestOnlyOperator(task_id="latest_only")
t3 = ...
t1 >> t2 >> t3
# backfill의 경우 t2에서 DAG가 정지한다.
trigger_rule
파라미터로 결정airflow.utils.trigger_rule.TriggerRule
안에 있는 변수 사용ALL_SUCCESS
: 앞선 task가 모두 성공했을 때 실행(default)ALL_Failed
: 앞선 task가 모두 실패했을 때 실행ALL_DONE
: 앞선 task가 성공/실패 여부와는 상관없이 모두 완료했을 때 실행ONE_SUCCESS
: 앞선 task 하나라도 성공하면 즉시 실행ONE_FAILED
: 앞선 task 하나라도 실패하면 즉시 실행NONE_FAILED
: 모두 실패하지 않았을 때 (성공 + 스킵 상황일 때)NONE_FAILED_MIN_ONE_SUCCESS
: 모두 실패하지 않았고 적어도 1개 이상의 task가 성공했을 때from airflow.utils.task_group import TaskGroup
start = ~operator(~)
with TaskGroup("그룹이름", tooltip="airflow web에서 확인할 내용") as section_1:
task1 =
task2 =
task3 =
task1 >> [task2, task3]
start >> section_1
3강 생각보다 내용이 많았고 이 내용이 꽤나 진국이었다. airflow 배우면서 궁금했던 내용들의 대부분이 오늘을 위해서 남겨둔거였을까...? 정말 신기하고 재미있었다. 4강까지 가는것이 목표였는데 조금 아쉽긴하다...
빨리 배운 내용을 가지고 직접 DAG 작성하고 싶다.