[Airflow] Python Operator

minyeamer·2025년 5월 31일
0

Apache Airflow 배우기

목록 보기
4/13
post-thumbnail

PythonOperator

https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/python.html

PythonOperator 종류

  • PythonOperator : 파이썬 함수를 실행시키기 위한 Operator
  • PythonVirtualenvOperator : 파이썬 가상환경 생성 후 작업을 수행하고 마무리되면 가상환경을 삭제하는 Operator
  • ExternalPythonOperator : 기존에 존재하는 파이썬 가상환경에서 작업을 수행하는 Operator
  • BranchPythonOperator : 파이썬 함수 실행 결과에 따라 다음 Task를 선택적으로 실행시킬 수 있는 Operator
  • BranchPythonVirtualenvOperator : 가상환경 생성/삭제 및 브랜치 처리 기능이 있는 Operator
  • BranchExternalPythonOperator : 기존 가상환경을 사용하면서 브랜치 처리 기능이 있는 Operator

PythonOperator 활용

# dags/python_operator.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
import random

with DAG(
    dag_id="python_operator",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "python"],
) as dag:
    def select_country():
        COUNTRIES = [
            "Argentina", "Australia", "Brazil", "Canada", "China", "France", "Germany",
            "India", "Indonesia", "Italy", "Japan", "Mexico", "Russia", "Saudi Arabia",
            "South Africa", "South Korea", "Turkey", "United Kingdom", "United States"
        ]
        print(random.choice(COUNTRIES))

    python_task = PythonOperator(
        task_id="python_task",
        python_callable=select_country,
    )
  • 앞서 BashOperator 예제를 가져오고 Task를 PythonOperator 로 대체
  • G20 국가 중 랜덤한 국가를 선택해 출력하는 함수 select_country() 를 정의
  • select_country() 함수를 실행하는 python_task 를 단일 Task로 사용

DAG 등록

  • Airflow UI에서 python_operator DAG가 올라온 것을 확인

python-operator

DAG 실행

  • DAG를 실행하여 정상적으로 수행되는지 확인
  • python_task 의 첫 번째 실행 로그에서는 "Argentina" 국가가 선택되어 출력
  • UI에서 Trigger 버튼을 눌러 python_task 를 수동 실행한 로그에서는 "Indonesia" 국가가 선택되어 출력

trigger

# scheduled run (first)

[2025-05-31, 11:33:06] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 11:33:06] INFO - Filling up the DagBag from /opt/airflow/dags/python_operator.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 11:33:06] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-05-31, 11:33:06] INFO - Argentina: chan="stdout": source="task"
# manual run (second)

[2025-05-31, 11:36:38] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 11:36:38] INFO - Filling up the DagBag from /opt/airflow/dags/python_operator.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 11:36:38] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-05-31, 11:36:38] INFO - Indonesia: chan="stdout": source="task"

Plugins

https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/plugins.html

  • plugins/ 경로에 파이썬 함수를 작성하고 외부에서 활용
  • DAG 선언 시 함수를 가져오기만 해도 되어서 깔끔해지고 같은 함수를 재활용할 수 있어서 편리
  • 예시로, plugins/ 경로 아래에 현재 시간을 출력하는 print_now() 함수를 정의
# plugins/common/common_func.py

import datetime as dt

def print_now():
    print(dt.datetime.now())

Plugins 활용

# dags/python_plugins.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from common.common_func import print_now
import pendulum

with DAG(
    dag_id="python_plugins",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "python"],
) as dag:
    python_plugins_task = PythonOperator(
        task_id="python_plugins_task",
        python_callable=print_now,
    )
  • plugins/ 에 정의한 함수를 import 해서 사용하는 python_plugins DAG 및 단일 Task를 생성
  • 이 때, VSCode 상에서는 plugins/ 경로를 생략한 import 문을 인식하지 못하기 때문에 .env 파일에 PYTHONPATH 를 추가
# .env

WORKSPACE_FOLDER=/Users/.../airflow
PYTHONPATH=${WORKSPACE_FOLDER}/plugins

DAG 실행

  • 마찬가지로, Airflow 컨테이너를 재시작하면 python_plugins DAG가 올라온 것을 확인
  • DAG를 실행한 후 python_plugins_task 의 실행 로그에서 현재 시간이 출력된 결과를 조회
  • 별도로 시간대를 지정하지 않았는데, UTC 시간대를 기준으로 시간이 가져와진 것을 확인
