1) 데코레이터 이용버전
from airflow import DAG
import pendulum
from airflow.decorators import task
# DAG demonstrating the @task decorator with positional and keyword arguments.
with DAG(
    dag_id="dags_python_decorator_with_param",
    schedule="0 2 * * 1",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id="python_task_1")
    def regist3(name, sex, *args, **kwargs):
        """Print the registration arguments and the run's data interval.

        Args:
            name: Value printed after the ``이름:`` label.
            sex: Value printed after the ``성별:`` label.
            *args: Extra positional options, printed together as a tuple.
            **kwargs: Keyword options. ``email`` and ``phone`` are optional
                and fall back to ``'empty'`` when missing or falsy.
                ``data_interval_start`` and ``data_interval_end`` are
                required (expected to come from the task context).

        Raises:
            KeyError: If ``data_interval_start`` or ``data_interval_end`` is
                not present in ``kwargs``.
        """
        print(f'이름: {name}')
        print(f'성별: {sex}')
        print(f'기타옵션들: {args}')
        # .get() keeps the 'empty' fallback reachable when the key is absent;
        # kwargs['email'] would raise KeyError before `or` is evaluated.
        email = kwargs.get('email') or 'empty'
        phone = kwargs.get('phone') or 'empty'
        print(f'email: {email}')
        print(f'phone: {phone}')
        # Outer double quotes: reusing the same quote character inside an
        # f-string replacement field is a SyntaxError before Python 3.12.
        print(f"data_interval_start : {kwargs['data_interval_start']}")
        print(f"data_interval_end : {kwargs['data_interval_end']}")
        # Debugging tip: pprint.pprint(kwargs) lists every key passed in.

    python_task_1 = regist3('hjkim', 'man', 'seoul',
                            email='hjkim_sun@naver.com', phone='010')
2) 외부 함수 이용
def regist2(name, sex, *args, **kwargs):
    """Print registration details followed by the data-interval bounds.

    Args:
        name: Value printed after the ``이름:`` label.
        sex: Value printed after the ``성별:`` label.
        *args: Extra positional options, printed together as a tuple.
        **kwargs: Keyword options. ``email`` and ``phone`` are optional and
            fall back to ``'empty'`` when missing or falsy.
            ``data_interval_start`` and ``data_interval_end`` are required
            (expected to be supplied by the caller, e.g. the task context).

    Raises:
        KeyError: If ``data_interval_start`` or ``data_interval_end`` is not
            present in ``kwargs``.
    """
    print(f'이름: {name}')
    print(f'성별: {sex}')
    print(f'기타옵션들: {args}')

    # .get() keeps the 'empty' fallback reachable when the key is absent;
    # kwargs['email'] would raise KeyError before `or` is evaluated.
    email = kwargs.get('email') or 'empty'
    phone = kwargs.get('phone') or 'empty'

    # Both values are always truthy after the fallback, so no guard is needed.
    print(email)
    print(phone)

    # Look up both bounds before printing either, so a missing key fails
    # before any interval output is produced.
    data_interval_start = kwargs['data_interval_start']
    data_interval_end = kwargs['data_interval_end']
    print(data_interval_start)
    print(data_interval_end)
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from common.common_func import regist2
# DAG that runs the imported regist2 through a PythonOperator, feeding it
# fixed positional and keyword arguments.
with DAG(
    dag_id="dags_python_with_op_kwargs",
    start_date=pendulum.datetime(2024, 6, 14, tz="Asia/Seoul"),
    schedule="30 6 * * *",
    catchup=False,
) as dag:
    regist2_t1 = PythonOperator(
        task_id='regist2_t1',
        python_callable=regist2,
        # Handed to regist2 as positional arguments.
        op_args=['hjkim', 'man', 'kr', 'seoul'],
        # Handed to regist2 as keyword arguments.
        op_kwargs=dict(
            email='hjkim_sun@naver.com',
            phone='010',
        ),
    )

    # Single task, so there is no dependency chain to declare.
    regist2_t1
