Airflow PythonOperator

BAO.DE·2025년 3월 28일

Apache Airflow

목록 보기
10/20
post-thumbnail

PythonOperator

airflow에서 bashoperator와 함께 자주 쓰이는 오퍼레이터

airflow.operators.python
python operator document
자주 쓰이는 건 이렇게 두개가 있다.
PythonOperator : 파이썬 함수 실행을 위한 오퍼레이터

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return "Whatever you return gets printed in the logs"
run_this = print_context()

PythonBranchOperator : 파이썬 실행 결과에 따라 branch를 나누어 실행 할 수 있는 오퍼레이터

@task.branch()
  def branching(choices: list[str]) -> str:
      return f"branch_{random.choice(choices)}"

Example DAG (feat.과일송)

from airflow import DAG
from airflow.operators.python import PythonOperator
import random
import pendulum 

with DAG (
    dag_id ="dags_python_operators",
    schedule ="*/5 * * * *" , ##cron
    start_date = pendulum.datetime(2025,1,1,tz="Asia/Seoul"),
    catchup = False, ## 누락된 구간 x 소급적용 x
)as dag: ## 실행할 bash 명령어
   def select_fruit():
      fruit = [ 'APPLE' , 'BANANA' , 'ORANGE' , 'LEMON']
      i = 0
      while(i<10):
         rand_int = random.randint(0,3)
         if (rand_int == 0):
            print(fruit[rand_int] + "는 빨개")
         elif (rand_int == 1):
            print("하얀건 "+fruit[rand_int] )
         elif (rand_int == 2):
            print(fruit[rand_int] + "은 맛있어")
         elif (rand_int == 3):
            print(fruit[rand_int] + "은 너무 셔" )
         i+=1

      py_t1 = PythonOperator(
         task_id = "py_t1"
         python_callable= select_fruit
      )

      py_t1 

airflow dags git pull

Scheduler 확인

하얀건 바나나 ㅎ

데코레이터

기존 작성된 task를 수정하지 않고 동작 방식을 변경할 수 있다. (wrapping)

✅ 1. Task Decorator의 장점
코드 가독성 향상 → PythonOperator보다 더 직관적인 DAG 코드 작성 가능

데이터 전달 간소화 → XCom을 자동으로 활용하여 Task 간 데이터 전달 가능

코드량 감소 → 불필요한 Operator 설정 없이 간단하게 Task 정의 가능

ex )

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task 
import pendulum 



with DAG (
    dag_id ="dags_python_task_decorator",
    schedule ="0 2 * * 1" , ##cron
    start_date = pendulum.datetime(2025,1,1,tz="Asia/Seoul"),
    catchup = False, ## 누락된 구간 x 소급적용 x
)as dag: ## 실행할 bash 명령어

    @task(task_id="python_task_1")
    def print_context(some_input):
        print(some_input)
        
    python_task_1 = print_context("Run task_decorator!!")
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

# DAG 정의
@dag(schedule="@daily", start_date=days_ago(1), catchup=False)
def my_simple_dag():
    @task
    def extract():
        return {"data": [1, 2, 3, 4, 5]}  # XCom을 통해 전달됨

    @task
    def transform(data):
        return [x * 2 for x in data["data"]]  # 변환 후 데이터 반환

    @task
    def load(transformed_data):
        print(f"Loaded Data: {transformed_data}")

    # Task 실행 순서 정의
    raw_data = extract()
    transformed_data = transform(raw_data)
    load(transformed_data)

# DAG 인스턴스 생성
my_simple_dag()

0개의 댓글