Airflow 공부 - 8 Python 오퍼레이터(2) 외부 함수 실행하기

Kangjik Kim·2024년 5월 5일
  • 파이썬 모듈 경로

    • from airflow.operators.python import PythonOperator
    • Airflow/operators/python 파일에서 PythonOperator 클래스를 가져온다
    • airflow는 자동적으로 dags 폴더와 plugins 폴더를 sys.path에 추가한다.
    • /opt/airflow/plugins/tmp/tmp_fuc.py에def tmp() 라면
      • from tmp.tmp_func import tmp()가 가능
      • plugins에 공통적으로 쓰이는 파이썬 함수들을 선언해놓고 활용하자
      • 위의 예시처럼 import해오면 실행은 정상적으로 되지만 vscode상에서 에러가 표시된다.
        • 작업 폴더의 루트 디렉토리에
        • .vscode 폴더를 만들고
          • settings.json 파일을 만든다.
            {
                "python.analysis.extraPaths": [
                    "./plugins"
                ]
            }
          • 이렇게 만들면 에러가 표시되지 않는다.
  • 학습 코드

    from airflow import DAG
    import pendulum
    import datetime
    from airflow.operators.python import PythonOperator
    from common.common_func import get_sftp
    
    with DAG(
        dag_id="dags_python_import_func",
        schedule="30 6 * * *", 
        start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
        catchup=False
    ) as dag:
        
        task_get_sftp = PythonOperator(
            task_id="task_get_sftp",
            python_callable=get_sftp
        )
            
        task_get_sftp
    def get_sftp():
        print('sftp 작업을 시작합니다.')

0개의 댓글