

회사에서 데이터 파이프라인을 만들면서 데이터 엔지니어링 쪽에 관심이 생겼다. 뭔가 내가 만든 파이프라인을 볼 때마다 뭔가 관리가 제대로 안 되는 듯한 느낌을 받았는데... (물론 나 혼자의 의견으로만 만든 것은 아님) 회사에 DE쪽을 아는 사람이 거의 없는 것 같기 때문에 이 참에 공부하면 좋을 것 같아 차근차근 작성해보고자 한다. 👀
WorkflowDAG(Directed Acyclic Graph)Task
Web ServerSchedulerExecutorWorkerDatabaseQueueBashOperator : Python 함수 실행PythonOperator : bash command 실행EmailOperator : Email 전송MySqlOperator : sql 쿼리 수행~airflow/dags 폴더 밑에 {dags_name}.py 생성from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import random
with DAG(
# dag_id에는 보통 python file과 같은 이름으로 생성
dag_id="dags_python_operator",
schedule="30 6 * * *", # 분 시 일 월 요일
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"), # dag 생성 이후 바꾸면 안됨. 바꾸려면 dag 새로 생성 or dag_id 변경
catchup=False # start_date부터 backfill 할 것인지 여부
) as dag:
# 1. bash 오퍼레이터
bash_t1 = BashOperator(
task_id="bash_t1",
bash_command="echo whoami", # ehco = print
)
# 2. 파이썬 오퍼레이터
def select_fruit():
fruit = ['APPLE', 'BANANA', 'ORANGE', 'AVOCADO']
rand_int = random.randint(0,3) # 0 ~ 3 까지 임의의 숫자 반환
print(fruit[rand_int])
# 해당 operator를 통해 task 정의
py_t1 = PythonOperator(
task_id = 'py_t1',
python_callable=select_fruit
)
# task들의 수행 순서
bash_t1 >> py_t1