Bash 오퍼레이터 With macro

우상욱·2024년 3월 24일

Airflow Master Class

목록 보기
18/24

1. macro 변수 이해


DAG 스케줄은 매월 말일 (0 0 L * *)에 도는 스케줄인데
BETWEEN 값을 전월 마지막일부터 어제 날짜까지 주고 싶은데 어떻게 하지?

예를 들어
배치일이 1월 31일이면, 12월 31일부터 1월 30일까지
배치일이 2월 28일이면, 1월 31일부터 2월 27일까지 BETWEEN이 설정되었으면 좋겠어.

DAG 스케줄이 월 단위이니까
Tempalte 변수에서 date_interval_start 값은 한달 전 말일 이니까 시작일은 해결될 것 같은데 끝 부분은 어떻게 만들지? data_interval_end 에서 하루 뺀 값이 나와야 하는데...

sql = f'''
SELECT NAME, ADDRESS
FROM TBL_REG
WHERE REG_DATE BETWEEN ?? AND
??
'''
  • from 값 : {{ data_interval_start }}

  • to 값 : {{ data_interval_end - 1 day }}
    1 day를 빼는 연산을 위해 macro 변수가 필요하다.

  • Template 변수 기반 다양한 날짜 연산이 가능하도록 연산 모듈을 제공하고 있음

  • Macro를 잘 쓰려면 파이썬 datetime 및 dateutil 라이브러리에 익숙해져야함

2. 파이썬의 datetime, dateutil 라이브러리


Airflow의 macro는 datetime과 dateutil을 활용해서 만들어졌습니다.
해당 방식을 이용해서 사용하면, 작성이 쉬워집니다.

from datetime import datetime
from dateutil import relativedelta

now = datetime(year=2023, month=3, day=30)
print('현재시간 ' + str(now))

# 월 연산
print(now + relativedelta.relativedelta(month=1)) # 1월로 변경
print(now.replace(month=1)) # 1월로 변경
print(now + relativedelta.relativedelta(months=-1)) # 1개월 빼기

# 일 연산
print(now + relativedelta.relativedelta(day=1)) # 1일로 변경
print(now.replace(day=1)) # 1일로 변경
print(now + relativedelta.relativedelta(days=-1)) # 1일 빼기

# 연산 여러개
print(now + relativedelta.relativedelta(months=-1) + relativedelta.relativedelta(days=-1)) # 1개월, 1일 빼기

3. Bash 오퍼레이터 with macro 실습


  • 예시1)
    매월 말일 수행되는 Dag에서 변수 START_DATE: 전월 말일, 변수 END_DATE 어제로 env 세팅하기
  • 예시2)
    매월 둘째주 토요일에 수행되는 DAG에서
    변수 START_DATE: 2주 전 월요일
    변수 END_DATE: 2주 전 토요일로 env 세팅하기
    • 변수는 YYYY-MM-DD 형식으로 나오도록

예제 1

공식문서 : https://airflow.apache.org/docs/apache-airflow/1.10.12/macros-ref.html

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_macro_eg1",
    schedule="10 0 L * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    # START_DATE: 전월 말일, END DATE: 1일 전
    bash_task_1 = BashOperator(
        task_id="bash_task_1",
        env={
            "START_DATE": '{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}',  # utc는 한국보다 아홉시간이 느리므로, 한국 타임존으로 맞춰줘야한다.
            "END_DATE": '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1)) | ds}}',
        },
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"',
    )

예제2

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_macro_eg2",
    schedule="10 0 * * 6#2",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    # START_DATE: 2주전 월요일, END DATE: 2주전 토요일
    bash_task_1 = BashOperator(
        task_id="bash_task_1",
        env={
            "START_DATE": '{{ (data_interval_start.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds }}',
            "END_DATE": '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=14)) | ds }}',
        },
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"',
    )
  • timezone 설정은 .in_timezone("Asia/Seoul")
  • 날짜 연산은- macros.dateutil.relativedelta.relativedelta(days=14))
profile
데이터엔지니어

0개의 댓글