2026.04.13(Mon)

오유찬·2026년 4월 13일

DE

목록 보기
8/16

Airflow


Airflow 연산자

EmptyOperator : 문제 해결을 위한 task나 아직 구현되지 않은 task를 표현하는 데 사용

BashOperator : 지정된 Bash 명령어나 스크립트 실행

  • 워크플로우 맥락에서 의미가 있다면, Bash가 할 수 있는 거의 모든 동작을 수행할 수 있다.

실제 액션 정의

Airflow Task

  • operator를 instance화한 실제 실행 단위
  • Dag 내부에서 정의되고, task 간 연결 정립
  • 한 task는 하나의 책임으로 명확하게 정의 내릴 것!
  • 대용량 데이터를 보내는 것보다는 메타데이터만 보내고 실제 데이터는 s3와 같은 스토리지에서 사용
  • 의존성 확립 중요 (Upstream, Downstream 설정, 방향을 명확하게 설정할 것!)

t1 >> t2 : task1 진행 후, task2

  • t1 : upstream
  • t2 : downstream

t1 >> t2 << t3
t1, t3가 끝나야 t2 진행

python operator에서 op_kwargs 키워드 인자 딕셔너리의 키와 함수의 이름은 항상 일치해야 한다.

def pull_file(URL, savepath):
	r = requests.get(URL)
	with open(savepath, 'wb') as f:
	f.write(r.content)
	# Use the print method for logging
	print(f"File pulled from {URL} and saved to {savepath}")
	
from airflow.operators.python import PythonOperator

# Create the task
pull_file_task = PythonOperator(
	task_id='pull_file',
	# Add the callable
	python_callable=pull_file,
	# Define the arguments
	op_kwargs={'URL':'http://dataserver/sales.json', 'savepath':'latestsales.json'}
)

python_callable는 키워드 인자로 pull_file()을 인자로 함수에 괄호를 붙여서 보내면 바로 함수가 실행이 되면서 반환값이 전달되기 때문에 에러가 뜬다. 바로 실행되지 않고 함수만 전달되도록 pull_file만 전달한다.

Airflow cron 기본 문법 (5자리)

Airflow cron은 기본적으로 유닉스 cron과 같은 5필드를 씁니다.

* * * * * 분 시 일 월 요일
각 필드 의미:

  • 분(minute): 0-59
  • 시(hour): 0-23
  • 일(day of month): 1-31
  • 월(month): 1-12 또는 JAN-DEC
  • 요일(day of week): 0-6 또는 SUN-SAT (0/7 = 일요일)

자주 쓰는 패턴:

  • * : 가능한 모든 값 (매 분, 매 시 등)
  • , : 여러 값 지정 (예: 1,2,5)
  • - : 범위 (예: 1-5 = 월~금 요일)
  • */n: n 간격 (예: */5 = 5분마다)

자주 쓰는 Airflow cron 예시

Airflow DAG 정의에서:

from airflow import DAG
from datetime import datetime

with DAG(
    dag_id="example_cron",
    start_date=datetime(2025, 1, 1),
    schedule_interval="0 0 * * *",  # 매일 0시
    catchup=False,
) as dag:
    ...

매일 0시 : 0 0 * * *
매시간 정각 : 0 * * * *
매 5분 : */5 * * * *

preset 문자열도 지원

  • None : 스케줄 없이 수동 / 외부 트리거로만 실행
  • @once : 한 번만 실행
  • @hourly : 매시간 정각
  • `@daily : 매일 0시
  • `@weekly : 매주 일요일 0시
  • @monthly : 매달 1일 0시
  • @yearly : 매년 1월 1일 0시 0 0 1 1 *

센서 (Sensor)

특정 조건이 참이 될 때까지 계속 체크(polling)만 하는 특수한 operator

  • 조건이 만족되면 success로 끝나고 그 뒤의 downstream task들이 실행
  • EX
    - FileSensor : S3/FTP/local에 특정 파일이 생겼는지
    - SqlSensor : table에 레코드가 생겼는지
    - ExternalTaskSensor : 다른 DAG/Task가 완려되었는지

Sensor 동작 방식 (poke vs reschedule)

공통 주요 파라미터

  • poke_interval : 몇 초마다 조건을 체크할지(default = 60s)
  • timeout : 최대 대기 시간(지나면 실패, 단위 : seconds)
  • mode : "poke" 또는 "reschedule"
  • soft_fail : 실패 시 FAILED 대신 SKIPPED로 처리할지 결정