from jinja2 import Template
# Build the template once, then fill the {{name}} placeholder from a mapping.
template = Template('my name is {{name}}')
new_template = template.render({'name': 'yoonjae'})
print(new_template)
# 결과
my name is yoonjae
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
# DAG showing two ways of using Jinja expressions with BashOperator.
with DAG(
    dag_id="dag_bash_with_template",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    schedule="10 0 * * *",
    catchup=False,
) as dag:
    # Variant 1: the Jinja expression sits directly in the command string.
    bash_t1 = BashOperator(
        bash_command='echo "data_interval_end: {{data_interval_end}}" ',
        task_id="bash_t1",
    )

    # Variant 2: the Jinja expressions are placed in `env`, and the command
    # reads them back as shell variables.
    bash_t2 = BashOperator(
        bash_command='echo $START_DATE && echo $END_DATE',
        env=dict(
            START_DATE='{{data_interval_start | ds}}',
            END_DATE='{{data_interval_end | ds}}',
        ),
        task_id="bash_t2",
    )

    # bash_t1 runs first, then bash_t2.
    bash_t1.set_downstream(bash_t2)
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.decorators import task
# DAG comparing templated op_kwargs with reading values straight from kwargs.
with DAG(
    dag_id="dags_python_template",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    schedule="30 9 * * *",
    catchup=False,
) as dag:

    def python_function1(start_date, end_date, **kwargs):
        """Print ``start_date`` and then ``end_date``, one per line."""
        for value in (start_date, end_date):
            print(value)

    python_t1 = PythonOperator(
        task_id='python_t1',
        python_callable=python_function1,
        # Jinja expressions passed as the two named arguments.
        op_kwargs={
            'start_date': '{{data_interval_start | ds}}',
            'end_date': '{{data_interval_end | ds}}',
        },
    )

    @task(task_id='python_t2')
    def python_function2(**kwargs):
        """Print all keyword arguments, then a few selected entries."""
        print(kwargs)
        # 'ds' and 'ts' are concatenated as-is, so they must be strings.
        for key in ('ds', 'ts'):
            print(key + ':' + kwargs[key])
        # The interval bounds are converted with str() before joining.
        for key in ('data_interval_start', 'data_interval_end'):
            print(key + ':' + str(kwargs[key]))
        print('task_instance:' + str(kwargs['ti']))

    python_t2 = python_function2()
    python_t1.set_downstream(python_t2)
from datetime import datetime
from dateutil import relativedelta
# Shift the current (naive) timestamp forward by one day with relativedelta.
now = datetime.now()
one_day = relativedelta.relativedelta(days=1)
now_add_1d = now + one_day
# Bare expression: shows the value when run in a REPL or notebook cell.
now_add_1d
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
# DAG showing date arithmetic inside Jinja via macros.dateutil.relativedelta.
with DAG(
    dag_id="dag_bash_with_macro_eg2",
    # NOTE(review): "6#2" presumably means the second Saturday of each
    # month; confirm against the cron dialect in use.
    schedule="10 0 * * 6#2",
    start_date=pendulum.datetime(2024, 5, 1, tz='Asia/Seoul'),
    catchup=False,
) as dag:
    # Interval start (converted to Asia/Seoul) plus 9 days, formatted by `ds`.
    start_date_expr = (
        '{{(data_interval_start.in_timezone("Asia/Seoul")'
        ' + macros.dateutil.relativedelta.relativedelta(days = 9)) | ds}}'
    )
    # Interval end (converted to Asia/Seoul) minus 14 days, formatted by `ds`.
    end_date_expr = (
        '{{(data_interval_end.in_timezone("Asia/Seoul")'
        ' - macros.dateutil.relativedelta.relativedelta(days = 14)) | ds}}'
    )

    bash_task_2 = BashOperator(
        task_id="bash_task_2",
        env={
            'START_DATE': start_date_expr,
            'END_DATE': end_date_expr,
        },
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"',
    )
from airflow import DAG
import pendulum
from airflow.decorators import task
# DAG computing the previous month's first and last day in two ways:
# once through Jinja macros, once with plain Python inside the task.
with DAG(
    dag_id="dags_python_with_macro",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    schedule="10 0 * * *",
    catchup=False,
) as dag:

    @task(
        task_id='task_using_macros',
        templates_dict={
            # One month back with the day forced to 1.
            'start_date': (
                '{{ (data_interval_end.in_timezone("Asia/Seoul")'
                ' + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}'
            ),
            # First day of the current month minus one day.
            'end_date': (
                '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1)'
                ' + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
            ),
        },
    )
    def get_datetime_macro(**kwargs):
        """Print the two ``templates_dict`` dates, if the dict is present.

        Nothing is printed when ``templates_dict`` is missing or empty; a
        missing or falsy entry is replaced by a placeholder message.
        """
        rendered = kwargs.get('templates_dict') or {}
        if not rendered:
            return
        first_day = rendered.get('start_date') or 'start_date 없음'
        last_day = rendered.get('end_date') or 'end_date 없음'
        print(first_day)
        print(last_day)

    @task(task_id='task_direct_calc')
    def get_datetime_calc(**kwargs):
        """Compute the same two dates directly from ``data_interval_end``.

        Raises:
            KeyError: If ``data_interval_end`` is not present in ``kwargs``.
        """
        from dateutil.relativedelta import relativedelta

        seoul_end = kwargs['data_interval_end'].in_timezone('Asia/Seoul')
        # One month back with the day forced to 1.
        first_of_prev_month = seoul_end + relativedelta(months=-1, day=1)
        # First day of the current month minus one day.
        last_of_prev_month = seoul_end.replace(day=1) + relativedelta(days=-1)
        for day in (first_of_prev_month, last_of_prev_month):
            print(day.strftime('%Y-%m-%d'))

    macro_task = get_datetime_macro()
    calc_task = get_datetime_calc()
    macro_task >> calc_task
Python 오퍼레이터에서는 template과 macro를 사용해 날짜 연산을 해도 되고, kwargs에서 context 값을 꺼낸 뒤 파이썬 문법으로 직접 계산해도 된다.
Bash 오퍼레이터에서는 날짜 연산을 하려면 template과 macro를 사용해야 한다.