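😵‍💫😵‍💫 I'm sure I'll forget all of this if I don't write it down, so I'm taking notes while I practice...
To work inside a virtual environment on AWS EC2, first do some simple setup.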
sudo apt-get install virtualenv
virtualenv airflow
cd airflow
source ./bin/activate
deactivate  # run this only when you want to leave the virtual environment
sudo apt-get install python3.11
sudo apt install python3-pip
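Before installing packages, it can help to confirm that the virtual environment is actually active, so that pip installs into it and not into the system Python. The following is a minimal sketch, assuming a Python 3 interpreter; the helper name `in_virtualenv` is hypothetical and not part of virtualenv or Airflow.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when this interpreter runs inside a virtual environment."""
    # Inside a virtual environment, sys.prefix points at the environment,
    # while sys.base_prefix points at the base Python installation.
    base_prefix = getattr(sys, "base_prefix", sys.prefix)
    return sys.prefix != base_prefix


if __name__ == "__main__":
    print("virtual environment active:", in_virtualenv())
    print("interpreter:", sys.executable)
```

If this prints `False` after `source ./bin/activate`, the shell is probably still using a different `python` than the one in the environment.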
export AIRFLOW_HOME=~/airflow
export AIRFLOW_VERSION=2.7.3
export PYTHON_VERSION=3.11
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
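The constraint URL is just the Airflow version and the Python major.minor version substituted into a fixed pattern. As a rough sketch of that substitution (the function names `constraint_url` and `running_python_version` are made up for illustration), the same string can be built in Python, which also makes it easy to check that `PYTHON_VERSION` matches the interpreter in use:

```python
import sys


def constraint_url(airflow_version: str, python_version: str) -> str:
    """Build the constraints file URL using the same pattern as CONSTRAINT_URL."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


def running_python_version() -> str:
    """Return the major.minor version of the running interpreter, e.g. '3.11'."""
    return f"{sys.version_info.major}.{sys.version_info.minor}"


if __name__ == "__main__":
    # 2.7.3 is the Airflow version used in these notes.
    print(constraint_url("2.7.3", running_python_version()))
```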
airflow db init
+ Correction!!! This is no longer the way to do it (as of Airflow 2.7, `airflow db init` is deprecated). Run
airflow db migrate
instead.
airflow db migrate is giving TypeError: SqlAlchemySessionInterface.__init__() missing 6 required positional
If you hit an error like this, run
pip3 install Flask-Session==0.5.0
and then run airflow db migrate again; it works fine!!
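To see whether the pin took effect, the installed Flask-Session version can be read from package metadata. This is only a sketch: the helper names are hypothetical, and `0.5.0` is simply the version these notes install, not a claim about which other versions work.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional

PINNED_VERSION = "0.5.0"  # the version installed in these notes


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a package, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def matches_pin(installed: Optional[str], pinned: str = PINNED_VERSION) -> bool:
    """Return True only when the installed version equals the pinned one."""
    return installed == pinned


if __name__ == "__main__":
    found = installed_version("Flask-Session")
    print("Flask-Session installed:", found)
    print("matches pin:", matches_pin(found))
```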
airflow users create \
--username admin \
--firstname myeongeun \
--lastname noh \
--role Admin \
--email nme0529@khu.ac.kr
airflow webserver --port 8080
This starts the Airflow webserver.
The dags_folder setting in the airflow.cfg file specifies the directory where DAGs are managed.
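To check which directory is currently configured, the `dags_folder` value can be read from the `[core]` section of `airflow.cfg`. The sketch below parses a small sample string instead of the real file; the sample path `/home/ubuntu/airflow/dags` and the function name `read_dags_folder` are assumptions for illustration.

```python
import configparser

# A tiny stand-in for airflow.cfg; the path is a made-up example.
SAMPLE_CFG = """
[core]
dags_folder = /home/ubuntu/airflow/dags
"""


def read_dags_folder(cfg_text: str) -> str:
    """Return the dags_folder value from the [core] section of the config text."""
    # Interpolation is disabled so that '%' characters in a real
    # airflow.cfg are read literally instead of being expanded.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get("core", "dags_folder")


if __name__ == "__main__":
    print(read_dags_folder(SAMPLE_CFG))
```

For the real file, the text could be loaded from `~/airflow/airflow.cfg` (the `AIRFLOW_HOME` set earlier) and passed to the same function.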
Let's run the example code from the official Airflow documentation, "airflow/example_dags/tutorial.py".
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
: Specify the default arguments needed when building the DAG.
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
    "depends_on_past": False,  # if True, a task runs only after its previous run succeeded
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',  # queue the job runs on
    # 'pool': 'backfill',  # slot pool the task runs in
    # 'priority_weight': 10,  # priority relative to other tasks
    # 'end_date': datetime(2016, 1, 1),  # no runs after this date
    # 'wait_for_downstream': False,  # wait for the previous run's downstream tasks
    # 'sla': timedelta(hours=2),  # time by which the job is expected to succeed
    # 'execution_timeout': timedelta(seconds=300),  # maximum allowed run time
    # 'on_failure_callback': some_function, # or list of functions
    # 'on_success_callback': some_other_function, # or list of functions
    # 'on_retry_callback': another_function, # or list of functions
    # 'sla_miss_callback': yet_another_function, # or list of functions
    # 'trigger_rule': 'all_success'
},
For details, see the official Airflow documentation: Base Operator Arguments.
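The comment "You can override them on a per-task basis" means that an argument passed directly to an operator wins over the value in default_args. The sketch below imitates that precedence with plain dictionaries. It is not how Airflow implements it internally, and the helper name `effective_args` is hypothetical.

```python
from datetime import timedelta

# A shortened copy of the default arguments used in the tutorial.
default_args = {
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def effective_args(defaults: dict, **overrides) -> dict:
    """Merge defaults with per-task overrides; overrides take precedence."""
    return {**defaults, **overrides}


# A task that passes retries=3 explicitly overrides the default of 1.
sleep_args = effective_args(default_args, retries=3)
# A task that passes nothing extra keeps the defaults unchanged.
print_date_args = effective_args(default_args)

if __name__ == "__main__":
    print(sleep_args["retries"], print_date_args["retries"])
```

The original `default_args` dictionary is left untouched, so each task starts from the same defaults.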
: Create the DAG object and specify the following.
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )
    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )
    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )
    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this