[Airflow] Operator 기본 사용법

식빵·2025년 6월 4일
0

Airflow

목록 보기
2/9
post-thumbnail

이 게시물은 airflow 2.10.5 버전을 사용해서 작성됐습니다.
3.x 버전과 조금 다를 수 있으니 유의하시기 바랍니다.

🔥 Operator 사용법을 알아보자

이전 글에서는 기본적인 airflow 개발환경 세팅법을 알아봤습니다.
이번 글에서는 Task 를 생성하는 Operator 클래스에 대해 알아보겠습니다.

이를 위해 Airflow 에서는 기본으로 제공하는 Operator 중에서
BashOperatorPythonOperator 를 통해 기본적인 사용법을 익혀보겠습니다.



📝 BashOperator

BashOperator 는 말 그대로 Bash 명령어를 실행하는 Task 를 만드는 클래스입니다.
BashOperator간단한 명령어 뿐만아니라 shell script 도 실행할 수 있습니다.
지금부터 그 방법을 알아보죠.

1. 간단한 명령어 실행

from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id="dags_bash_operator",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:

    bash_hello = BashOperator(
        task_id="bash_hello",
        bash_command="echo 'Hello World'" 
    )

    bash_hello



2. Shell Script 실행

Shell Script Path

BashOperatorshell script 를 실행하려면
airflow-worker 가 접근 가능한 Path (경로) 에 스크립트를 저장해야 합니다.

참고: airflow-workertask 를 실제 실행시키는 주체입니다.

그렇다면 정확히 어떤 Path 에 저장해야 할까요?
크게 2가지 방법이 있습니다.


  1. python 으로 작성한 DAG 파일에 가까운 상대 경로에 저장 (참고링크)
  2. airflow info 명령어를 통해서 조회된 python_path 경로에 저장

위 2가지 방식에 대한 간단한 테스트를 위해서 아래와 같이 shell script 를 저장했습니다.

DAG 파일(=hello_world_bash.py)과 가까운 위치(=scripts/helloworld.sh)에
shell script 를 저장해놨습니다.

이제 각 경우에 코드를 어떻게 DAG 코드를 작성해야되는지 알아봅시다.


상대 경로 사용 시

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.decorators import task
import pendulum

