ETL Pipeline for begginers(Airflow)

skh951225·2023년 4월 9일
0

Data Engineering Course For Begginers (Youtube)

ETL

Extract

  • basically means to download the data or bring it on board
  • Where? API vs FTP
  • Format? Csv, Json, Xml
  • Compressed vs Uncompressed

Transform

  • validation stage
  • what if something goes wrong? -> Garbage in, garbage out
  • Primary key check

LOAD

  • Relational(MySQL,Postgresql...) vs Nonrelational(MongoDB...)
  • Location(Onpremise vs Cloud)
  • ORM(Object Relational Mappers) : SqlAlchemy...

Apache Airflow Tutorial for Beginners

Airflow

Core Concepts

DAG(Directed Acyclic Graph) : 비순환 그래프, Workflow를 나타냄
Task : 작업단위
Operator : Task가 하는 작업의 유형을 나타냄(PythonOperator, BashOperator, ...)

Execution Date : logic date/time which the dag run
Task Instance : Task + Excution Date
Dag Run : DAG + Execution Date

Task Lifecycle

no status(->scheduler) : 아직 scheduler를 거치지 않은 단계

scheduler (->[scheduled,removed,upstream failed, skipped])

  • scheduled(-> executor) : scheduler determined task instance needs to run
  • upstream failed : the task's upstream task failed
  • skipped : task is skipped
  • removed : task have been removed

executor(->[queued])

  • ququed(->worker) : scheduler sent task to executor to run on the queue

worker(->[running])

  • running(->[success,failed,shut down, up for reschedule]) : worker picked up a task and is now running it
    • success : task completed without error
    • up for reschedule : reschedule task every certain time interval
    • failed(->up for retry) : task failed
    • shut down(->up for retry) : task run has been aborted
      • up for retry : task will be scheduled and rerun the task after certain waiting time

Basic Architecture

  • Webserver : 다양한 Web UI 제공, python Flask로 구현됨
  • scheduler : job을 worker에게 전달, worker node가 여러개면 queue에 job을 보냄
  • Woker : job을 받아 DAG를 수행
  • Metadata DB : 실행관련된 여러가지 정보 저장(ex. log), default는 SQLlite, 실제 현업에서는 MySQL, Postgres와 같은 것을 이용

Airflow 설치

출처 : Running Airflow in Docker

# docker-compose.yaml 다운로드
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.2/docker-compose.yaml'

CeleryExecutor 를 LocalExecutor로 변경하고 CeleryExecutor 관련된 라인을 전부 삭제했다. LocalExecutor이기 때문에 redis, flower 삭제

# volume 을 저장할 디렉토리 생성
mkdir -p ./dags ./logs ./plugins
# initialization
docker-compose up airflow-init
# background 로 컨테이너 실행
docker-compose up -d

Airflow tutorial

Operator

BashOperator : bash 명령어 실행하는 operator
PythonOperator : Python code를 실행하는 operator
위의 것 외에도 수많은 operator 존재

Xcoms

  • Xcom을 통해 데이터 공유 가능(최대 48KB)
  • ti.xcom_pull(task_ids,key)
    • task_id에 해당한 task가 push한 데이터중 key값을 가지는 데이터 Return
  • ti.xcom_push(key,value)
    • (key,value) 쌍으로 데이터 push
  • ti : task instance

Task Flow API

  • dag, task 데코레이터를 활용하여 code를 더 간결하게 만들 수 있음
from airflow.decorators import dag, task
... 

@dag (
	  ...
		)
def dag1():
    @task()
    def task1():
    	...
        return value1

    @task()
    def task2():
    	...
        return value2

    @task()
    def task3(value1,value2):
    	...
        return value1 + value2
	value1 = task1()
    value2 = task2()
    print(taks3(value1,value2))
dag1()
  • value1과 value2가 xcom에 push되고 task3가 그 값을 pull함
  • [task1,task2] >> task3 와 같은 task dependency가 자동으로 만들어짐

catchup, backfill

  • catchup은 True/False 값을 가질 수 있으며, default 값은 True
    • catchup=False : start_date이 현재보다 이전일때 가장 최근 것만 실행
    • catchup=True : start_date이 현재보다 이전일때 start_date부터 현재까지 실행됬어야할 것들을 모두 실행
  • backfill은 과거에 어떤 이유로 실행되지 않은 execution을 재실행해줌
    • start_date, end_date을 파라미터로 범위를 지정가능
    • airflow dags backfill -s [start_date] -e [end_date] [dag_id] 명령어로 실행가능
    • docker로 airflow로 실행한 경우에는 scheduler container의 shell에 접속해서 위 명령어를 입력해주면 됨

cron expression

  • dag의 schedule_interval을 '@daily'와 같은 형태로도 가능하지만 cron expression으로도 가능
  • cron expression은 5개의 부분으로 나눠지고 각 부분은 공백으로 구분됨
  • 순서대로 Minute, Hour, Day, month, week 를 나타냄
    -* 은 every 를 뜻함
    • 예를 들어 0 * * * * 는 매 시 정각을 뜻함
  • 각 부분에 여러개의 값을 넣을 수도 있음 이때는 ,로 구분함
    • 예를들어 0,30 * * * * 은 매 시 정각, 30분을 뜻함
      type backfill vs scheduled

Connection

  • airflow는 여러개의 service와 연결되어 있음
  • connection은 이러한 service와 연결을 도와줌
  • connection은 주로 Operator가 많이 사용함(너무 당연한 소리인가?)
  • connection을 사용하게 되면 credential(Host,User,PW)를 코드에서 숨길 수 있음
  • 환경변수로 선언할 수도 있고 Web UI를 통해서도 할 수 있음
  • 환경변수로 선언할때는 보통 AIRFLOW_CONN_[conn_id]: [conn_type]://id:password@host:port 가 일반적임(conn_type에 따라 바뀔 수 있음)
  • docker-compose up -d --no-deps --build airflow-webserver airflow-scheduler 를 수행해 container를 recreate 시켜서 변경사항 적용가능

Install Python package

  • Extending
    • dockerfile 작성해서 requirements.txt 설치
    • 99%의 경우 사용, 빠름, 쉬움
  • Customizing
    • apache/airflow git clone해서 docker-context-files 에 requirements.txt 생성후
    • docker build . --build-arg AIRFLOW_VERSION=.. --tag ... 해서 이미지 생성
    • extending으로 안되는 경우, 이미지 크기 최적화가 가능

Sensor

  • A special type of operator which waits for something to occur
  • tool for use cases in which you don't know exactly when the data will be available

0개의 댓글