
airflow에서 bashoperator와 함께 자주 쓰이는 오퍼레이터
airflow.operators.python
python operator document
자주 쓰이는 건 이렇게 두개가 있다.
PythonOperator : 파이썬 함수 실행을 위한 오퍼레이터@task(task_id="print_the_context") def print_context(ds=None, **kwargs): """Print the Airflow context and ds variable from the context.""" pprint(kwargs) print(ds) return "Whatever you return gets printed in the logs" run_this = print_context()
PythonBranchOperator : 파이썬 실행 결과에 따라 branch를 나누어 실행 할 수 있는 오퍼레이터
@task.branch() def branching(choices: list[str]) -> str: return f"branch_{random.choice(choices)}"
from airflow import DAG
from airflow.operators.python import PythonOperator
import random
import pendulum
with DAG (
dag_id ="dags_python_operators",
schedule ="*/5 * * * *" , ##cron
start_date = pendulum.datetime(2025,1,1,tz="Asia/Seoul"),
catchup = False, ## 누락된 구간 x 소급적용 x
)as dag: ## 실행할 bash 명령어
def select_fruit():
fruit = [ 'APPLE' , 'BANANA' , 'ORANGE' , 'LEMON']
i = 0
while(i<10):
rand_int = random.randint(0,3)
if (rand_int == 0):
print(fruit[rand_int] + "는 빨개")
elif (rand_int == 1):
print("하얀건 "+fruit[rand_int] )
elif (rand_int == 2):
print(fruit[rand_int] + "은 맛있어")
elif (rand_int == 3):
print(fruit[rand_int] + "은 너무 셔" )
i+=1
py_t1 = PythonOperator(
task_id = "py_t1"
python_callable= select_fruit
)
py_t1
airflow dags git pull



하얀건 바나나 ㅎ
기존 작성된 task를 수정하지 않고 동작 방식을 변경할 수 있다. (wrapping)
✅ 1. Task Decorator의 장점
코드 가독성 향상 → PythonOperator보다 더 직관적인 DAG 코드 작성 가능
데이터 전달 간소화 → XCom을 자동으로 활용하여 Task 간 데이터 전달 가능
코드량 감소 → 불필요한 Operator 설정 없이 간단하게 Task 정의 가능
ex )
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task
import pendulum
with DAG (
dag_id ="dags_python_task_decorator",
schedule ="0 2 * * 1" , ##cron
start_date = pendulum.datetime(2025,1,1,tz="Asia/Seoul"),
catchup = False, ## 누락된 구간 x 소급적용 x
)as dag: ## 실행할 bash 명령어
@task(task_id="python_task_1")
def print_context(some_input):
print(some_input)
python_task_1 = print_context("Run task_decorator!!")
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
# DAG 정의
@dag(schedule="@daily", start_date=days_ago(1), catchup=False)
def my_simple_dag():
@task
def extract():
return {"data": [1, 2, 3, 4, 5]} # XCom을 통해 전달됨
@task
def transform(data):
return [x * 2 for x in data["data"]] # 변환 후 데이터 반환
@task
def load(transformed_data):
print(f"Loaded Data: {transformed_data}")
# Task 실행 순서 정의
raw_data = extract()
transformed_data = transform(raw_data)
load(transformed_data)
# DAG 인스턴스 생성
my_simple_dag()