[Airflow] Jinja Template

minyeamer·2025년 6월 1일
0

Apache Airflow 배우기

목록 보기
5/13
post-thumbnail

Jinja 템플릿

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

  • 파이썬 기반 웹 프레임워크 Flask, Django에서 주로 사용
  • HTML 템플릿을 만들고 화면에 보여질 때 값을 렌더링해서 출력
  • Airflow에서는 파라미터 입력 시 중괄호 2개 {{ }} 를 이용해 변수를 치환된 값으로 입력

변수 목록 (중괄호 생략)

  • data_interval_start : 스케줄의 시작 날짜이며,pendulum.DateTime 타입
  • data_interval_end : 스케줄의 종료 날짜(= 배치일)이며,pendulum.DateTime 타입
  • logical_date : DAG가 실행 중인 시점의 날짜이며,pendulum.DateTime 타입
  • ds : logical_dateYYYY-MM-DD 형태의 문자열로 변환한 값
    • ds 에서 - 을 제거한 YYYYMMDD 형태의 문자열 ds_nodash 변수도 제공
  • ts : logical_date2018-01-01T00:00:00+00:00 형태의 문자열로 변환한 값
    • ts_nodash_with_tz 또는 ts_nodash 등의 변형된 변수도 지원
  • ds 또는 ts 등은 {{ logical_date | ds }} 의 형태로도 표현 가능

적용 대상

  • BashOperator에서는 bash_command, env 파라미터에 템플릿 적용 가능
  • PythonOperator에서는 templates_dict, op_args, op_kwargs 파라미터에 템플릿 적용 가능
  • Airflow의 각 Operator 문서에서 Templating 부분 참고

BashOperator

Jinja 템플릿 변수 활용

  • Jinja 템플릿 변수를 그대로 출력하는 명령어를 실행하는 DAG 구성
  • 첫 번째 Task는 변수를 그대로 출력하고, 두 번째 Task는 env로 파라미터를 전달해서 출력
# dags/bash_template.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="bash_template",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task1 = BashOperator(
        task_id="bash_task1",
        bash_command="echo \"End date is {{ data_interval_end }}\"",
    )

    bash_task2 = BashOperator(
        task_id="bash_task2",
        env={
            "START_DATE": "{{ data_interval_start | ds }}",
            "END_DATE": "{{ data_interval_end | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task1 >> bash_task2

DAG 실행

  • DAG 실행 후 bash_task1 의 실행 로그에서 data_interval_end 가 시간대를 포함하여 전체 출력된 것을 확인
  • bash_task2 의 실행 로그에서는 data_interval_startdata_interval_end 이 YYYY-MM-DD 형태의 문자열로 출력된 것을 확인
# bash_task1

[2025-06-01, 19:59:27] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-01, 19:59:27] INFO - Filling up the DagBag from /opt/airflow/dags/bash_template.py: source="airflow.models.dagbag.DagBag"
[2025-06-01, 19:59:27] INFO - Running command: ['/usr/bin/bash', '-c', 'echo "End date is 2025-05-31 15:00:00+00:00"']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - End date is 2025-05-31 15:00:00+00:00: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - Command exited with return code 0: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:27] INFO - Pushing 
[2025-06-01, 19:59:28] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-01, 19:59:28] INFO - Filling up the DagBag from /opt/airflow/dags/bash_template.py: source="airflow.models.dagbag.DagBag"
[2025-06-01, 19:59:28] INFO - Running command: ['/usr/bin/bash', '-c', 'echo "Start date is $START_DATE " && echo "End date is $END_DATE"']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Output:: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Start date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - End date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Command exited with return code 0: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-01, 19:59:28] INFO - Pushing 

PythonOperator 1

Jinja 템플릿 변수 출력

  • keyword argument로 전달되는 Jinja 템플릿 변수를 출력하는 명령어를 실행하는 DAG 구성
  • 이전에 한번 **kwargs 내용을 출력한적이 있었는데, 직접 전달하지 않았음에도 출력되었던 값들이 바로 Jinja 템플릿 변수에 해당
# dags/python_template1.py

from airflow.sdk import DAG, task
import pendulum

