Context, Template

yoon__0_0·2024년 6월 18일
0

이어드림 수업

목록 보기
71/103

Context란?

  • 특정 Task에 대한 정보 (스케쥴 시간, DAG 명, Task 명 등)을 담은 파이썬 딕셔너리
  • dag의 kwargs의 내용은 airflow가 자동적으로 넣어준 context
  • 이를 잘 활용하면 도움이 됨 : 즉 kwargs에서 꺼낼 수 있음.
    Airflow context

실습

1) 데코레이터 이용버전

from airflow import DAG
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_decorator_with_param",
    schedule="0 2 * * 1",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id="python_task_1")
    def regist3(name, sex, *args, **kwargs):
        print(f'이름: {name}')
        print(f'성별: {sex}')
        print(f'기타옵션들: {args}')
        email = kwargs['email'] or 'empty'
        phone = kwargs['phone'] or 'empty'
        print(f'email: {email}')
        print(f'phone: {phone}')
        print(f'data_interval_start : {kwargs['data_interval_start']}')
        print(f'data_interval_end : {kwargs['data_interval_end']}')
        # from pprint import pprint
        # pprint(kwargs)

    python_task_1 = regist3('hjkim', 'man', 'seoul',
                            email='hjkim_sun@naver.com', phone='010')

2) 외부 함수 이용

  • common/common_func.py
def regist2(name, sex, *args, **kwargs):
    print(f'이름: {name}')
    print(f'성별: {sex}')
    print(f'기타옵션들: {args}')
    email = kwargs['email'] or 'empty'
    phone = kwargs['phone'] or 'empty'
    if email:
        print(email)
    if phone:
        print(phone)

    data_interval_start = kwargs['data_interval_start']
    data_interval_end = kwargs['data_interval_end']

    print(data_interval_start)
    print(data_interval_end)
  • dags_python_with_op_kwargs.py
 from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from common.common_func import regist2

with DAG(
    dag_id="dags_python_with_op_kwargs",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    catchup=False
) as dag:

    regist2_t1 = PythonOperator(
        task_id='regist2_t1',
        python_callable=regist2,
        op_args=['hjkim', 'man', 'kr', 'seoul'],
        op_kwargs={'email': 'hjkim_sun@naver.com', 'phone': '010'}
    )

    regist2_t1

airflow의 날짜 개념

  • data_interval_start, data_interval_end
  • 배치일을 기준이 아니라 airflow에서는 데이터관점의 시작일을 기준으로 바라보게 됨.
  • 즉, data_interval_start 는 이전 배치 시점, data_interval_end 는 현재 배치 시점 .

Template

  • 문서에서 특정 양식으로 작성된 값을 런타임시 실제값으로 치환해주는 처리 엔진
  • 템플릿 엔진은 여러 솔루션이 존재하며 그중 Jinja 템플릿은 파이썬 언어에서 많이 사용하는 엔진
  • 파이썬 기반 웹 프레임 워크인 Flask, Django 뿐 아니라 수많은 app 에서 사용됨.
  • HTML 템플릿 저장 후 화면에 보여질 때 실제 값으로 변환해서 출력

jinja template 실습

  • 기본실습 :{{}} 로 사용
from jinja2 import Template

template = Template('my name is {{name}}')
new_template = template.render(name = 'yoonjae')
print(new_template)

# 결과
my name is yoonjae
  • airflow 에서 실습
    • 모든 파라미터에 template을 적용할 수 있는 것은 아님
    • 따라서 오퍼레이터 사용시 반드시 문서를 읽고 어떤 파라미터에 Template 적용이 가능한지 확인 필요
    • bash , python 에서 templated 혹은 template_fields 인 파라미터를 확인해서 사용해야함.



from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dag_bash_with_template",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False
) as dag:

    bash_t1 = BashOperator(
        task_id="bash_t1",
        bash_command='echo "data_interval_end: {{data_interval_end}}" '
    )

    bash_t2 = BashOperator(
        task_id="bash_t2",
        env={
            'START_DATE': '{{data_interval_start | ds}}',
            'END_DATE': '{{data_interval_end | ds}}'
        },
        bash_command='echo $START_DATE && echo $END_DATE'
    )

    bash_t1 >> bash_t2

