1) **kwargs에 존재하는 ti(task_instance) 객체 활용
@task(task_id='python_xcom_push_task1')
def xcom_push1(**kwargs):
    """Push two XCom entries using the task instance passed in ``kwargs``.

    Args:
        **kwargs: Keyword arguments supplied to the task; the task instance
            is read from the ``'ti'`` entry.
    """
    ti = kwargs['ti']
    # One statement per line: the two pushes were fused onto a single line,
    # which is not valid Python.
    ti.xcom_push(key="result1", value="value_1")
    ti.xcom_push(key="result2", value=[1, 2, 3])
@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
    """Pull the "result1" and "result2" XCom entries and print both.

    Args:
        **kwargs: Keyword arguments supplied to the task; the task instance
            is read from the ``'ti'`` entry.
    """
    ti = kwargs['ti']
    value1 = ti.xcom_pull(key="result1")
    # Also passing task_ids makes the lookup more precise.
    value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
    # The first print was fused onto the assignment line above, which is not
    # valid Python; it now stands on its own line.
    print(value1)
    print(value2)
2) task 데코레이터 활용 & 파이썬 함수의 return 값 활용
@task(task_id='xcom_push_by_return')
def xcom_push_by_return(**kwargs):
    """Return the fixed string ``'status Good'``.

    Args:
        **kwargs: Accepted but not used by this function.
    """
    return 'status Good'
@task(task_id='xcom_pull_by_return')
def xcom_pull_by_return(status, **kwargs):
    """Print the value received through ``status``.

    Args:
        status: Value to print.
        **kwargs: Accepted but not used by this function.
    """
    print(status)


# Call the pushing task first, then hand its result to the pulling task.
pushed_status = xcom_push_by_return()
xcom_pull_by_return(pushed_status)
3) 혼용해서 사용하기
@task(task_id='xcom_push_by_return')
def xcom_push_return(**kwargs):
    """Return the fixed string ``'status Good'``.

    Args:
        **kwargs: Accepted but not used by this function.
    """
    return 'status Good'
@task(task_id='xcom_pull_by_return')
def xcom_pull_return_by_method(**kwargs):
    """Pull an XCom value with ``ti.xcom_pull`` and print it.

    Args:
        **kwargs: Keyword arguments supplied to the task; the task instance
            is read from the ``'ti'`` entry.
    """
    ti = kwargs['ti']
    # Looked up under key 'return_value' for the task id 'xcom_push_by_return'.
    pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return')
    print(pull_value)


# Plain `>>` ordering; a stray combining character had been attached to the
# operator, which made the line invalid.
xcom_push_return() >> xcom_pull_return_by_method()
1) ti 객체를 이용해서 실습해보기
from airflow import DAG
import pendulum
from airflow.decorators import task
with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id='python_xcom_push_task1')
    def xcom_push1(**kwargs):
        """Push "result1" and "result2" via the task instance in ``kwargs``."""
        task_instance = kwargs['ti']
        entries = (("result1", "value_1"), ("result2", [1, 2, 3]))
        for xcom_key, xcom_value in entries:
            task_instance.xcom_push(key=xcom_key, value=xcom_value)

    @task(task_id='python_xcom_push_task2')
    def xcom_push2(**kwargs):
        """Push the same two keys as ``xcom_push1`` with different values."""
        task_instance = kwargs['ti']
        entries = (("result1", "value_2"), ("result2", [1, 2, 3, 4]))
        for xcom_key, xcom_value in entries:
            task_instance.xcom_push(key=xcom_key, value=xcom_value)

    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        """Pull "result1" and "result2" and print them in that order."""
        task_instance = kwargs['ti']
        # "result1" is pulled by key only; "result2" is additionally pinned
        # to the task id 'python_xcom_push_task1'.
        first = task_instance.xcom_pull(key="result1")
        second = task_instance.xcom_pull(
            key="result2",
            task_ids='python_xcom_push_task1',
        )
        for pulled in (first, second):
            print(pulled)

    # Both push tasks run before the pull task.
    xcom_push1() >> xcom_push2() >> xcom_pull()
결과
xcom
2) 두개 같이 사용해보기
from airflow import DAG
import pendulum
from airflow.decorators import task
with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        """Return the string ``'Success'``."""
        return 'Success'

    @task(task_id='python_xcom_pull_1')
    def xcom_pull_1(**kwargs):
        """Pull the upstream value with ``xcom_pull`` and print it."""
        task_instance = kwargs['ti']
        # No key is passed: by default the key is the task's return value.
        pulled = task_instance.xcom_pull(task_ids='python_xcom_push_by_return')
        print('xcom_pull 메서드로 직접 찾은 리턴 값:' + pulled)

    @task(task_id='python_xcom_pull_2')
    def xcom_pull_2(status, **kwargs):
        """Print the value received through the ``status`` argument."""
        print('함수 입력값으로 받은 값:' + status)

    # The push task's result is used twice: once as a function argument and
    # once as the upstream of a task that pulls it explicitly.
    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return)
    python_xcom_push_by_return >> xcom_pull_1()
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
    dag_id="dags_bash_with_xcom",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    # The command is built from adjacent string literals; the templated
    # expression in the middle calls ti.xcom_push with key 'bash_pushed'.
    bash_push = BashOperator(
        task_id='bash_push',
        bash_command=(
            "echo START && "
            "echo XCOM_PUSHED "
            "{{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
            "echo COMPLETE"
        ),
    )

    # Environment for the pulling task: one value looked up by key, the other
    # by the upstream task id.
    pull_env = {
        'PUSHED_VALUE': "{{ ti.xcom_pull(key='bash_pushed') }}",
        'RETURN_VALUE': "{{ ti.xcom_pull(task_ids='bash_push') }}",
    }
    bash_pull = BashOperator(
        task_id='bash_pull',
        env=pull_env,
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
        do_xcom_push=False,
    )

    bash_push >> bash_pull