with DAG(
    dag_id="python_template1",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    @task(task_id="python_task")
    def show_templates(**kwargs):
        from pprint import pprint
        pprint(kwargs)

    show_templates()

DAG 실행

  • DAG 실행 후 python_task 의 실행 로그에서 data_interval_end, data_interval_start 등 Jinja 템플릿 변수가 출력된 것을 확인
[2025-06-01, 20:18:19] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-01, 20:18:19] INFO - Filling up the DagBag from /opt/airflow/dags/python_template.py: source="airflow.models.dagbag.DagBag"
[2025-06-01, 20:18:19] INFO - {'conn': <ConnectionAccessor (dynamic access)>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'dag': <DAG: python_template>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'dag_run': DagRun(dag_id='python_template', run_id='scheduled__2025-05-31T15:00:00+00:00', logical_date=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), data_interval_start=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), run_after=datetime.datetime(2025, 5, 31, 15, 0, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2025, 6, 1, 11, 18, 19, 250768, tzinfo=TzInfo(UTC)), end_date=None, clear_number=0, run_type=<DagRunType.SCHEDULED: 'scheduled'>, conf={}, consumed_asset_events=[]),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'data_interval_end': DateTime(2025, 5, 31, 15, 0, 0, tzinfo=Timezone('UTC')),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'data_interval_start': DateTime(2025, 5, 31, 15, 0, 0, tzinfo=Timezone('UTC')),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ds': '2025-05-31',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ds_nodash': '20250531',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'inlet_events': InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={}),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'inlets': [],: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'logical_date': DateTime(2025, 5, 31, 15, 0, 0, tzinfo=Timezone('UTC')),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'macros': <MacrosAccessor (dynamic access to macros)>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'map_index_template': None,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'outlet_events': <airflow.sdk.execution_time.context.OutletEventAccessors object at 0xffffaacaa810>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'outlets': [],: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'params': {},: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'prev_data_interval_end_success': <Proxy at 0xffffaad2b140 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad3cfe0>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-06-01, 20:18:19] INFO -  'prev_data_interval_start_success': <Proxy at 0xffffaad2b0b0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad3cea0>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'prev_end_date_success': <Proxy at 0xffffaacde870 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad0c7c0>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'prev_start_date_success': <Proxy at 0xffffaacde8d0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffffaad0c720>>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'run_id': 'scheduled__2025-05-31T15:00:00+00:00',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'task': <Task(_PythonDecoratedOperator): python_task>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'task_instance': RuntimeTaskInstance(id=UUID('01972b36-c56a-7d54-b52e-45bb8feb6594'), task_id='python_task', dag_id='python_template', run_id='scheduled__2025-05-31T15:00:00+00:00', try_number=1, map_index=-1, hostname='f6f932b48199', context_carrier={}, task=<Task(_PythonDecoratedOperator): python_task>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 6, 1, 11, 18, 19, 315990, tzinfo=TzInfo(UTC)), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=False, rendered_map_index=None),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'task_instance_key_str': 'python_template__python_task__20250531',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'task_reschedule_count': 0,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'templates_dict': None,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ti': RuntimeTaskInstance(id=UUID('01972b36-c56a-7d54-b52e-45bb8feb6594'), task_id='python_task', dag_id='python_template', run_id='scheduled__2025-05-31T15:00:00+00:00', try_number=1, map_index=-1, hostname='f6f932b48199', context_carrier={}, task=<Task(_PythonDecoratedOperator): python_task>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 6, 1, 11, 18, 19, 315990, tzinfo=TzInfo(UTC)), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=False, rendered_map_index=None),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'triggering_asset_events': TriggeringAssetEventsAccessor(_events=defaultdict(<class 'list'>, {})),: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ts': '2025-05-31T15:00:00+00:00',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ts_nodash': '20250531T150000',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'ts_nodash_with_tz': '20250531T150000+0000',: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -  'var': {'json': <VariableAccessor (dynamic access)>,: chan="stdout": source="task"
[2025-06-01, 20:18:19] INFO -          'value': <VariableAccessor (dynamic access)>}}: chan="stdout": source="task"

PythonOperator 2

Jinja 템플릿 변수 활용

  • 이번에는 PythonOperator에 Jinja 템플릿 변수를 전달하여 출력하는 python_task1 정의
  • keyword argument로 전달되는 Jinja 템플릿 변수 중 일부 항목만 선택해서 출력하는 python_task2 정의
# dags/python_template2.py

from airflow.sdk import DAG, task
import pendulum

