[Airflow] Jinja Template

식빵·2025년 6월 12일
0

Airflow

목록 보기
4/9
post-thumbnail

이 게시물은 airflow 2.10.5 버전을 사용해서 작성됐습니다.
3.x 버전과 조금 다를 수 있으니 유의하시기 바랍니다.

Jinja template ?

Jinja는 Python을 위한 템플릿 엔진으로,
동적인 HTML, XML 및 텍스트를 생성하는 데 사용되는 라이브러리입니다.
Airflow 에서도 이런 Jinja Template 을 사용합니다.


Jinja Template 의 사용법이 다양하지만, Airflow 에서는 Operator
Parameter 값Airflow 내부적으로 생성되는 변수들을 {{ }} 안에
작성해서, Task 가 실행되는 시점에 동적으로 해당 변수의 값이 들어가는 방식으로 사용합니다.

말이 어렵죠? 예를 들어서 다음과 같이 쓰면 된다는 말입니다.

# BashOperator 에서 jinja 템플릿을 사용하는 예시
bash_task = BashOperator(
    task_id="bash_task",
    env={
        "MY_END_DATE": '{{ data_interval_end.in_timezone("Asia/Seoul") }}'
    },
    bash_command='echo "RUN TIME = $MY_END_DATE"',
)
  • Operator파라미터 값 에서 사용하고 {{ }} 를 통해 Jinja template 을 사용
  • {{}} 사이에 있는 data_interval_end 가 바로 airflow 가 미리 지정한 변수!
  • 동적으로 생성된 값을 bash_command 에서 사용 가능


그렇다면 Airflow 에서 미리 지정한 변수들에는 뭐가 있을까요?
이 부분은 Airflow 공식 문서에 상세히 나오지만,
그중에서도 자주 사용되는 것을 뽑아내자면 다음과 같습니다.

  • {{ data_interval_start }} : data_interval 의 start 지점
  • {{ data_interval_end }} : data_interval 의 end 지점
  • {{ macros }} : jinja 템플릿에서 macro 사용 시 사용
  • {{ var.value }} : Airflow 에 지정한 전역변수를 조회 시 사용

참고로 저 각각의 변수들은 어떤 타입(Type)인지에 따라 메소드를 호출하거나,
갖고 있는 멤버 변수를 조회할 수도 있습니다.

예를들어 data_interval_end 의 타입은 pendulum.DateTime 이라서
다음과 같이 메소드를 호출할 수 있습니다.

data_interval_end.in_timezone("Asia/Seoul")

더 자세히 알고 싶으면 Airflow 공식 문서 참조해주세요.
변수도 많고, 타입도 다양해서 여기에 다 쓰기는 힘드네요 😂




어떤 파라미터에서 사용할 수 있을까?

OperatorParameter 값에 Jinja template 을 활용할 수 있다고 했지만,
사실 모든 파라미터에서 사용할 수 있는 건 아닙니다!

정확히 어떤 파라미터에 사용할 수 있는지는 Airflow
Operator 공식문서 또는 Operator 소스파일을 봐야 정확히 알 수 있습니다.

저는 BashOperator 클래스의 소스파일을 열어보겠습니다.
그리고 문자열 찾기 기능을 사용해서 template_fields 라는 문구를 찾아 보겠습니다.

그러면 위 그림처럼 Parameter 이름들이 주르륵 나열된 것을 볼 수 있는데,
저게 바로 Jinja Template 을 사용할 수 있는 Parameter 들입니다.




간단한 실습 코드

from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="dags_bash_operator_jinja1",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:

	# bash_command 파라미터에서 Jinja 사용
    my_bash1 = BashOperator(
        task_id="my_bash1",
        bash_command='echo "data_interval_end : {{ data_interval_end }}"',
    )

	# env 파라미터에서 Jinja 사용하
    my_bash2 = BashOperator(
        task_id="my_bash2",
        env={
            "MY_START_DATE": "{{data_interval_start | ds}}",
            "MY_END_DATE": "{{data_interval_end | ds}}",
        },
        bash_command='echo $START_DATE && echo $END_DATE',
    )

    bash_t1 >> bash_t2
  • | ds 라는 표현을 볼 수 있는데, 저 | dsFilter 를 활용한 모습입니다.
  • 출력 문자열을 보다 보기 좋게 뽑아낼 때 Filter 를 사용합니다.
  • | ds 의 경우에는 YYYY-MM-DD 같은 형태로 날짜 포맷의 문자열을 만들어줍니다.




macro 사용

Jinja Template 에는 Macros 를 사용해서 보다 동적인 문자열을 얻을 수 있습니다.
Macros 는 유틸리티성 패키지를 jinja 내부에서 접근할 수 있도록 돕는 객체입니다.
다음과 같은 패키지들을 Macros 통해서 접근할 수 있습니다.

출처: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#macros

예를 들어서 dateutil 패키지의 relativedelta 를 사용하고 싶다면 아래처럼 사용하면 됩니다.

# PythonOperator 예시, 참고로 template_dict 파라미터에서 jinja 사용 가능!
@task(
    task_id="using_macros_example",
    templates_dict={
        "end_date": '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }} ',
    }
)
def using_macros_example(**kwargs):
    template_dicts = kwargs.get("templates_dict") or {}
    if template_dicts:
        end_date = template_dicts.get('end_date')
        print(end_date)

조금 장황하지만 이런 식으로 사용할 수 있다는 것만 아시면 되겠습니다.
더 자세하게 사용하는 방법들은 스스로 실습을 해보시면 되겠습니다.



보충: CustomOperator 과 Jinja

저희가 CustomOperator, 즉 BaseOperator 를 상속한 클래스를 만들 때도
위에서 배운 Jinja Template 을 적용할 수 있는 클래스 필드를 지정할 수 있습니다.

from airflow.models import BaseOperator


class DynamicCsvCreateOperator(BaseOperator):

	# Jinja Template 을 적용하고자 하는 필드명 지정, 여러 개 지정도 가능!
    template_fields = ('file_name', )

    def __init__(self, path, **kwargs):
        super().__init__(**kwargs)
        
        # csv 파일 경로, 생성자 파라미터 그대로 사용.
        self.path = path
        
        # csv 파일 명은 동적으로 생성. 
        # ex: "20180101T000000.csv" 같은 형태의 CSV 파일이름 생성 
        self.file_name = '{{ data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}.csv'

	def execute(self, context):
    	# TODO: 랜덤 데이터를 갖는 csv 생성
    	pass

간단하죠?




참고한 것들

profile
백엔드 개발자로 일하고 있는 식빵(🍞)입니다.

0개의 댓글