결과값
=> first_bash_message COMPLETE
from airflow import DAG
import pendulum
from airflow.decorators import task
from airflow.operators.bash import BashOperator
with DAG(
    dag_id="dags_bash_python_with_xcom",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2024, 6, 16, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    # --- Flow 1: a Python task returns a dict, a Bash task reads its fields.
    @task(task_id='python_push')
    def python_push_xcom():
        """Return a dict with 'status', 'data' and 'options_cnt' entries."""
        return {'status': 'Good', 'data': [1, 2, 3], 'options_cnt': 100}

    # Each environment variable indexes one field of the value pulled from
    # the task id "python_push".
    pull_env = {
        'STATUS': '{{ti.xcom_pull(task_ids="python_push")["status"]}}',
        'DATA': '{{ti.xcom_pull(task_ids="python_push")["data"]}}',
        'OPTIONS_CNT': '{{ti.xcom_pull(task_ids="python_push")["options_cnt"]}}',
    }
    bash_pull = BashOperator(
        task_id='bash_pull',
        env=pull_env,
        bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT',
    )

    python_push_xcom() >> bash_pull

    # --- Flow 2: a Bash task pushes a value, a Python task pulls it.
    bash_push = BashOperator(
        task_id='bash_push',
        bash_command=(
            'echo PUSH_START '
            '{{ti.xcom_push(key="bash_pushed",value=200)}} && '
            'echo PUSH_COMPLETE'
        ),
    )

    @task(task_id='python_pull')
    def python_pull_xcom(**kwargs):
        """Pull by key and by task id from ``bash_push`` and print both."""
        task_instance = kwargs['ti']
        pushed = task_instance.xcom_pull(key='bash_pushed')
        returned = task_instance.xcom_pull(task_ids='bash_push')
        # Only the key-based value goes through str(); the task-id-based
        # value is concatenated directly.
        print('status_value:' + str(pushed))
        print('return_value:' + returned)

    bash_push >> python_pull_xcom()
python push xcom | bash_pull의 결과 | bash_push xcom | python_pull_xcom결과 |
---|---|---|---|
![]() | ![]() | ![]() | ![]() |
from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task
from airflow.operators.email import EmailOperator
with DAG(
    dag_id="dags_python_email_operator",
    schedule="0 8 1 * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id='something_task')
    def some_logic(**kwargs):
        """Return either 'Success' or 'Fail', chosen at random."""
        from random import choice

        outcomes = ['Success', 'Fail']
        return choice(outcomes)

    # Subject and body are templates: both render data_interval_end in the
    # Asia/Seoul timezone, and the body also pulls the result of
    # "something_task". Adjacent literals replace the backslash continuation
    # so that source indentation cannot leak into the string.
    send_email = EmailOperator(
        task_id='send_email',
        to='email',
        subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
        html_content=(
            '{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> '
            '{{ti.xcom_pull(task_ids="something_task")}} 했습니다 <br>'
        ),
    )

    some_logic() >> send_email
Airflow의 전역 변수(Variable)로서, key-value 형태로 저장하며 다른 스케줄이나 DAG에서도 접근할 수 있는 데이터 공유 방법
저장한 변수는 메타 DB의 variable 테이블에 저장됨.
Admin -> Variables
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.models import Variable
with DAG(
    dag_id="dags_bash_with_variable",
    schedule="10 9 * * *",
    start_date=pendulum.datetime(2023, 4, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    # Method 1: read the Variable in Python and embed it with an f-string.
    # This call sits at the top level of the DAG file, so it runs whenever
    # the file itself is executed.
    # NOTE(review): Airflow's best-practice guide discourages top-level
    # Variable access because of the metadata DB lookup - confirm and prefer
    # method 2 outside of demos.
    var_value = Variable.get("sample_key")
    bash_var_1 = BashOperator(
        task_id="bash_var_1",
        bash_command=f"echo variable:{var_value}",
    )

    # Method 2: reference the Variable through a template expression inside
    # the command string.
    bash_var_2 = BashOperator(
        task_id="bash_var_2",
        bash_command="echo variable:{{var.value.sample_key}}",
    )