이 게시물은
airflow 2.10.5버전을 사용해서 작성됐습니다.
3.x버전과 조금 다를 수 있으니 유의하시기 바랍니다.
이전 글에서는 기본적인 airflow 개발환경 세팅법을 알아봤습니다.
이번 글에서는 Task 를 생성하는 Operator 클래스에 대해 알아보겠습니다.
이를 위해 Airflow 에서는 기본으로 제공하는 Operator 중에서
BashOperator 와 PythonOperator 를 통해 기본적인 사용법을 익혀보겠습니다.
BashOperator 는 말 그대로 Bash 명령어를 실행하는 Task 를 만드는 클래스입니다.
BashOperator 는 간단한 명령어 뿐만아니라 shell script 도 실행할 수 있습니다.
지금부터 그 방법을 알아보죠.
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
with DAG(
dag_id="dags_bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
bash_hello = BashOperator(
task_id="bash_hello",
bash_command="echo 'Hello World'"
)
bash_hello
BashOperator 로 shell script 를 실행하려면
airflow-worker 가 접근 가능한 Path (경로) 에 스크립트를 저장해야 합니다.
참고:
airflow-worker는task를 실제 실행시키는 주체입니다.
그렇다면 정확히 어떤 Path 에 저장해야 할까요?
크게 2가지 방법이 있습니다.
python 으로 작성한 DAG 파일에 가까운 상대 경로에 저장 (참고링크)airflow info 명령어를 통해서 조회된 python_path 경로에 저장위 2가지 방식에 대한 간단한 테스트를 위해서 아래와 같이 shell script 를 저장했습니다.
DAG 파일(=hello_world_bash.py)과 가까운 위치(=scripts/helloworld.sh)에
shell script 를 저장해놨습니다.
이제 각 경우에 코드를 어떻게 DAG 코드를 작성해야되는지 알아봅시다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.decorators import task
import pendulum
with DAG(
dag_id="hello_world_bash",
schedule="30 6 * * *",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
task1 = BashOperator(
task_id="task1",
bash_command="scripts/helloworld.sh",
# 참고: env 파라미터로 환경변수도 줄 수 있습니다!
env={'MY_NAME': 'CODING_TOAST'}
)
task1
참고로 스크립트 내용은 다음과 같습니다.
#!/bin/bash
# 파일명은 helloworld.sh 입니다.
echo 'Hello World'
echo "my name is ... $MY_NAME!"
이제 DAG 를 실행하고 Log 보겠습니다.
먼저 airflow info 명령어를 통해서 python_path 경로를 정확히 알아봅시다.
저는 docker 로 airflow 를 실행 중이라서 아래와 같이 명령어를 입력했습니다.
docker exec compose-v2-airflow-webserver-1 airflow info

명령어 입력 후 여러 정보가 나오지만, Paths info > python_path line 에 나오는
모든 경로들이 저희의 shell script 를 저장하고, 참고할 수 있는 위치입니다.
예를 들어서 python_path 나온 것 중 /opt/airflow/plugins 를 사용하고 싶다면
해당 경로 하단에 스크립트를 넣고, BashOperator 를 아래처럼 작성하면 끝입니다.
task1 = BashOperator(
task_id="task1",
bash_command="/opt/airflow/plugins/helloworld.sh",
env={'MY_NAME': 'CODING_TOAST'}
)
참고:
docker-compose.yaml사용 중이신 분들은yml파일이 있는
위치의plugins디렉토리가container내부의/opt/airflows/plugins
와 매핑된 상태입니다. 참고해서 실습해보시기 바랍니다.![]()
PythonOperator 는 python 함수 를 실행하는 Operator 입니다.
간단한 사용법을 먼저 알아봅시다.
# 파일명: dags_python_op1.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task
import pendulum
with DAG(
dag_id="dags_python_op1",
schedule="30 6 * * *",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
def good_bye_world(name, age, *args, **kwargs) -> None:
print('good_by_world')
print('my name is ...', name)
print('my age is ...', age)
print('arg list : ', args)
print('kwargs -> key_sample1 : ', kwargs['key_sample1'])
task_t1 = PythonOperator(
task_id="task_t1",
python_callable=good_bye_world,
op_args=['coding_toast', 20, 'arg sample1'],
op_kwargs={'key_sample1':'value_sample1'}
)
# 데코레이터 방식을 사용할 수도 있습니다.
@task(task_id="task_t2")
def task_t2():
print('task 2 executed')
task_t1 >> task_t2()
아까 본 예시에서는 간단하게 PythonOperator 와 함수를 같은 DAG 코드
안에 작성하고 사용했습니다.
그런데 만약에 많은 DAG 에서 똑같은 python 함수를 사용하고 싶을 때는 어떻게 할까요?
이때는 주로 공통 모듈 을 작성하고 DAG 코드에서 import 해오는 형식으로 사용합니다.
그렇다면 공통 모듈 은 어디에 작성해야 할까요?
python 에서는 import 를 위해 사용하는 경로가 모두 sys.path 에 있습니다.
airflow 는 이러한 경로들을 airflow info -> python_path 를 통해 알 수 있습니다.

여러 경로들 중 하나를 선택하고 그 하위에 python module 을 작성하면 된다는 거죠.
저는 경로들 중에서 /opt/airflow/plugins 경로에 하단에 모듈을 작성해보겠습니다.
이후에 나오는
./plugins,./dags라는 디렉토리는
각각 컨테이너의/opt/airflow/plugins,/opt/airflow/dags와 매핑된 경로입니다.
아래처럼 ./plugins/common/common_func.py 를 하나 생성하고,
./dags 디렉토리의 DAG 코드에서 공통 모듈의 함수를 import 해서 사용해보겠습니다.
common_func.py 코드
def good_bye_world(name, age, *args, **kwargs) -> None:
print('good_by_world')
print('my name is ...', name)
print('my age is ...', age)
print('arg list : ', args)
print('kwargs -> key_sample1 : ', kwargs['key_sample1'])
dags_python_op1.py 코드
# 파일명: dags_python_op1.py
from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum
# 공통 모듈의 from, import 를 정확히 작성해야 합니다!
from common.common_func import good_bye_world
with DAG(
dag_id="dags_python_op1",
schedule="30 6 * * *",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
task_t1 = PythonOperator(
task_id="task_t1",
python_callable=good_bye_world,
op_args=['coding_toast', 20, 'arg sample1'],
op_kwargs={'key_sample1':'value_sample1'}
)
task_t1
참고: op_args, op_kwargs 를 통해서 python_callable 함수에 원하는 파라미터를 보낼 수 있습니다!
실행시키고 Task 의 Log 를 확인해볼까요?
원하던 대로 정확하게 찍혔습니다.
함수 실행은 물론이고 op_args, op_kwargs 를 통해서 보낸 파라미터들이
함수에 잘 전달된 것까지 확인할 수 있습니다.
위에서 가장 기본적이며, 자주 사용되는 2개의 Operator 사용법에 대해서 알아봤습니다.
그런데 사실 위 2개의 Operator 들, 그리고 그외 모든 Operator 들은
모~두 BaseOperator 라는 클래스를 상속한 자식 클래스입니다.
이말은 저희도 BaseOperator 를 상속한 클래스를 만들기만 하면
저희만의 Custom Operator 를 생성할 수 있다는 의미이기도 합니다.
그럼 한번 BaseOperator 를 상속한 Custom Operator 를 구현해보겠습니다.
# https://airflow.apache.org/docs/apache-airflow/2.10.5/howto/custom-operator.html 참고
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
# 이 메소드가 실제 task 가 실행할 내용을 담습니다.
# 그러니 저희의 비즈니스 로직이 여기에 작성하면 되는 겁니다.
def execute(self, context):
message = f"Hello {self.name}"
print(message)
return message
이렇게 만들고 DAG 코드에 아래처럼 다른 Operator 과 마찬가지로 사용하시면 됩니다.
from custom_operator.hello_operator import HelloOperator
with DAG(
# ... 생략 ...
) as dag:
hello_task = HelloOperator(task_id="sample-task", name="foo_bar")
지금은 매우 간단한 예시지만, 더 난해한 것도 만들 수도 있습니다 😀
이상으로 글을 마치도록 하겠습니다.
감사합니다.