with DAG(
    dag_id="python_template2",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    def print_period(start_date, end_date, **kwargs):
        print(start_date)
        print(end_date)

    python_task1 = PythonOperator(
        task_id="python_task1",
        python_callable=print_period,
        op_kwargs={
            "start_date": "{{ data_interval_start | ds }}",
            "end_date": "{{ data_interval_end | ds }}"
        },
    )

    @task(task_id="python_task2")
    def python_task2(**kwargs):
        for __key in ["ds", "ts", "data_interval_start", "data_interval_end"]:
            if __key in kwargs:
                print(f"{__key}: {kwargs[__key]}")

    python_task1 >> python_task2()

DAG 실행

  • DAG 실행 후 python_task1 의 실행 로그에서 전달한 data_interval_start, data_interval_end 값이 YYYY-MM-DD 형태의 문자열로 출력된 것을 확인
  • python_task2 의 실행 로그에서는 ds, ts, data_interval_start, data_interval_end 값을 keyword argument로부터 꺼내서 그대로 출력
# python_task1

[2025-06-02, 00:12:27] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-02, 00:12:27] INFO - Filling up the DagBag from /opt/airflow/dags/python_template2.py: source="airflow.models.dagbag.DagBag"
[2025-06-02, 00:12:27] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-06-02, 00:12:27] INFO - 2025-06-01: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - 2025-06-01: chan="stdout": source="task"
# python_task2

[2025-06-02, 00:12:27] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-02, 00:12:27] INFO - Filling up the DagBag from /opt/airflow/dags/python_template2.py: source="airflow.models.dagbag.DagBag"
[2025-06-02, 00:12:27] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-06-02, 00:12:27] INFO - ds: 2025-06-01: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - ts: 2025-06-01T15:00:00+00:00: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - data_interval_start: 2025-06-01 15:00:00+00:00: chan="stdout": source="task"
[2025-06-02, 00:12:27] INFO - data_interval_end: 2025-06-01 15:00:00+00:00: chan="stdout": source="task"

Macro 변수

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#macros

  • Jinja 템플릿 변수 기반으로 다양한 날짜 연산이 가능하도록 연산 모듈을 제공

변수 목록

  • macros.datetime : datetime.datetime 라이브러리 기반 연산 제공
  • macros.timedelta : datetime.timedelta 라이브러리 기반 연산 제공
  • macros.dateutil : dateutil 라이브러리 기반 연산 제공
import datetime as dt
from dateutil.relativedelta import relativedelta

today = dt.date.today()

# 1일로 변경
first_date = today.replace(day=1) # datetime 연산
first_date = today + relativedelta(day=1) # dateutil 연산

# 1일 빼기
yesterday = today - dt.timedela(days=1) # timedela 연산
yesterday = today - relativedelta(days=1) # dateutil 연산

Macro 변수 활용

  • 첫 번째 DAG bash_macros1 은 매월 말에 실행되도록 스케줄을 지정하고, 직전 배치일에서 1일을 추가한 날짜를 START_DATE, 배치일을 END_DATE 로 설정하여 1일부터 말일이 출력되기를 기대
