EmptyOperator : 문제 해결을 위한 task나 아직 구현되지 않은 task를 표현하는 데 사용
BashOperator : 지정된 Bash 명령어나 스크립트 실행
실제 액션 정의
t1 >> t2 : task1 진행 후, task2
t1 >> t2 << t3
t1, t3가 끝나야 t2 진행
python operator에서 op_kwargs 키워드 인자 딕셔너리의 키와 함수의 이름은 항상 일치해야 한다.
def pull_file(URL, savepath):
r = requests.get(URL)
with open(savepath, 'wb') as f:
f.write(r.content)
# Use the print method for logging
print(f"File pulled from {URL} and saved to {savepath}")
from airflow.operators.python import PythonOperator
# Create the task
pull_file_task = PythonOperator(
task_id='pull_file',
# Add the callable
python_callable=pull_file,
# Define the arguments
op_kwargs={'URL':'http://dataserver/sales.json', 'savepath':'latestsales.json'}
)
python_callable는 키워드 인자로 pull_file()을 인자로 함수에 괄호를 붙여서 보내면 바로 함수가 실행이 되면서 반환값이 전달되기 때문에 에러가 뜬다. 바로 실행되지 않고 함수만 전달되도록 pull_file만 전달한다.
Airflow cron은 기본적으로 유닉스 cron과 같은 5필드를 씁니다.
* * * * * 분 시 일 월 요일
각 필드 의미:
0-590-231-311-12 또는 JAN-DEC0-6 또는 SUN-SAT (0/7 = 일요일)자주 쓰는 패턴:
* : 가능한 모든 값 (매 분, 매 시 등), : 여러 값 지정 (예: 1,2,5)- : 범위 (예: 1-5 = 월~금 요일)*/n: n 간격 (예: */5 = 5분마다)Airflow DAG 정의에서:
from airflow import DAG
from datetime import datetime
with DAG(
dag_id="example_cron",
start_date=datetime(2025, 1, 1),
schedule_interval="0 0 * * *", # 매일 0시
catchup=False,
) as dag:
...
매일 0시 : 0 0 * * *
매시간 정각 : 0 * * * *
매 5분 : */5 * * * *
None : 스케줄 없이 수동 / 외부 트리거로만 실행@once : 한 번만 실행 @hourly : 매시간 정각@monthly : 매달 1일 0시@yearly : 매년 1월 1일 0시 0 0 1 1 *특정 조건이 참이 될 때까지 계속 체크(polling)만 하는 특수한 operator
success로 끝나고 그 뒤의 downstream task들이 실행공통 주요 파라미터
poke_interval : 몇 초마다 조건을 체크할지(default = 60s)timeout : 최대 대기 시간(지나면 실패, 단위 : seconds)mode : "poke" 또는 "reschedule"soft_fail : 실패 시 FAILED 대신 SKIPPED로 처리할지 결정mode 차이
poke 모드(default)poke_interval 단위로 약간의 지연 존재from airflow import DAG
from datetime import datetime
from airflow.provides.common.sql.sensors.sql import SqlSensor
from airflow.operators.python import PythonOperator
def process_data():
pritn("데이터 처리 시작")
with DAG(
dag_id='example_sql_sensor',
start_date=datetime(2025, 1, 1),
schedule_interval="@daily",
catchup=False,
) as dag:
wait_for_partition = SqlSensor(
task_id="wait_for_partition",
conn_id="postgres_default",
sql="""
SELECT 1
FROM partitions
WHERE dt = {{ ds }}
""",
poke_interval=60, # 60s마다 체크
timeout=60 * 60 * 3, # 최대 3시간 기다리기
mode="reschedule", # 워카 자원 아끼기
)
run_processing = PythonOperator(
task_id="run_processing",
python_callable=process_data,
)
wait_for_partition >> run_processing
→ 이 DAG는 매일 1회 실행되면서, 해당 날짜의 partition이 DB에 생길 때까지 seonsor가 기다리고, 이후에 처리 Task를 실행하게 된다.
airflow dags list-import-errors로 확인 가능이 순서대로 좁히면서 버그 위치를 찾는 게 Debug 핵심!
Debugging Tool & Pattern
1. Airflow UI
2. CLI로 개별 Task 테스트
- airflow tasks test <dag_id> <task_id> <execution_date>
- 메타데이터 DB 상태와 상관 없이 로컬에서 그 task만 실행해 보는 용도
3. dag.test()
- dag 파일 맨 아래 다음 추가 후, IDE/로컬에서 실행
```python
if __name__ == "__main__":
dag_test()
```
- 전체 dag를 하나의 프로세스에서 순서대로 실행해서, 어디에서 에러가 나는지 확인
AIRFLOW__CORE__EXECUTOR=DebugExecutor로 두고, SQLite+ 단일 프로세스로 디버깅용 실행SLA 정의하는 방법
sla 인자 사용task1 = BashOperator(
task_id='sla_task',
bash_command='runcode.sh',
sla=timedelta(seconds=30),
dag=dag)
default_args 딕셔너리에 설정default_args={
'sla': timedelta(minutes=20),
'start_date': datetime(2023, 2, 20)
}
dag = DAG('sla_dag', default_args=default_args)
Airflow는 내부적으로 Jinja2라는 파이썬 template engine을 사용한다. 이를 통해 Bash 명령어 안에
{{}}형태의 중괄호를 사용하면, Airflow가 이를 실제 값으로 치환해준다.
자주 사용하는 템플릿 변수
BashOperator의 bash_command 내에서 가장 많이 쓰이는 변수들입니다.
| 변수명 | 설명 | 예시 출력 |
|---|---|---|
{{ ds }} | execution_date의 날짜 (YYYY-MM-DD) (datestamp의 약자) | 2026-04-13 |
{{ ds_nodash }} | 하이픈이 없는 날짜 | 20260413 |
{{ run_id }} | 현재 DAG Run의 고유 ID | scheduled__2026-04-13... |
{{ task_instance.task_id }} | 현재 실행 중인 태스크 이름 | generate_report |
{{ params.my_param }} | 사용자가 직접 정의한 파라미터 | (사용자 지정값) |
{{ prev_ds }} | 이전 DAG 실행 날짜 | |
| `Airflow config object: {{conf}}` | conf 객체를 사용해 코드 안에서 현재 Airflow 설정에 접근 가능 |
.sh 파일 실행 시 끝에 공백 추가하여 템플릿 엔진 오작동 방지{{ ds }}, {{ ds_nodash }} → 파이썬 datetime 객체가 아니라 문자열이다.Airflow template에서 유용한 객체나 메서드에 대한 reference를 제공
macros.datetime ← 파이썬의 datetime.datetime 객체macros.timedelta ← timedelta 객체 참조macros.uuid ← python의 uuid 객체와 동일macros.ds_add 와 같은 추가 함수도 존재 ← 템플릿 안에서 날짜 계산 간단히 할 수 있도록 도와준다{{ macros.ds_add('2020-05-15', 5) }} : 날짜에 일 수 더하기python의
uuid객체란?
Universally Unique Identifier(범용 고유 식별자)의 약자로, 네트워크 상에서 서로 다른 시스템들이 독립적으로 식별자를 생성하더라도 중복될 확률이 거의 없도록 설계된 128비트 길이의 숫자이다.DB의 기본키, 세션 ID, file name 등 절대 중복되면 안 되는 고유값이 필요할 때 사용한다!
| 버전 | 생성 방식 | 특징 |
|---|---|---|
| UUID1 | 호스트 ID(MAC 주소) + 현재 시간 | 생성 시간과 위치를 알 수 있지만, 개인정보(MAC) 노출 위험이 있음. |
| UUID3 | 네임스페이스 + 이름 (MD5 해시) | 동일한 입력값에 대해 항상 동일한 UUID를 생성함. |
| UUID4 | 완전 무작위(Random) | 가장 많이 사용됨. 중복 가능성이 극히 낮아 일반적인 고유값 생성에 최적. |
| UUID5 | 네임스페이스 + 이름 (SHA-1 해시) | UUID3과 같지만 보안성이 더 높은 해시 알고리즘 사용. |
my_uuid = uuid.uuid4()templated_command="""
{% for filename in params.filenames %}
echo "Reading {{ filename }}"
{% endfor %}
"""
Jinja 구문에서 for 루프의 끝을 나타내려면 → {% endfor %}
브랜칭 : 조건부 로직 가능토록 한다.
BranchPythonOperator 사용
from airflow.operators.python import BranchPythonOperatorpython_callable을 받는다.
python_callable에서 중괄호를 안 쓰는 이유
BranchPythonOperator의python_callable인자는 문자열이 아니라 파이썬 함수 객체 그 자체를 전달받습니다.
- 동작 방식: 이 인자에는 함수의 "이름"을 넘겨줍니다. Airflow는 이 함수를 나중에 직접 호출(Call)합니다.
- 중괄호를 쓰지 않는 이유:
- 함수는 문자열이 아님:{{ }}는 문자열 내부의 텍스트를 바꿀 때 쓰는 문법입니다. 함수 객체 자체에는 적용되지 않습니다.
- 런타임 실행: 함수 내부에서 날짜 같은 정보가 필요하다면, Airflow는 함수를 호출할 때context라는 딕셔너리에 모든 정보를 담아 보내줍니다. 함수 안에서 직접 꺼내 쓰면 되기 때문에 굳이 중괄호로 치환할 필요가 없습니다.
provide_context의 주요 역할
Airflow는 함수를 호출할 때 Context라는 거대한 딕셔너리를 인자로 전달한다.
- 날짜 정보 :
ds,logical_date,execution_date- 객체 정보 :
dag,task- task 간 통신 :
ti,task_instance(Xcom을 사용해 다른 task의 데이터를 가져올 때 필수)
def check_weekend(**kwargs):
dt = datetime.strptime(kwargs['execution_date'],"%Y-%m-%d")
# If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.
if (dt.weekday() < 5):
return 'email_report_task'
else:
return 'no_email_task'
branch_task = BranchPythonOperator(task_id='check_if_weekend',
python_callable=check_weekend,
provide_context=True,
dag=dag)
DAG 및 task 실행
airflow tasks test <dag_id> <task_id> <date>airflow dags trigger -e <date> <dag_id> → 지정한 날짜에 전체 DAG가 실행하는 것처럼 동작한다.Operator 요약
bash_command 필요python_callable 필요BranchPythonOperator → python_callable, provide_context=True 필요, 호출 함수는 **kwargs를 받아야 함.filepath 인자 필요merged_df (데이터프레임)Month(월) 컬럼을 새로 추가합니다. OWeekly_Sales)이 $10,000를 초과하는 행만 유지합니다. O"Store_ID""Month""Dept""IsHoliday""Weekly_Sales""CPI""Unemployment""clean_data라는 변수에 저장되어야 합니다.clean_data (위에서 정제된 데이터프레임)Month와 Weekly_Sales 컬럼만 선택합니다.groupby(): "Month" 컬럼을 기준으로 그룹화agg(): 평균 매출 계산reset_index(): 인덱스를 새로 재설정round(): 결과를 소수점 둘째 자리까지 반올림clean_data), 집계된 데이터프레임(agg_data), 그리고 각각의 저장 경로clean_data.csv와 agg_data.csv 파일로 저장합니다.load() 함수를 통해 생성된 두 개의 CSV 파일이 현재 작업 디렉토리에 실제로 존재하는지 확인합니다.참고 사항:
grocery_sales라는 이름의 Pandas 데이터프레임으로 저장되며, 이를 바로 Python 코드에서 사용할 수 있습니다.def avg_weekly_sales_per_month(clean_data):
df = clean_data.groupby(by="Month").agg('mean').reset_index().round(2)
return df
error → agg(mean)이라고 작성했는데 agg('mean')이라고 작성해야 한다
agg(mean)→ 파이썬은mean이라는 이름을 가진 변수나 객체를 찾으려고 하는데, 이전에 정의해두지 않았으면 이를 찾지 못하고NameError가 발생시킨다.
os.path.exists → csv 파일 validation 확인
정규표현식 r'(\d+\.?\d*)'
\d : 숫자+ : 하나 이상 반복됨[0-9] : \d와 같은 의미\.? : 마침표 + 0개 또는 1개 → 소수점이 있을수도 있고 없을 수도 있고(?)\d* : 숫자 + 0개 이상 → 소수점 뒤에 숫자가 붙을 수도 있고 없을 수도 있다정규표현식에서 특정 문자열 뒤의 숫자 찾으려면?
→ 캡쳐 그룹 사용 : 특정문자열(\d+\.?\d*)