[2025-05-31, 12:10:35] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 12:10:35] INFO - Filling up the DagBag from /opt/airflow/dags/python_plugins.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 12:10:35] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-05-31, 12:10:35] INFO - 2025-05-31 03:10:35.099945: chan="stdout": source="task"

Decorator

  • Airflow 공식 문서에서 PythonOperator 사용하는 것보다는 코드가 짧고 가독성이 좋은 @task 데코레이터를 활용하는 것을 권장

Decorator 활용

# dags/python_decorator.py

from airflow.sdk import DAG, task
import pendulum

with DAG(
    dag_id="python_decorator",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "python"],
) as dag:
    @task(task_id="python_decorator_task")
    def print_input(__input):
        print(__input)

    python_decorator_task = print_input("@task 데코레이터 실행")
  • Airflow에서 제공하는 예시 example_python_decorator 를 복제 및 일부를 수정하여 DAG 선언
  • 단순히 함수에 @task 데코레이터를 추가하고 task_id 파라미터를 부여

https://github.com/apache/airflow/blob/providers-standard/1.2.0/providers/standard/tests/system/standard/example_python_decorator.py

DAG 실행

  • DAG를 실행한 후 python_decorator_task 의 실행 로그에서 전달한 인수가 출력된 것을 확인
[2025-05-31, 12:29:22] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 12:29:22] INFO - Filling up the DagBag from /opt/airflow/dags/python_decorator.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 12:29:22] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-05-31, 12:29:22] INFO - @task 데코레이터 실행: chan="stdout": source="task"

Parameter

  • 파이썬에서는 함수에 순서대로 인자를 전달하거나 키-값의 형태로 파라미터를 전달 가능
  • 순서대로 전달되는 인자를 배열로 받을 때는 *args 와 같이 * 를 붙여서 Tuple 타입의 객체를 받을 수 있음
  • 키-값의 형태로 전달되는 파라미터를 받을 때는 **kwargs 와 같이 ** 를 붙여서 딕셔너리 타입의 객체를 받을 수 있음
  • 인자와 파라미터를 모두 받아서 출력하는 함수 regist() 를 아래와 같이 구현
# plugins/common/common_func.py

def regist(name: str, age: int, *args, **kwargs):
    print(f"이름: {name}")
    print(f"나이: {age}")
    for __key, __value in kwargs.items():
        print(f"{__key}: {__value}")
    if args:
        print(f"기타 정보: {args}")

op_args, op_kwargs

  • PythonOperator 도 함수를 실행할 때 인자 또는 파라미터를 전달할 수 있는 방법을 제공
  • 인자를 전달할 때는 op_args 파라미터에 배열 객체를 전달
  • 파라미터를 전달할 때는 op_kwargs 파라미터에 딕셔너리 객체를 전달
# dags/python_parameter.py

from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from common.common_func import regist
import pendulum

with DAG(
    dag_id="python_parameter",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "python"],
) as dag:
    regist_task = PythonOperator(
        task_id="regist_task",
        python_callable=regist,
        op_args=["김철수", 20, "서울", "대한민국"],
        op_kwargs={"이메일":"su@example.com", "전화번호":"010-1234-5678"},
    )

DAG 실행

  • DAG를 실행한 후 regist_task 의 실행 로그에서 전달한 인자와 파라미터가 출력된 것을 확인
  • 실제로는 직접 전달한 파라미터 외에 ds 또는 ts 등 Airflow에서 만들어지는 파라미터가 같이 전달되는 것 같은데 앞으로 Airflow를 알게되면서 활용할 수 있을 것을 기대
[2025-05-31, 15:04:22] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 15:04:22] INFO - Filling up the DagBag from /opt/airflow/dags/python_parameter.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 15:04:23] INFO - 이름: 김철수: chan="stdout": source="task"
...
[2025-05-31, 15:04:23] INFO - 나이: 20: chan="stdout": source="task"
...
[2025-05-31, 15:04:23] INFO - 이메일: su@example.com: chan="stdout": source="task"
[2025-05-31, 15:04:23] INFO - 전화번호: 010-1234-5678: chan="stdout": source="task"
...
[2025-05-31, 15:04:23] INFO - 기타 정보: ('서울', '대한민국'): chan="stdout": source="task"
profile
데이터의 모든 것을 추구합니다.

0개의 댓글