mode 차이

  • poke 모드(default)
    - sensor task가 돌아가는 동안 계속 워커 슬롯 점유
    - 지연 시간(조건 충족 후 반응하는데까지 걸리는 시간)이 짧은 대신, 워커 자원을 많이 사용
  • reschedule모드
    - 체크할 때만 잠깐 점유하고 확인하고, 나머지는 자원 반납
    - 워커 자원 효율 좋지만, poke_interval 단위로 약간의 지연 존재

Example Code

from airflow import DAG
from datetime import datetime
from airflow.provides.common.sql.sensors.sql import SqlSensor
from airflow.operators.python import PythonOperator

def process_data():
	pritn("데이터 처리 시작")

with DAG(
	dag_id='example_sql_sensor',
	start_date=datetime(2025, 1, 1),
	schedule_interval="@daily",
	catchup=False,
) as dag:

	wait_for_partition = SqlSensor(
		task_id="wait_for_partition",
		conn_id="postgres_default",
		sql="""
			SELECT 1
			FROM partitions
			WHERE dt = {{ ds }}
		""",
		poke_interval=60,       # 60s마다 체크
		timeout=60 * 60 * 3,    # 최대 3시간 기다리기
		mode="reschedule",      # 워카 자원 아끼기
	)
	
	run_processing = PythonOperator(
		task_id="run_processing",
		python_callable=process_data,
	)
	
	wait_for_partition >> run_processing

→ 이 DAG는 매일 1회 실행되면서, 해당 날짜의 partition이 DB에 생길 때까지 seonsor가 기다리고, 이후에 처리 Task를 실행하게 된다.

Airflow Debug & TroubleShooting

  1. DAG 레벨
    • DAG가 UI에 안 보인다? → DAG parsing/import error
    • airflow dags list-import-errors로 확인 가능
  2. Task 레벨
    • task 상태와 log 확인
  3. scheduler/worker 레벨
    • 스케줄러가 task를 잡아 주는지, 워커가 실제로 실행되는지 확인
  4. 외부 시스템(파일, DB, API) 레벨
    • sensor, hook, operator가 의존하는 S3, DB, API 쪽 문제

이 순서대로 좁히면서 버그 위치를 찾는 게 Debug 핵심!

Debugging Tool & Pattern
1. Airflow UI
2. CLI로 개별 Task 테스트
- airflow tasks test <dag_id> <task_id> <execution_date>
- 메타데이터 DB 상태와 상관 없이 로컬에서 그 task만 실행해 보는 용도
3. dag.test()
- dag 파일 맨 아래 다음 추가 후, IDE/로컬에서 실행

  ```python
  if __name__ == "__main__":
	  dag_test()
  ```
  - 전체 dag를 하나의 프로세스에서 순서대로 실행해서, 어디에서 에러가 나는지 확인
  1. DebugExecuter / 로컬 개발 환경
  • AIRFLOW__CORE__EXECUTOR=DebugExecutor로 두고, SQLite+ 단일 프로세스로 디버깅용 실행

SLA(Service Level Agreement)

  • task 또는 DAG가 실행에 걸려야 하는 예상 시간
  • SLA Miss : task, DAG가 예상 시간 내 완료되지 못한 경우

SLA 정의하는 방법

  1. task에서 sla 인자 사용
task1 = BashOperator(
			task_id='sla_task',
			bash_command='runcode.sh',
			sla=timedelta(seconds=30),
			dag=dag)
  1. default_args 딕셔너리에 설정
default_args={
	'sla': timedelta(minutes=20),
	'start_date': datetime(2023, 2, 20)
}
dag = DAG('sla_dag', default_args=default_args)

Template이 적용된 BashOperator

Jinja Template

Airflow는 내부적으로 Jinja2라는 파이썬 template engine을 사용한다. 이를 통해 Bash 명령어 안에 {{}}형태의 중괄호를 사용하면, Airflow가 이를 실제 값으로 치환해준다.

자주 사용하는 템플릿 변수

BashOperator의 bash_command 내에서 가장 많이 쓰이는 변수들입니다.

