Airflow Study11 - BashOperator with macro

박성현·2024년 6월 2일

Airflow

목록 보기
18/28

Airflow Macro변수란 ?

Airflow에서 매크로 변수는 DAG 실행 시점에 동적으로 생성되는 값을 나타내는 Jinja 템플릿입니다. Jinja 템플릿은 Airflow에서 널리 사용되는 텍스트 템플릿 엔진이며, 매크로 변수는 이 Jinja 템플릿의 특별한 형태입니다.

매크로 변수의 주요 특징:

DAG 실행 정보: 주로 DAG의 실행 날짜 및 시간, DAG ID 등 DAG 실행과 관련된 정보를 제공합니다.
Jinja 템플릿 내 사용: Jinja 템플릿 문법 (예: {{ ... }})을 사용하여 DAG 정의 파일 (Python 파일)이나 Airflow의 웹 UI에서 접근하고 활용할 수 있습니다.
동적 값 생성: DAG 실행 시점의 정보를 기반으로 값이 결정되므로, 매번 실행될 때마다 다른 값을 가질 수 있습니다.

사용 예시 :

스케쥴 : 매월 말일 스케쥴 ( 월배치 )
범위 : 전월 말 일부터 어제까지
ex )
1/31일 배치 시 : 12월31일 ~ 1월 30일
2/28일 배치 시 : 1월 31일 ~ 2월 27일

select name, address
from tbl
where reg_dt between {{data_interval_start}} and {{data_interval_end}}-1day ??? #이게 맞아 ? 
from datetime import datetime
from dateutil import relativedelta

now = datetime(year=2024, month=6, day =2)
print('현재시간' +str(now))

print('-----------월 연산 -----------')
print(now+relativedelta.relativedelta(month=1)) # 1월로 변경
print(now.replace(month=1)) # 1월로 변경
print(now+relativedelta.relativedelta(months=-1)) # 1월 빼기

print('----------- 일 연산  -----------')
print(now+relativedelta.relativedelta(day=1)) # 1일로 변경
print(now.replace(day=1)) # 1일로 변경
print(now+relativedelta.relativedelta(days=-1)) # 1일 빼기

print('----------- 복합 연산  -----------')
print(now + relativedelta.relativedelta(months=-1)+relativedelta.relativedelta(days=-1)) # 1개월 1일 전날 
현재시간: 2024-06-02 00:00:00

-----------월 연산 -----------
2024-01-02 00:00:00
2024-01-02 00:00:00
2024-05-02 00:00:00

----------- 일 연산  -----------
2024-06-01 00:00:00
2024-06-01 00:00:00
2024-06-01 00:00:00

-----------복합 연산  -----------
2024-05-01 00:00:00

실습 1

Case 1 :

매월 말일 수행되는 Dag에서
변수 start_date : 전월 말일,
변수 end_date : 어제로
env 셋팅

#배치 수행일 6/2일이면 , 5/31~ 6/1일 데이터 수행 필요 

start_date=pendulum.datetime(2024,4,1, tz='Asia/Seoul')
schedule='10 0 L * * ',

bash_task_1 =BashOperator(
	task_id = 'bash_task_1',
	env = {'START_DATE':'{{data_interval_start.in_timezone("Asia/Seoul") | ds}}' ,
           'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - 
           		macros.dateutil.relativedelta.relativedelta(days=1))| ds}}'  
          },
    bash_command='echo "START_DATE : $START_DATE" && echo "END_DATE: $END_DATE"'
)

6/2일 수행 된다고 가정하고 , (5/31~ 6/1일 데이터 수행 필요)
이전 배치는 4/30~5/30일까지, 다음 배치는 5/31~6/29
START_DATE:'{{data_interval_start.in_timezone("Asia/Seoul") | ds}}'
: 매월 말일 셋팅 되어 있으므로, 이번달 배치일은 6/30, 전 월 배치일은 5/31(data_interval_start)
어렵게 생각하지 말고, 일배치 인지, 월배치 인지보고,
월 배치라면 저번달 수행되어야 하는 날짜가 data_interval_start
일 배치라면 어제 날짜

END_DATE:'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1))| ds}}'
: data_interval_end 그냥 오늘 수행 시키면 오늘날짜, 스케쥴되로 수행된다면 스케쥴 수행되는 날짜일듯

profile
다소Good한 데이터 엔지니어

0개의 댓글