: python에서 널리 사용되는 템플릿 엔진이다. Django 템플릿 엔진에서 영감을 받아 개발되었다고 한다. 현재는 Flask에서 많이 사용된다고 한다.
<h1>name: {{name}}<h1>
# 반복문 예시
{% for item in items %}
...
{% endfor %}
: 작업 이름, 파라미터 또는 SQL 쿼리와같은 작업 매개변수를 템플릿화된 문자열로 정의 할 수 있다.
{{ ds }}
# params를 활용해서 사용도 가능하다.
task = BashOperator(
...
bash_command='echo "test,{{params.name}}"",
params={'name':'pori'},
dag=dag
)
Operators — Airflow Documentation
{{ ds }} | 연도-월-일 |
---|---|
{{ ds_nodash }} | 대시없이 ds 출력 |
{{ ts }} | 연도-월-일-시-분-초 |
{{ dag }} | dag이름 |
{{ task }} | task에 대한 정보 |
{{ dag_run }} | |
{{ var.value}}: {{ var.value.get(’my.var’, ‘fallback’) }} | Variable 읽어오기 (value) |
{{ var.json }}: {{ var.json.my_dict_var.key1 }} | Variable 읽어오기 (json) |
{{ conn }}: {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password } | Connection 생성 |
conf = { ‘name’: ‘pori’ }
{{ dag_run.conf[”name”] }}
kwargs['dag_run'].conf.get('name')
dag_run_conf_overrides_params
가 True로 되어야한다.# TriggerDagRunOperator
trigger_task = TriggerDagRunOperator(
...
conf = {...}
execution_date = '{{ ds }}'
...
dag=dag
)
# targetDAG
task1 = BashOperator(
...
bash_command ="""echo '{{ds}}, {{ dag_run.conf.get("name","none")' """
)
: 특정 조건이 충족될 때까지 대기하는 Operator, 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용하게 사용된다.
: 앞의 DAG의 특정 Task가 완료되었는지를 확인한다.
: 상황에 따라 뒤에서 실행되어야할 태스크를 동적으로 결정해주는 Operator
# 조건을 걸어줄 함수를 생성
def decide_branch(**context):
current_hour = datetime.now().hour
if current_hour < 12:
return 'morning_task'
else:
return 'afternoon_task'
# BranchPythonOperator정의, python함수를 호출한다.
branching_operator = BranchPythonOperator(
...
python_callable=decide_branch,
dag=dag
)
# branch의 결과에 따라서 실행되는 operator들
morning_task = EmptyOperator(
task_id='morning_task'
)
afternoon_task= EmptyOperator(
task_id='afternoon_task'
)
# 실행 순서 설정
branching_operator >> morning_task
branching_operator >> afternoon_task
Time-sensitive한 task들이 과거의 backfill시 실행되는 것을 막기 위해 사용된다.
현재 시간이 execution_date보다 미래이고, 다음execution_date보다 과거인 경우에만 실행을 이어가고 아니면 중단된다. → 현재보다 과거의 경우에는 중단!
from airflow.operators.latest_only import LatestOnlyOperator
with DAG(
...
t1 = EmptyOperator(task_id='task1')
t2 = LatestOnlyOperator(task_id='latest_only')
t3 = EmptyOperator(task_id='task3')
t4 = EmptyOperator(task_id='task4')
)
t1 >> t2 >> [t3,t4]
: Upstream task의 상황에 따라서 뒷단의 task의 실행 여부를 결정하기위해 사용
# 예시 task1,2가 모두 성공해야 task3가 실행된다.
from airflow.utils.trigger_rule import TriggerRule
...
t1 = BashOperator(...)
t2 = BashOperator(...)
t3 = BashOperator(
...
trigger_rule=TriggerRule.ALL_DONE
)
[t1,t2] >> t3
: task들을 성격에 따라서 관리하는 경우에 용이하다.
from airflow.utils.task_group import TaskGroup
start = EmptyOperator(task_id="start")
with TaskGroup("Download", tooltip="Tasks for downloading daga") as section_1:
task1 = ...
task2 = ...
...
task_1 >> task2
# nesting
with TaskGroup(...) as inner_serction_2:
...
start >> section_1 >>
: 템플릿과 yaml을 기반으로 dag를 동적으로 만드려는 것. 비슷한 dag를 계속해서 매뉴얼하게 개발하는 것을 방지한다.
# config_appl.yml
dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
# templated_dag.jinja2
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
with DAG(dag_id="get_price_{{ dag_id }}",
start_date = ...
schedule = '{{ schedule }}",
catchup = {{ catchup or True }} # catchup을 사용하거나 값이 없으면 True로 설정
) as dag:
@task
def extract(symlbol):
return symbol
@task
def process(symbol):
return symbol
@task
def store(symbol):
return symbol
store(process(extract("{{ symbol }}")))
# generator.py
from jinja2 import Environment, FileSystemLoader
import yaml
import os
# 현재 실행중인 파일의 폴더의 절대 경로를 반환한다.
fire_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')
for f in os.listdir(file_dir): # 파일 디렉토리 내에서 모든 파일 읽어오기
if f.endswith(".yml"): # yml파일 읽기
with open(f"{file_dir}/{f}","r") as cf: #yml 파일을 읽기모드로 열기
config = yaml.safe_load(cf)
with open(f"dags/get_price{config['dag_id']}.py","w") as f: # 쓰기모드로 dag 생성
# yml로 읽은 것을 template에 render 한 후 파일에 쓰는 작업 수행
f.write(template.render(config))