변수명설명예시 출력
{{ ds }}execution_date의 날짜 (YYYY-MM-DD) (datestamp의 약자)2026-04-13
{{ ds_nodash }}하이픈이 없는 날짜20260413
{{ run_id }}현재 DAG Run의 고유 IDscheduled__2026-04-13...
{{ task_instance.task_id }}현재 실행 중인 태스크 이름generate_report
{{ params.my_param }}사용자가 직접 정의한 파라미터(사용자 지정값)
{{ prev_ds }}이전 DAG 실행 날짜
`Airflow config object: {{conf}}`conf 객체를 사용해 코드 안에서 현재 Airflow 설정에 접근 가능
  • .sh 파일 실행 시 끝에 공백 추가하여 템플릿 엔진 오작동 방지
    - 파일 경로 찾지 못하면 에러 발생하기 때문에
  • {{ ds }}, {{ ds_nodash }} → 파이썬 datetime 객체가 아니라 문자열이다.
    - ds는 datastamp의 약자로, 해당 task가 실행되어야 하는 논리적 시점의 날짜를 의미한다.

macros 변수

Airflow template에서 유용한 객체나 메서드에 대한 reference를 제공

  • macros.datetime ← 파이썬의 datetime.datetime 객체
  • macros.timedeltatimedelta 객체 참조
  • macros.uuid ← python의 uuid 객체와 동일
  • macros.ds_add 와 같은 추가 함수도 존재 ← 템플릿 안에서 날짜 계산 간단히 할 수 있도록 도와준다
    - {{ macros.ds_add('2020-05-15', 5) }} : 날짜에 일 수 더하기

python의 uuid 객체란?
Universally Unique Identifier(범용 고유 식별자)의 약자로, 네트워크 상에서 서로 다른 시스템들이 독립적으로 식별자를 생성하더라도 중복될 확률이 거의 없도록 설계된 128비트 길이의 숫자이다.

DB의 기본키, 세션 ID, file name 등 절대 중복되면 안 되는 고유값이 필요할 때 사용한다!

버전생성 방식특징
UUID1호스트 ID(MAC 주소) + 현재 시간생성 시간과 위치를 알 수 있지만, 개인정보(MAC) 노출 위험이 있음.
UUID3네임스페이스 + 이름 (MD5 해시)동일한 입력값에 대해 항상 동일한 UUID를 생성함.
UUID4완전 무작위(Random)가장 많이 사용됨. 중복 가능성이 극히 낮아 일반적인 고유값 생성에 최적.
UUID5네임스페이스 + 이름 (SHA-1 해시)UUID3과 같지만 보안성이 더 높은 해시 알고리즘 사용.
  • my_uuid = uuid.uuid4()

고급 template

templated_command="""
{% for filename in params.filenames %}
	echo "Reading {{ filename }}"
{% endfor %}
"""

Jinja 구문에서 for 루프의 끝을 나타내려면{% endfor %}

Branch(분기)

브랜칭 : 조건부 로직 가능토록 한다.
BranchPythonOperator 사용

  • from airflow.operators.python import BranchPythonOperator
    다음에 실행할 task id, (id 목록)을 반호나하는 python_callable을 받는다.

python_callable에서 중괄호를 안 쓰는 이유
BranchPythonOperatorpython_callable 인자는 문자열이 아니라 파이썬 함수 객체 그 자체를 전달받습니다.
- 동작 방식: 이 인자에는 함수의 "이름"을 넘겨줍니다. Airflow는 이 함수를 나중에 직접 호출(Call)합니다.
- 중괄호를 쓰지 않는 이유:
- 함수는 문자열이 아님: {{ }}는 문자열 내부의 텍스트를 바꿀 때 쓰는 문법입니다. 함수 객체 자체에는 적용되지 않습니다.
- 런타임 실행: 함수 내부에서 날짜 같은 정보가 필요하다면, Airflow는 함수를 호출할 때 context라는 딕셔너리에 모든 정보를 담아 보내줍니다. 함수 안에서 직접 꺼내 쓰면 되기 때문에 굳이 중괄호로 치환할 필요가 없습니다.

provide_context의 주요 역할
Airflow는 함수를 호출할 때 Context라는 거대한 딕셔너리를 인자로 전달한다.

  • 날짜 정보 : ds, logical_date, execution_date
  • 객체 정보 : dag, task
  • task 간 통신 : ti, task_instance(Xcom을 사용해 다른 task의 데이터를 가져올 때 필수)
def check_weekend(**kwargs):
	dt = datetime.strptime(kwargs['execution_date'],"%Y-%m-%d")
	# If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.
	if (dt.weekday() < 5):
		return 'email_report_task'
	else:
		return 'no_email_task'