python operater

  • 직접 접근, template 둘다가능 (단, bash는 template 만 사용해야함)
  • python에서 context를 template 활용
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG(
    dag_id="dags_python_template",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    def python_function1(start_date, end_date, **kwargs):
        print(start_date)
        print(end_date)

    python_t1 = PythonOperator(
        task_id='python_t1',
        python_callable=python_function1,
        op_kwargs={'start_date':'{{data_interval_start | ds}}', 'end_date':'{{data_interval_end | ds}}'}
    )

    @task(task_id='python_t2')
    def python_function2(**kwargs):
        print(kwargs)
        print('ds:' + kwargs['ds'])
        print('ts:' + kwargs['ts'])
        print('data_interval_start:' + str(kwargs['data_interval_start']))
        print('data_interval_end:' + str(kwargs['data_interval_end']))
        print('task_instance:' + str(kwargs['ti']))


    python_t1 >> python_function2()

Macro

  • template 안에서 파이썬 문법을 이용해 포멧 변경, 날짜 연산 등이 가능한 방법

from datetime import datetime
from dateutil import relativedelta

now = datetime.now()
now_add_1d = now + relativedelta.relativedelta(days = 1)
now_add_1d
  • relativedelta는 days, hours, minutes, months 가능 , 빼기 덧셈 가능.
  • 파라미터 값에 - 값 줄수 있음.
  • 결과 값의 type은 datetime 으로 나옴.
  • days가 아니라(month = -1, day = 1) 을 하면 월을 빼고 1일로 만들어라 이런 뜻

bash operator 에서 활용

  • 매월 둘째주 토요일 0시 10분에 수행되는 DAG
  • bash 오퍼레이터를 활용하여 START_DATE: 2주전 월요일
    / END_DATE: 2주전 토요일이 나오도록 수행 (YYYY-MM-DD 형태)
  • 예시) 수행일이 6월 8일이라면 START_DATE: 5월 20일 / END_DATE: 5월 25일

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dag_bash_with_macro_eg2",
    schedule="10 0 * * 6#2",
    start_date=pendulum.datetime(2024, 5, 1, tz='Asia/Seoul'),
    catchup=False
) as dag:
    bash_task_2 = BashOperator(
        task_id="bash_task_2",
        env={'START_DATE': '{{(data_interval_start.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(days = 9)) | ds}}',
             'END_DATE': '{{(data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days = 14)) | ds}}'},
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"')

python operator 에서 사용

  • start_date : 전월 1일
  • end_date : 전월 말일이 되도록 DAG 구현
  • 스케쥴 : 매일 0시 10분
  • task 데코레이터에서 template_dict를 사용하기 위해서는 아래처럼 사용해야함.
from airflow import DAG
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_macro",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False
) as dag:
    @task(task_id='task_using_macros',
          templates_dict={'start_date': '{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}',
                          'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
                          })
    def get_datetime_macro(**kwargs):
        templates_dict = kwargs.get('templates_dict') or {}
        if templates_dict:
            start_date = templates_dict.get('start_date') or 'start_date 없음'
            end_date = templates_dict.get('end_date') or 'end_date 없음'
            print(start_date)
            print(end_date)

    @task(task_id='task_direct_calc')
    def get_datetime_calc(**kwargs):
        from dateutil.relativedelta import relativedelta

        data_interval_end = kwargs['data_interval_end']
        prev_month_day_first = data_interval_end.in_timezone(
            'Asia/Seoul') + relativedelta(months=-1, day=1)
        prev_month_day_last = data_interval_end.in_timezone(
            'Asia/Seoul').replace(day=1) + relativedelta(days=-1)
        print(prev_month_day_first.strftime('%Y-%m-%d'))
        print(prev_month_day_last.strftime('%Y-%m-%d'))

    get_datetime_macro() >> get_datetime_calc()

python에서는 template & macro 를 사용하여 날짜 연산을 해도 되고, kwargs 에서 context 를 꺼낸 후 파이썬 문법 이용 가능
bash 오퍼레이터는 template & macro 사용해야함

profile
신윤재입니다

0개의 댓글