# dags/bash_macros1.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="bash_macros1",
    schedule="0 0 L * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task1 = BashOperator(
        task_id="bash_task1",
        env={
            "START_DATE": "{{ (data_interval_start.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(days=1)) | ds }}",
            "END_DATE": "{{ data_interval_end.in_timezone(\"Asia/Seoul\") | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task1
  • 두 번째 DAG bash_macros2 는 매월 둘째주 토요일에 실행되도록 스케줄을 지정하고, 직전 배치일과 배치일을 1일로 변경해서 전월 1일과 당월 1일이 출력되기를 기대
# dags/bash_macros2.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="bash_macros2",
    schedule="0 0 * * 6#2",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task2 = BashOperator(
        task_id="bash_task2",
        env={
            "START_DATE": "{{ (data_interval_end.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(day=1)) | ds }}",
            "END_DATE": "{{ (data_interval_end.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(day=1)) | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task2

DAG 실행

  • 하지만 실행 로그를 확인했을 때 기대와 다른 결과가 확인되었는데, 마치 data_interval_start, data_interval_end 가 동일한 값을 가지고 있다고 생각됨
# bash_task1 (bash_macros1)

[2025-06-03, 10:54:43] INFO - Start date is 2025-06-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 10:54:43] INFO - End date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
# bash_task2 (bash_macros2)

[2025-06-03, 10:57:23] INFO - Start date is 2025-05-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 10:57:23] INFO - End date is 2025-05-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
  • 실제로 bash_macros1 의 마지막 실행 내역에 대해 Details 탭에서 실행 정보를 조회했을 때, data_interval_start, data_interval_end 값이 모두 동일한 배치일로 나타나는 것을 확인
  • 참고 자료로 활용한 강의에서 사용했던 Airflow 2.x 버전과, 현재 사용하는 Airflow 3.x 버전에서 data_interval_start, data_interval_end 를 결정하는 기준이 변경된 것을 인지

bash_macros1

Airflow 3.0 업데이트

  • 2.9 버전에서 data_interval 계산 알고리즘에 영향을 주는 create_cron_data_intervals 파라미터가 도입되었는데, 3.0 버전부터 기본값이 기존 False 에서 True 로 변경되면서, 기존 CronDataIntervalTimetable 대신 CronTriggerTimetable 알고리즘이 사용되도록 변경됨
  • 즉, 3.0 버전부터 기본적으로 data_interval 을 고려하지 않도록 변경되어 data_interval_startdata_interval_end 값이 모두 실제 DAG이 실행된 날짜로 표현

The create_cron_data_intervals configuration is now False by default. This means that the CronTriggerTimetable will be used by default instead of the CronDataIntervalTimetable

https://airflow.apache.org/docs/apache-airflow/stable/installation/upgrading_to_airflow3.html#breaking-changes

CronDataIntervalTimetable 활용

  • 이전 버전의 알고리즘인 CronDataIntervalTimetable 의 경로를 파악해서 bash_macros1 DAG의 스케줄을 재설정
  • 실행 로그를 조회했을 때, 초기 의도대로 START_DATE 는 배치일 기준 1일, END_DATE 는 배치일인 말일이 출력되는 것을 확인
# dags/bash_macros1.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.timetables.interval import CronDataIntervalTimetable
import pendulum

with DAG(
    dag_id="bash_macros1",
    schedule=CronDataIntervalTimetable("0 0 L * *", timezone="Asia/Seoul"),
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task1 = BashOperator(
        task_id="bash_task1",
        env={
            "START_DATE": "{{ (data_interval_start.in_timezone(\"Asia/Seoul\") + macros.dateutil.relativedelta.relativedelta(days=1)) | ds }}",
            "END_DATE": "{{ data_interval_end.in_timezone(\"Asia/Seoul\") | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task1
# bash_task1 (bash_macros1)

[2025-06-03, 12:10:27] INFO - Start date is 2025-05-01: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 12:10:27] INFO - End date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

CronTriggerTimetable 활용

  • 3.0 버전 이후에서 사용되는 CronTriggerTimetable 알고리즘으로도 data_interval 을 적용할 수 있는 방법이 있는데, interval 파라미터로 timedelta 를 전달하면 가능
  • bash_macros2 DAG의 스케줄에 CronTriggerTimetable 알고리즘을 적용하면서, interval 파라미터로 1주의 간격을 지정 (초기 의도인 매월 둘째주 토요일과는 다르게 매주 토요일로 변경)
  • 실행 로그를 조회했을 때, END_DATE 는 배치일인 5월 31일, START_DATE 는 직전 배치일인 5월 24일이 출력되는 것을 확인
# dags/bash_macros2.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.timetables.trigger import CronTriggerTimetable
import datetime as dt
import pendulum

with DAG(
    dag_id="bash_macros2",
    schedule=CronTriggerTimetable(
        "0 0 * * 6",
        timezone="Asia/Seoul",
        interval=dt.timedelta(weeks=1),
    ),
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "template"],
) as dag:
    bash_task2 = BashOperator(
        task_id="bash_task2",
        env={
            "START_DATE": "{{ data_interval_start.in_timezone(\"Asia/Seoul\") | ds }}",
            "END_DATE": "{{ data_interval_end.in_timezone(\"Asia/Seoul\") | ds }}"
        },
        bash_command="echo \"Start date is $START_DATE \" && echo \"End date is $END_DATE\"",
    )

    bash_task2
# bash_task2 (bash_macros2)

[2025-06-03, 12:20:03] INFO - Start date is 2025-05-24: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
[2025-06-03, 12:20:03] INFO - End date is 2025-05-31: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
profile
데이터의 모든 것을 추구합니다.

0개의 댓글