branch_task = BranchPythonOperator(task_id='check_if_weekend',
	python_callable=check_weekend,
	provide_context=True,
	dag=dag)

production pipeline 구축하기

DAG 및 task 실행

  • command line에서 특정 task 실행 :
    airflow tasks test <dag_id> <task_id> <date>
  • 전체 DAG 실행
    airflow dags trigger -e <date> <dag_id> → 지정한 날짜에 전체 DAG가 실행하는 것처럼 동작한다.

Operator 요약

  • Bashoperator → bash_command 필요
  • PythonOperator → python_callable 필요
  • BranchPythonOperatorpython_callable, provide_context=True 필요, 호출 함수는 **kwargs를 받아야 함.
  • FileSensor → filepath 인자 필요

Building a Retail Data Pipeline(Project)

🛒 식료품 매출 데이터 처리 요구 사항

1. transform() 함수 구현

  • 입력: merged_df (데이터프레임)
  • 수행 작업:
    • 수치형 데이터의 결측치(Missing values)를 원하는 방식(예: 0 또는 평균값 등)으로 채웁니다. O
    • Month(월) 컬럼을 새로 추가합니다. O
    • 주간 매출(Weekly_Sales)이 $10,000를 초과하는 행만 유지합니다. O
    • 분석에 불필요한 컬럼들을 삭제합니다.
      • "Store_ID"
        - "Month"
        - "Dept"
        - "IsHoliday"
        - "Weekly_Sales"
        - "CPI"
        - ""Unemployment""
  • 출력: 최종 데이터프레임을 반환하며, 결과는 clean_data라는 변수에 저장되어야 합니다.

2. avg_weekly_sales_per_month() 함수 구현

  • 입력: clean_data (위에서 정제된 데이터프레임)
  • 수행 작업:
    • 월별 평균 매출을 계산합니다.
    • 분석에 필요한 MonthWeekly_Sales 컬럼만 선택합니다.
    • 메서드 체이닝(Chain operation)을 사용하여 다음 함수들을 순서대로 적용해야 합니다:
      1. groupby(): "Month" 컬럼을 기준으로 그룹화
      2. agg(): 평균 매출 계산
      3. reset_index(): 인덱스를 새로 재설정
      4. round(): 결과를 소수점 둘째 자리까지 반올림

3. load() 함수 구현

  • 입력: 정제된 데이터프레임(clean_data), 집계된 데이터프레임(agg_data), 그리고 각각의 저장 경로
  • 수행 작업:
    • 두 데이터프레임을 각각 clean_data.csvagg_data.csv 파일로 저장합니다.
    • 저장 시 인덱스(index)는 포함하지 않습니다.

4. validation() 함수 구현

  • 수행 작업:
    • load() 함수를 통해 생성된 두 개의 CSV 파일이 현재 작업 디렉토리에 실제로 존재하는지 확인합니다.

참고 사항:

  • 데이터베이스 연결을 위한 별도의 엔진 설정은 필요하지 않습니다.
  • 제공된 SQL 코드 셀에 쿼리를 실행하면 결과가 자동으로 grocery_sales라는 이름의 Pandas 데이터프레임으로 저장되며, 이를 바로 Python 코드에서 사용할 수 있습니다.
def avg_weekly_sales_per_month(clean_data):
    df = clean_data.groupby(by="Month").agg('mean').reset_index().round(2)
    return df

error → agg(mean)이라고 작성했는데 agg('mean')이라고 작성해야 한다

agg(mean) → 파이썬은 mean이라는 이름을 가진 변수나 객체를 찾으려고 하는데, 이전에 정의해두지 않았으면 이를 찾지 못하고 NameError가 발생시킨다.

os.path.exists → csv 파일 validation 확인

정규표현식 r'(\d+\.?\d*)'

  • \d : 숫자
  • + : 하나 이상 반복됨
  • [0-9] : \d와 같은 의미
  • \.? : 마침표 + 0개 또는 1개 → 소수점이 있을수도 있고 없을 수도 있고(?)
  • \d* : 숫자 + 0개 이상 → 소수점 뒤에 숫자가 붙을 수도 있고 없을 수도 있다

정규표현식에서 특정 문자열 뒤의 숫자 찾으려면?
캡쳐 그룹 사용 : 특정문자열(\d+\.?\d*)

profile
열심히 하면 재밌다

0개의 댓글