https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/python.html
PythonOperator
: 파이썬 함수를 실행시키기 위한 OperatorPythonVirtualenvOperator
: 파이썬 가상환경 생성 후 작업을 수행하고 마무리되면 가상환경을 삭제하는 OperatorExternalPythonOperator
: 기존에 존재하는 파이썬 가상환경에서 작업을 수행하는 OperatorBranchPythonOperator
: 파이썬 함수 실행 결과에 따라 다음 Task를 선택적으로 실행시킬 수 있는 OperatorBranchPythonVirtualenvOperator
: 가상환경 생성/삭제 및 브랜치 처리 기능이 있는 OperatorBranchExternalPythonOperator
: 기존 가상환경을 사용하면서 브랜치 처리 기능이 있는 Operator# dags/python_operator.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
import pendulum
import random
with DAG(
dag_id="python_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "python"],
) as dag:
def select_country():
COUNTRIES = [
"Argentina", "Australia", "Brazil", "Canada", "China", "France", "Germany",
"India", "Indonesia", "Italy", "Japan", "Mexico", "Russia", "Saudi Arabia",
"South Africa", "South Korea", "Turkey", "United Kingdom", "United States"
]
print(random.choice(COUNTRIES))
python_task = PythonOperator(
task_id="python_task",
python_callable=select_country,
)
BashOperator
예제를 가져오고 Task를 PythonOperator
로 대체select_country()
를 정의select_country()
함수를 실행하는 python_task
를 단일 Task로 사용python_operator
DAG가 올라온 것을 확인python_task
의 첫 번째 실행 로그에서는 "Argentina" 국가가 선택되어 출력python_task
를 수동 실행한 로그에서는 "Indonesia" 국가가 선택되어 출력# scheduled run (first)
[2025-05-31, 11:33:06] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 11:33:06] INFO - Filling up the DagBag from /opt/airflow/dags/python_operator.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 11:33:06] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-05-31, 11:33:06] INFO - Argentina: chan="stdout": source="task"
# manual run (second)
[2025-05-31, 11:36:38] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 11:36:38] INFO - Filling up the DagBag from /opt/airflow/dags/python_operator.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 11:36:38] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-05-31, 11:36:38] INFO - Indonesia: chan="stdout": source="task"
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/plugins.html
plugins/
경로에 파이썬 함수를 작성하고 외부에서 활용plugins/
경로 아래에 현재 시간을 출력하는 print_now()
함수를 정의# plugins/common/common_func.py
import datetime as dt
def print_now():
print(dt.datetime.now())
# dags/python_plugins.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from common.common_func import print_now
import pendulum
with DAG(
dag_id="python_plugins",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "python"],
) as dag:
python_plugins_task = PythonOperator(
task_id="python_plugins_task",
python_callable=print_now,
)
plugins/
에 정의한 함수를 import 해서 사용하는 python_plugins
DAG 및 단일 Task를 생성plugins/
경로를 생략한 import 문을 인식하지 못하기 때문에 .env
파일에 PYTHONPATH
를 추가# .env
WORKSPACE_FOLDER=/Users/.../airflow
PYTHONPATH=${WORKSPACE_FOLDER}/plugins
python_plugins
DAG가 올라온 것을 확인python_plugins_task
의 실행 로그에서 현재 시간이 출력된 결과를 조회[2025-05-31, 12:10:35] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 12:10:35] INFO - Filling up the DagBag from /opt/airflow/dags/python_plugins.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 12:10:35] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-05-31, 12:10:35] INFO - 2025-05-31 03:10:35.099945: chan="stdout": source="task"
PythonOperator
사용하는 것보다는 코드가 짧고 가독성이 좋은 @task
데코레이터를 활용하는 것을 권장# dags/python_decorator.py
from airflow.sdk import DAG, task
import pendulum
with DAG(
dag_id="python_decorator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "python"],
) as dag:
@task(task_id="python_decorator_task")
def print_input(__input):
print(__input)
python_decorator_task = print_input("@task 데코레이터 실행")
example_python_decorator
를 복제 및 일부를 수정하여 DAG 선언@task
데코레이터를 추가하고 task_id
파라미터를 부여python_decorator_task
의 실행 로그에서 전달한 인수가 출력된 것을 확인[2025-05-31, 12:29:22] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 12:29:22] INFO - Filling up the DagBag from /opt/airflow/dags/python_decorator.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 12:29:22] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-05-31, 12:29:22] INFO - @task 데코레이터 실행: chan="stdout": source="task"
*args
와 같이 *
를 붙여서 Tuple 타입의 객체를 받을 수 있음**kwargs
와 같이 **
를 붙여서 딕셔너리 타입의 객체를 받을 수 있음regist()
를 아래와 같이 구현# plugins/common/common_func.py
def regist(name: str, age: int, *args, **kwargs):
print(f"이름: {name}")
print(f"나이: {age}")
for __key, __value in kwargs.items():
print(f"{__key}: {__value}")
if args:
print(f"기타 정보: {args}")
PythonOperator
도 함수를 실행할 때 인자 또는 파라미터를 전달할 수 있는 방법을 제공op_args
파라미터에 배열 객체를 전달op_kwargs
파라미터에 딕셔너리 객체를 전달# dags/python_parameter.py
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from common.common_func import regist
import pendulum
with DAG(
dag_id="python_parameter",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "python"],
) as dag:
regist_task = PythonOperator(
task_id="regist_task",
python_callable=regist,
op_args=["김철수", 20, "서울", "대한민국"],
op_kwargs={"이메일":"su@example.com", "전화번호":"010-1234-5678"},
)
regist_task
의 실행 로그에서 전달한 인자와 파라미터가 출력된 것을 확인ds
또는 ts
등 Airflow에서 만들어지는 파라미터가 같이 전달되는 것 같은데 앞으로 Airflow를 알게되면서 활용할 수 있을 것을 기대[2025-05-31, 15:04:22] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-05-31, 15:04:22] INFO - Filling up the DagBag from /opt/airflow/dags/python_parameter.py: source="airflow.models.dagbag.DagBag"
[2025-05-31, 15:04:23] INFO - 이름: 김철수: chan="stdout": source="task"
...
[2025-05-31, 15:04:23] INFO - 나이: 20: chan="stdout": source="task"
...
[2025-05-31, 15:04:23] INFO - 이메일: su@example.com: chan="stdout": source="task"
[2025-05-31, 15:04:23] INFO - 전화번호: 010-1234-5678: chan="stdout": source="task"
...
[2025-05-31, 15:04:23] INFO - 기타 정보: ('서울', '대한민국'): chan="stdout": source="task"