with DAG(
    dag_id="hello_world_bash",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    task1 = BashOperator(
        task_id="task1",
        bash_command="scripts/helloworld.sh",
        # 참고: env 파라미터로 환경변수도 줄 수 있습니다!
        env={'MY_NAME': 'CODING_TOAST'}
    )

    task1

참고로 스크립트 내용은 다음과 같습니다.

#!/bin/bash
# 파일명은 helloworld.sh 입니다.
echo 'Hello World'
echo "my name is ... $MY_NAME!"

이제 DAG 를 실행하고 Log 보겠습니다.



airflow info 경로 사용 시

먼저 airflow info 명령어를 통해서 python_path 경로를 정확히 알아봅시다.
저는 dockerairflow 를 실행 중이라서 아래와 같이 명령어를 입력했습니다.

docker exec compose-v2-airflow-webserver-1 airflow info

명령어 입력 후 여러 정보가 나오지만, Paths info > python_path line 에 나오는
모든 경로들이 저희의 shell script 를 저장하고, 참고할 수 있는 위치입니다.

예를 들어서 python_path 나온 것 중 /opt/airflow/plugins 를 사용하고 싶다면
해당 경로 하단에 스크립트를 넣고, BashOperator 를 아래처럼 작성하면 끝입니다.

task1 = BashOperator(
	task_id="task1",
	bash_command="/opt/airflow/plugins/helloworld.sh",
	env={'MY_NAME': 'CODING_TOAST'}
)    

참고: docker-compose.yaml 사용 중이신 분들은 yml 파일이 있는
위치의 plugins 디렉토리가 container 내부의 /opt/airflows/plugins
와 매핑된 상태입니다. 참고해서 실습해보시기 바랍니다.




🐍 PythonOperator

1. 간단 사용법

PythonOperatorpython 함수 를 실행하는 Operator 입니다.
간단한 사용법을 먼저 알아봅시다.

# 파일명: dags_python_op1.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task
import pendulum

with DAG(
    dag_id="dags_python_op1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    def good_bye_world(name, age, *args, **kwargs) -> None:
    	print('good_by_world')
    	print('my name is ...', name)
    	print('my age is ...', age)
    	print('arg list : ', args)
    	print('kwargs -> key_sample1 : ', kwargs['key_sample1'])
    
    task_t1 = PythonOperator(
        task_id="task_t1",
        python_callable=good_bye_world,
        op_args=['coding_toast', 20, 'arg sample1'],
        op_kwargs={'key_sample1':'value_sample1'}
    )
    
    
    # 데코레이터 방식을 사용할 수도 있습니다.
    @task(task_id="task_t2")
    def task_t2():
        print('task 2 executed')
    
    task_t1 >> task_t2()

2. 공통모듈 함수

아까 본 예시에서는 간단하게 PythonOperator 와 함수를 같은 DAG 코드
안에 작성하고 사용했습니다.

그런데 만약에 많은 DAG 에서 똑같은 python 함수를 사용하고 싶을 때는 어떻게 할까요?
이때는 주로 공통 모듈 을 작성하고 DAG 코드에서 import 해오는 형식으로 사용합니다.
그렇다면 공통 모듈 은 어디에 작성해야 할까요?

python 에서는 import 를 위해 사용하는 경로가 모두 sys.path 에 있습니다.
airflow 는 이러한 경로들을 airflow info -> python_path 를 통해 알 수 있습니다.

여러 경로들 중 하나를 선택하고 그 하위에 python module 을 작성하면 된다는 거죠.
저는 경로들 중에서 /opt/airflow/plugins 경로에 하단에 모듈을 작성해보겠습니다.

이후에 나오는 ./plugins, ./dags 라는 디렉토리는
각각 컨테이너의 /opt/airflow/plugins, /opt/airflow/dags 와 매핑된 경로입니다.

아래처럼 ./plugins/common/common_func.py 를 하나 생성하고,
./dags 디렉토리의 DAG 코드에서 공통 모듈의 함수를 import 해서 사용해보겠습니다.


common_func.py 코드

def good_bye_world(name, age, *args, **kwargs) -> None:
    print('good_by_world')
    print('my name is ...', name)
    print('my age is ...', age)
    print('arg list : ', args)
    print('kwargs -> key_sample1 : ', kwargs['key_sample1'])

dags_python_op1.py 코드

# 파일명: dags_python_op1.py
from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum

# 공통 모듈의 from, import 를 정확히 작성해야 합니다!
from common.common_func import good_bye_world


with DAG(
    dag_id="dags_python_op1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    task_t1 = PythonOperator(
        task_id="task_t1",
        python_callable=good_bye_world,
        op_args=['coding_toast', 20, 'arg sample1'],
        op_kwargs={'key_sample1':'value_sample1'}
    )
    
    task_t1

참고: op_args, op_kwargs 를 통해서 python_callable 함수에 원하는 파라미터를 보낼 수 있습니다!


실행시키고 Task 의 Log 를 확인해볼까요?

원하던 대로 정확하게 찍혔습니다.
함수 실행은 물론이고 op_args, op_kwargs 를 통해서 보낸 파라미터들이
함수에 잘 전달된 것까지 확인할 수 있습니다.




📝 Custom Operator


위에서 가장 기본적이며, 자주 사용되는 2개의 Operator 사용법에 대해서 알아봤습니다.
그런데 사실 위 2개의 Operator 들, 그리고 그외 모든 Operator 들은
모~두 BaseOperator 라는 클래스를 상속한 자식 클래스입니다.

이말은 저희도 BaseOperator 를 상속한 클래스를 만들기만 하면
저희만의 Custom Operator 를 생성할 수 있다는 의미이기도 합니다.

그럼 한번 BaseOperator 를 상속한 Custom Operator 를 구현해보겠습니다.

# https://airflow.apache.org/docs/apache-airflow/2.10.5/howto/custom-operator.html 참고
from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
	
    # 이 메소드가 실제 task 가 실행할 내용을 담습니다.
    # 그러니 저희의 비즈니스 로직이 여기에 작성하면 되는 겁니다.
    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

이렇게 만들고 DAG 코드에 아래처럼 다른 Operator 과 마찬가지로 사용하시면 됩니다.

from custom_operator.hello_operator import HelloOperator

with DAG(
    # ... 생략 ...
) as dag:
    hello_task = HelloOperator(task_id="sample-task", name="foo_bar")

지금은 매우 간단한 예시지만, 더 난해한 것도 만들 수도 있습니다 😀


이상으로 글을 마치도록 하겠습니다.
감사합니다.




참고한 링크

profile
백엔드 개발자로 일하고 있는 식빵(🍞)입니다.

0개의 댓글