https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
- {{ }} is used to substitute a variable with its rendered value.
- data_interval_start: start of the schedule's data interval, a pendulum.DateTime
- data_interval_end: end of the schedule's data interval (= the batch date), a pendulum.DateTime
- logical_date: the date the DAG run is executed for, a pendulum.DateTime
- ds: logical_date converted to a YYYY-MM-DD string
- ds_nodash: also provided; the YYYYMMDD string obtained by removing the - from ds
- ts: logical_date converted to a string such as 2018-01-01T00:00:00+00:00; variants such as ts_nodash_with_tz and ts_nodash are also available
- ds, ts, and similar values can also be written in filter form, e.g. {{ logical_date | ds }}
- BashOperator: templates can be applied to the bash_command and env parameters
- PythonOperator: templates can be applied to the templates_dict, op_args, and op_kwargs parameters
In the example below, the dates are passed through env and echoed.
# dags/bash_template.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum
with DAG(
    dag_id="bash_template",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task1 = BashOperator(
        task_id="bash_task1",
        bash_command="echo \"End date is {{ data_interval_end }}\"",
    )
    bash_task2 = BashOperator(
        task_id="bash_task2",
        env={
            "START_DATE": "{{ data_interval_start | ds }}",
            "END_DATE": "{{ data_interval_end | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )
    bash_task1 >> bash_task2
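For reference, the ds/ts variants listed above can be rendered the same way; a small illustrative task (not part of the run logs below) might look like this:

# Illustrative only: renders the ds/ts variants side by side
bash_task_ts = BashOperator(
    task_id="bash_task_ts",
    bash_command="echo \"ds={{ ds }} ds_nodash={{ ds_nodash }} ts={{ ts }} ts_nodash={{ ts_nodash }} ts_nodash_with_tz={{ ts_nodash_with_tz }}\"",
)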
In the run log of bash_task1, data_interval_end is printed in full, including the time zone. In the run log of bash_task2, data_interval_start and data_interval_end are printed as YYYY-MM-DD strings.
# bash_task1
[2025-06-01, 19:59:27] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-01, 19:59:27] INFO - Filling up the DagBag from /opt/airflow/dags/bash_template.py: source="airflow.models.dagbag.DagBag"
[2025-06-01, 19:59:27] INFO - Running command: ['/usr/bin/bash', '-c', 'echo "End date is 2025-05-31 15:00:00+00:00"']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - End date is 2025-05-31 15:00:00+00:00: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - Command exited with return code 0: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - Pushing
# bash_task2
[2025-06-01, 19:59:28] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-01, 19:59:28] INFO - Filling up the DagBag from /opt/airflow/dags/bash_template.py: source="airflow.models.dagbag.DagBag"
[2025-06-01, 19:59:28] INFO - Running command: ['/usr/bin/bash', '-c', 'echo "Start date is $START_DATE " && echo "End date is $END_DATE"']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Start date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - End date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Command exited with return code 0: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Pushing
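Note that the rendered datetimes are in UTC (2025-05-31 15:00:00+00:00 is midnight 2025-06-01 in Asia/Seoul). To render the local date instead, pendulum's in_timezone can be called inside the template string, the same pattern the macro examples further below use; an illustrative fragment:

# Illustrative fragment: convert to local time before applying the ds filter
env={
    "END_DATE_KST": "{{ data_interval_end.in_timezone('Asia/Seoul') | ds }}",  # renders 2025-06-01 for this run
},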
The contents of **kwargs were printed in an earlier example; the values that showed up there even though they were never passed in explicitly are exactly these Jinja template variables.
# dags/python_template1.py
from airflow.sdk import DAG, task
import pendulum
with DAG(
    dag_id="python_template1",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    @task(task_id="python_task")
    def show_templates(**kwargs):
        from pprint import pprint
        pprint(kwargs)

    show_templates()
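Instead of accepting **kwargs, the same context can also be fetched explicitly inside the task body. A minimal sketch, assuming get_current_context is exported from airflow.sdk in this Airflow version (in Airflow 2 it lives in airflow.operators.python):

# Illustrative alternative to **kwargs
@task(task_id="python_task_ctx")
def show_templates_ctx():
    # Assumption: get_current_context is available from airflow.sdk in this version
    from airflow.sdk import get_current_context
    ctx = get_current_context()
    print(ctx["ds"], ctx["data_interval_start"], ctx["data_interval_end"])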
In the run log of python_task, Jinja template variables such as data_interval_end and data_interval_start are printed.
[2025-06-01, 20:18:19] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-01, 20:18:19] INFO - Filling up the DagBag from /opt/airflow/dags/python_template.py: source="airflow.models.dagbag.DagBag"
[2025-06-01, 20:18:19] INFO - {'conn': <ConnectionAccessor (dynamic access)>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'dag': <DAG: python_template>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'dag_run': DagRun(dag_id='python_template', run_id='scheduled__2025-05-31T15:00:00+00:00', logical_date=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), data_interval_start=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), run_after=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2025, 6, 1, 11, 18, 19, 250768, tzinfo=TzInfo(UTC)), end_date=None, clear_number=0, run_type=<DagRunType.SCHEDULED: 'scheduled'>, conf={}, consumed_asset_events=[]),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'data_interval_end': DateTime(2025, 5, 31, 15, 0, 0, tzinfo=Timezone('UTC')),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'data_interval_start': DateTime(2025, 5, 31, 15, 0, 0, tzinfo=Timezone('UTC')),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'ds': '2025-05-31',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'ds_nodash': '20250531',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'inlet_events': InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={}),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'inlets': [],: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'logical_date': DateTime(2025, 5, 31, 15, 0, 0, tzinfo=Timezone('UTC')),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'macros': <MacrosAccessor (dynamic access to macros)>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'map_index_template': None,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'outlet_events': <airflow.sdk.execution_time.context.OutletEventAccessors object at 0xffffaacaa810>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'outlets': [],: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'params': {},: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'prev_data_interval_end_success': <Proxy at 0xffffaad2b140 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad3cfe0>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-06-01, 20:18:19] INFO - 'prev_data_interval_start_success': <Proxy at 0xffffaad2b0b0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad3cea0>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'prev_end_date_success': <Proxy at 0xffffaacde870 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad0c7c0>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'prev_start_date_success': <Proxy at 0xffffaacde8d0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad0c720>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'run_id': 'scheduled__2025-05-31T15:00:00+00:00',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'task': <Task(_PythonDecoratedOperator): python_task>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'task_instance': RuntimeTaskInstance(id=UUID('01972b36-c56a-7d54-b52e-45bb8feb6594'), task_id='python_task', dag_id='python_template', run_id='scheduled__2025-05-31T15:00:00+00:00', try_number=1, map_index=-1, hostname='f6f932b48199', context_carrier={}, task=<Task(_PythonDecoratedOperator): python_task>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 6, 1, 11, 18, 19, 315990, tzinfo=TzInfo(UTC)), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=False, rendered_map_index=None),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'task_instance_key_str': 'python_template__python_task__20250531',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'task_reschedule_count': 0,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'templates_dict': None,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'ti': RuntimeTaskInstance(id=UUID('01972b36-c56a-7d54-b52e-45bb8feb6594'), task_id='python_task', dag_id='python_template', run_id='scheduled__2025-05-31T15:00:00+00:00', try_number=1, map_index=-1, hostname='f6f932b48199', context_carrier={}, task=<Task(_PythonDecoratedOperator): python_task>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 6, 1, 11, 18, 19, 315990, tzinfo=TzInfo(UTC)), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=False, rendered_map_index=None),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'triggering_asset_events': TriggeringAssetEventsAccessor(_events=defaultdict(<class 'list'>, {})),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'ts': '2025-05-31T15:00:00+00:00',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'ts_nodash': '20250531T150000',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'ts_nodash_with_tz': '20250531T150000+0000',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'var': {'json': <VariableAccessor (dynamic access)>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - 'value': <VariableAccessor (dynamic access)>}}: chan="stdout": source="task"
The next DAG defines python_task1 with PythonOperator and op_kwargs, and python_task2 with the @task decorator.
# dags/python_template2.py
from airflow.sdk import DAG, task
from airflow.providers.standard.operators.python import PythonOperator
import pendulum

with DAG(
    dag_id="python_template2",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    def print_period(start_date, end_date, **kwargs):
        print(start_date)
        print(end_date)

    python_task1 = PythonOperator(
        task_id="python_task1",
        python_callable=print_period,
        op_kwargs={
            "start_date": "{{ data_interval_start | ds }}",
            "end_date": "{{ data_interval_end | ds }}"
        },
    )

    @task(task_id="python_task2")
    def python_task2(**kwargs):
        for __key in ["ds", "ts", "data_interval_start", "data_interval_end"]:
            if __key in kwargs:
                print(f"{__key}: {kwargs[__key]}")

    python_task1 >> python_task2()
In the run log of python_task1, the data_interval_start and data_interval_end values passed through op_kwargs are printed as YYYY-MM-DD strings. In the run log of python_task2, the ds, ts, data_interval_start, and data_interval_end values are pulled out of the keyword arguments and printed as they are.
# python_task1
[2025-06-02, 00:12:27] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-02, 00:12:27] INFO - Filling up the DagBag from /opt/airflow/dags/python_template2.py: source="airflow.models.dagbag.DagBag"
[2025-06-02, 00:12:27] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-06-02, 00:12:27] INFO - 2025-06-01: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - 2025-06-01: chan="stdout": source="task"
# python_task2
[2025-06-02, 00:12:27] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-02, 00:12:27] INFO - Filling up the DagBag from /opt/airflow/dags/python_template2.py: source="airflow.models.dagbag.DagBag"
[2025-06-02, 00:12:27] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-06-02, 00:12:27] INFO - ds: 2025-06-01: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - ts: 2025-06-01T15:00:00+00:00: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - data_interval_start: 2025-06-01 15:00:00+00:00: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - data_interval_end: 2025-06-01 15:00:00+00:00: chan="stdout": source="task"
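Of the templated PythonOperator parameters listed earlier, templates_dict was not used above. It works the same way: the values are rendered and handed to the callable through the task context. A minimal illustrative sketch (task and function names are made up, not part of the DAG above):

# Illustrative fragment, to be placed inside a DAG block
def print_templates_dict(templates_dict, **kwargs):
    print(templates_dict["start_date"], templates_dict["end_date"])

python_task3 = PythonOperator(
    task_id="python_task3",
    python_callable=print_templates_dict,
    templates_dict={
        "start_date": "{{ data_interval_start | ds }}",
        "end_date": "{{ data_interval_end | ds }}",
    },
)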
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#macros
- macros.datetime: operations based on the datetime.datetime library
- macros.timedelta: operations based on the datetime.timedelta library
- macros.dateutil: operations based on the dateutil library
import datetime as dt
from dateutil.relativedelta import relativedelta
today = dt.date.today()
# Change the day to 1
first_date = today.replace(day=1)           # datetime operation
first_date = today + relativedelta(day=1)   # dateutil operation
# Subtract one day
yesterday = today - dt.timedelta(days=1)    # timedelta operation
yesterday = today - relativedelta(days=1)   # dateutil operation
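The same date arithmetic can be written directly inside a template string through the macros namespace, which is how the DAGs below use it; a couple of illustrative expressions:

# Illustrative template strings (assuming logical_date falls on 2025-05-31 UTC):
#   "{{ (logical_date - macros.timedelta(days=1)) | ds }}"                            -> 2025-05-30
#   "{{ (logical_date + macros.dateutil.relativedelta.relativedelta(day=1)) | ds }}"  -> 2025-05-01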
bash_macros1 is scheduled to run at the end of every month. START_DATE is set to the previous batch date plus one day and END_DATE to the batch date, expecting the 1st and the last day of the month to be printed.
# dags/bash_macros1.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum
with DAG(
    dag_id="bash_macros1",
    schedule="0 0 L * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task1 = BashOperator(
        task_id="bash_task1",
        env={
            "START_DATE": "{{ (data_interval_start.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(days=1)) | ds }}",
            "END_DATE": "{{ data_interval_end.in_timezone(\"Asia/Seoul\") | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task1
bash_macros2 is scheduled to run on the second Saturday of every month. Both the previous batch date and the batch date are shifted to the 1st of their month, expecting the 1st of the previous month and the 1st of the current month to be printed.
# dags/bash_macros2.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum
with DAG(
    dag_id="bash_macros2",
    schedule="0 0 * * 6#2",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task2 = BashOperator(
        task_id="bash_task2",
        env={
            "START_DATE": "{{ (data_interval_start.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(day=1)) | ds }}",
            "END_DATE": "{{ (data_interval_end.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(day=1)) | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task2
Looking at the output, neither run matches the expectation: data_interval_start and data_interval_end appear to hold the same value.
# bash_task1 (bash_macros1)
[2025-06-03, 10:54:43] INFO - Start date is 2025-06-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 10:54:43] INFO - End date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
# bash_task2 (bash_macros2)
[2025-06-03, 10:57:23] INFO - Start date is 2025-05-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 10:57:23] INFO - End date is 2025-05-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
Checking the Details tab for the last run of bash_macros1 confirms that data_interval_start and data_interval_end both show the same batch date, which suggests that the rule used to determine data_interval_start and data_interval_end has changed.
The create_cron_data_intervals option was introduced to control the data_interval calculation algorithm, and starting with Airflow 3.0 its default changed from True to False. As a result, the CronTriggerTimetable algorithm is now used by default instead of the previous CronDataIntervalTimetable. CronTriggerTimetable does not compute a data interval, so data_interval_start and data_interval_end are both set to the date the DAG run was triggered for. From the Airflow 3.0 release notes:
"The create_cron_data_intervals configuration is now False by default. This means that the CronTriggerTimetable will be used by default instead of the CronDataIntervalTimetable."
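For reference, the old interval behavior can presumably also be restored globally, rather than per DAG, by turning the option back on in the Airflow configuration; a sketch, assuming the option lives under the [scheduler] section:

# airflow.cfg
[scheduler]
create_cron_data_intervals = True
# or, equivalently, the environment variable AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVALS=True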
After locating the import path of CronDataIntervalTimetable, the schedule of the bash_macros1 DAG is reconfigured with it. This time START_DATE prints the 1st of the batch month and END_DATE prints the batch date, the last day of the month.
# dags/bash_macros1.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.timetables.interval import CronDataIntervalTimetable
import pendulum
with DAG(
    dag_id="bash_macros1",
    schedule=CronDataIntervalTimetable("0 0 L * *", timezone="Asia/Seoul"),
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task1 = BashOperator(
        task_id="bash_task1",
        env={
            "START_DATE": "{{ (data_interval_start.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(days=1)) | ds }}",
            "END_DATE": "{{ data_interval_end.in_timezone(\"Asia/Seoul\") | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task1
# bash_task1 (bash_macros1)
[2025-06-03, 12:10:27] INFO - Start date is 2025-05-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 12:10:27] INFO - End date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
There is also a way to get a data interval with the CronTriggerTimetable algorithm: pass a timedelta to its interval parameter. Below, the bash_macros2 DAG is switched to CronTriggerTimetable with a one-week interval (and the schedule is changed from the original intent of the second Saturday of every month to every Saturday). END_DATE now prints the batch date, May 31, and START_DATE prints the previous batch date, May 24.
# dags/bash_macros2.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.timetables.trigger import CronTriggerTimetable
import datetime as dt
import pendulum
with DAG(
    dag_id="bash_macros2",
    schedule=CronTriggerTimetable(
        "0 0 * * 6",
        timezone="Asia/Seoul",
        interval=dt.timedelta(weeks=1),
    ),
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task2 = BashOperator(
        task_id="bash_task2",
        env={
            "START_DATE": "{{ data_interval_start.in_timezone(\"Asia/Seoul\") | ds }}",
            "END_DATE": "{{ data_interval_end.in_timezone(\"Asia/Seoul\") | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task2
# bash_task2 (bash_macros2)
[2025-06-03, 12:20:03] INFO - Start date is 2025-05-24: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 12:20:03] INFO - End date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"