Python 오퍼레이터 - 외부 파이썬 함수 수행하기

우상욱·2024년 3월 23일

Airflow Master Class

목록 보기
9/24

파이썬 모듈 경로 이해


from airflow.operators.python import PythonOperator
airflow 폴더 아래 operators 폴더 아래 python 파일 아래에서 PythonOperator 클래스를 가져와라

  • 파이썬은 위 경로를 어떻게 찾을까?
    dag에서 우리가 만든 외부 함수를 import 해야하는데, import 경로를 어떻게 작성할까?

  • 파이썬은 sys.path 변수에서 모듈의 위치를 검색

  • sys.path에 값을 추가하는 방법
    1) 명시적으로 추가 (ex. sys.path.append('/home/wsw') )
    2) OS 환경변수 PYTHONPATH에 값을 추가

  • Airflow는 자동적으로 dags 폴더와 plugins 폴더를 python.path에 추가함
    컨테이너에서 airflow info 명령 수행해보자.

  • python_path를 자세히 보면, plugins 폴더와 dags 폴더가 추가 되어있는 것을 볼 수 있음

  • 그 말은, plugins 폴더를 자동으로 탐색하기 때문에, plugins 폴더를 from 절에 작성하지 않고, 그 하위 폴더부터 from 절에 작성하면 된다는 것

plugins 폴더 이용하기


  1. 공통함수 작성
  2. 재사용성 증가
  3. DAG 깔끔

실습


  • plugins/common/common_func.py
def get_sftp():
    print("sftp 작업을 시작합니다.")
  • dag
from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from common.common_func import get_sftp

with DAG(
    dag_id="dags_python_import_func",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    task_get_sftp = PythonOperator(
        task_id="task_get_sftp",
        python_callable=get_sftp,
    )
  • 실행 결과 확인
profile
데이터엔지니어

0개의 댓글