Local Envs
Airflow install
MySQL, Redis Docker container
MySQL setting
Airflow config setting [ /home/user/airflow/airflow.cfg ]
executor = LocalExecutor
sql_alchemy_conn = mysql+pymysql://airflow:airflow@xx.xx.xx.xx:3306/airflow
catchup_by_default = False
broker_url = redis://airflow@xx.xx.xx.xx:6379
result_backend = db+mysql://airflow:airflow@xx.xx.xx.xx:3306/airflow
load_examples = False
# xx.xx.xx.xx -> MySQL IP
Airflow start
Create DAGs
BashOperator
from airflow import models
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 9, 7),
'retries': 1,
'retry_delay': timedelta(minutes=5)}
with models.DAG(
dag_id='echo_test', description='echo_test',
schedule_interval=None,
max_active_runs=5,
concurrency=10,
default_args=default_args) as dag:
text_file_path = '/root/airflow/dags'
create_text_file_command = f'cd {text_file_path} && echo hello airflow > test.txt'
create_text_file = BashOperator(
task_id='create_text_file',
bash_command=create_text_file_command,
dag=dag)
read_text_file_command = f'cd {text_file_path} && cat test.txt'
read_text_file = BashOperator(
task_id='cat_text_file',
bash_command=read_text_file_command,
dag=dag)
remove_text_file_command = f'cd {text_file_path} && rm test.txt'
remove_text_file = BashOperator(
task_id='remove_text_file',
bash_command=remove_text_file_command,
dag=dag)
create_text_file >> read_text_file >> remove_text_file
schedule_interval=timedelta(1)
파이썬 datetime library로 표현 가능. 인터벌이 하루라는 뜻schedule_interval=None
, schedule_interval="@once"
이런식으로 작성 가능schedule_interval='*/1 * * * *'
cron 형태로 작성 가능각 Airflow DAG는 여러 Task로 이루어져있습니다. operator나 sensor가 하나의 Task로 만들어집니다. Airflow는 기본적인 Task를 위해 다양한 operator를 제공합니다.
Airflow에서 기본으로 제공하는 operator 외에도 커뮤니티에서 만든 수많은 operator들이 Data Engineer의 작업을 편하게 만들어 주고 있습니다.
Reference
세상에서 가장 간단한 Airflow 튜토리얼
[Airflow] ModuleNotFoundError: No module named 'sqlalchemy.ext.declarative.clsregistry'
Apache Airflow - Workflow 관리 도구(1)
Basic Flask app not running (TypeError: required field "type_ignores" missing from Module)
GitHub - alicek106/start-docker-kubernetes: 시작하세요! 도커, 그리고 쿠버네티스 책에서 사용되는 예제를 정리한 저장소입니다.