Python 오퍼레이터 - op_kwargs로 변수 할당하기

우상욱·2024년 3월 23일

Airflow Master Class

목록 보기
13/24

파이썬 오퍼레이터 op_kwargs 사용법


  1. 함수에 일반 변수만 있을 경우
def regist(name, sex):
    print(f'이름은 {name}이고 성별은 {sex}dlqslek')

python_task = PythonOperator(
    task_id='python_task',
    python_callable=regist,
    op_kwargs={'name':'wsw', 'sex':'man'}
    )
  1. 함수에 일반 변수 + **kwagrs도 있을 경우
def regist(name, sex, **kwargs):
    print(name)
    print(sex)
    print(kwagrs)
 
python_task = PythonOperator(
    task_id='python_task',
    python_callable=regist,
    op_kwargs={'name':'wsw','sex':'man', 'country':'kr','city': 'seoul'}
    )
  1. 함수에 일반 파라미터 + *args + **kwargs가 모두 있는 경우
def regist(name, sex, *args, **kwargs):
   print(name)
   print(sex)
   print(args)
   print(kwargs)

python_task_2 = PythonOperator(
   task_id='python_task_2',
   python_callable=regist,
   op_args=['wsw','man','kr','seoul'],
   op_kwargs=['phone':010, 'email':'wjddm3@naver.com']
   )

DAG

  • common/common_func
def regist2(name, sex, *args, **kwargs):
    print(f"이름: {name}")
    print(f"성별: {sex}")
    print(f"기타 옵션들: {args}")
    email = kwargs["email"] or None
    phone = kwargs["phone"] or None
    if email:
        print(email)
    if phone:
        print(phone)
  • dags
from airflow import DAG
import datetime
import pendulum
from airflow.operators.python import PythonOperator
from common.common_func import regist2
import random

with DAG(
    dag_id="dags_python_with_op_kwargs",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    regist2_t1 = PythonOperator(
        task_id="regist2_t1",
        python_callable=regist2,
        op_args=["wsw", "man", "kr", "seoul"],
        op_kwargs={"email": "wsw@naver.com", "phone": "010"},
    )

    regist2_t1

실행 결과를 보면, op_args에 전달 된 파라미터들은, 파라미터의 수에 맞게 전달 되고, args에 있던 것들은 튜플로, kwargs에 전달된 것들은 딕셔너리로 전달되어 잘 출력된 것을 볼 수 있습니다.

profile
데이터엔지니어

0개의 댓글