Airflow 1.xx.xx 구축 방법

dnchoi·2022년 10월 17일
0

k8s

목록 보기
1/1

Local Envs

  • Ubuntu 18.04
  • Anaconda3
  • Python 3.8
  • Airflow 1.10.5
  • Docker
  • MySQL
  • Radis
  1. Airflow install

    1. pip install apache-airflow=1.10.5
    2. pip install 'apache-airflow[mysql, redis]==1.10.5'
  2. MySQL, Redis Docker container

    1. docker pull mysql:8.0
    2. docker pull redis:5.0
    3. docker run -d --name mysql -p 3306:3306 -v /data/mysql:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=root mysql:8.0
    4. docker run -d --name redis -p 6379:6379 -v /data/redis:/data -e REDIS_PASSWORD=airflow redis:5.0
  3. MySQL setting

    1. mysql -h192.168.1.131 -uroot -proot
      192.168.1.131 → user ip address
    2. mysql > create user 'airflow'@'%' identified by 'airflow'
      mysql > grant all privileges on . to 'airflow'@'%'
      mysql > flush privileges
      mysql > create database airflow
  4. Airflow config setting [ /home/user/airflow/airflow.cfg ]

    executor = LocalExecutor
    sql_alchemy_conn = mysql+pymysql://airflow:airflow@xx.xx.xx.xx:3306/airflow 
    catchup_by_default = False
    broker_url = redis://airflow@xx.xx.xx.xx:6379
    result_backend = db+mysql://airflow:airflow@xx.xx.xx.xx:3306/airflow
    load_examples = False
    
    # xx.xx.xx.xx -> MySQL IP
  1. Airflow start

    1. airflow initdb
    2. airflow webserver
    3. airflow scheduler
    4. airflow worker
    5. airflow flower
  2. Create DAGs

    • DAGs?
      • Airflow에서 DAG는 Scheduler가 실제 queue를 잡고 실행하는 Python Script
        DAG는 Bash or Python Operator를 통해 특정 행위를 수행하게 됨
    1. BashOperator

      1. Create new text file > Printed text file > Remove text file
      from airflow import models
      from airflow.operators.bash_operator import BashOperator
      from datetime import datetime, timedelta
       
      default_args = {
              'owner': 'airflow',
              'depends_on_past': False,
              'start_date': datetime(2021, 9, 7),
              'retries': 1,
              'retry_delay': timedelta(minutes=5)}
       
      with models.DAG(
              dag_id='echo_test', description='echo_test',
              schedule_interval=None,
              max_active_runs=5,
              concurrency=10,
              default_args=default_args) as dag:
       
          text_file_path = '/root/airflow/dags'
       
          create_text_file_command = f'cd {text_file_path} && echo hello airflow > test.txt'
          create_text_file = BashOperator(
                  task_id='create_text_file',
                  bash_command=create_text_file_command,
                  dag=dag)
       
          read_text_file_command = f'cd {text_file_path} && cat test.txt'
          read_text_file = BashOperator(
                  task_id='cat_text_file',
                  bash_command=read_text_file_command,
                  dag=dag)
       
          remove_text_file_command = f'cd {text_file_path} && rm test.txt'
          remove_text_file = BashOperator(
                  task_id='remove_text_file',
                  bash_command=remove_text_file_command,
                  dag=dag)
       
          create_text_file >> read_text_file >> remove_text_file

    Schedule interval

    • schedule_interval=timedelta(1) 파이썬 datetime library로 표현 가능. 인터벌이 하루라는 뜻
    • schedule_interval=Noneschedule_interval="@once" 이런식으로 작성 가능
    • schedule_interval='*/1 * * * *' cron 형태로 작성 가능

    Airflow Operator

    각 Airflow DAG는 여러 Task로 이루어져있습니다. operator나 sensor가 하나의 Task로 만들어집니다. Airflow는 기본적인 Task를 위해 다양한 operator를 제공합니다.

    • BashOperator : bash command를 실행
    • PythonOperator : Python 함수를 실행
    • EmailOperator : Email을 발송
    • MySqlOperator : sql 쿼리를 수행
    • Sensor : 시간, 파일, db row, 등등을 기다리는 센서

    Airflow에서 기본으로 제공하는 operator 외에도 커뮤니티에서 만든 수많은 operator들이 Data Engineer의 작업을 편하게 만들어 주고 있습니다.

profile
꿈꾸는 사람

0개의 댓글