금주부터 학습할 내용은 Airflow!
파이프라인 과정을 총괄하는 Airflow를 직접 실습하며 접해보기 전 개념에 대해 알아보고자 한다.
지금까지 ETL이란, 3rd Party로부터 발생하는 간접데이터와 서비스로부터 발생하는 직접데이터를 데이터레이크, (분석용 : 데이터웨어하우스)로 적재하는 과정(데이터 파이프라인, ETL)이며, Airflow에선 해당 용어를 DAG로 칭한다. 현재 ETL에 있어 가장 대표적인 Framework는 Airflow이다.
예시로는, 여러 플랫폼으로부터 발생하는 다양한 데이터 소스를 ETL 작업을 거쳐 DW로 적재한다. 이전 회사 인턴 경험으로 봐선 elink(email), kakao, eletter(직접) 등등 여러 사내 플랫폼과 더불어 Salesforce (판매 데이터) 등등을 DW(Snowflake)에 적재하여 원하는 테이블만 뽑아(ELT) BI Tool을 적용해 대시보드를 구성하거나 인사이트를 도출하는 방식을 사용했다.
위 전반적인 과정을 데이터 파이프라인이라고 생각하면 된다.
즉, 여러 데이터 소스로부터 구조화된 데이터 + 비구조화된 데이터를 데이터레이크로 적재한 후 빅데이터 분산 처리 기법인 Hadoop Spark, Hive, Athena 등을 활용해 scale이 더 작은 Data Warehouse, Data Mart로 적재하는 과정을 의미한다.
-> 프로젝트를 실습해보면서 다양한 플랫폼과 클라우드 환경, 스케줄링 등 파이프라인을 구축하기 위해선 정확한 스케줄링을 기본으로 하기에 Airflow를 활용한다.🎈 Airflow에선 ETL을 DAG라 칭한다는데 DAG란 무엇인지 자세히 살펴보려 한다.
- DAG (Directed Acyclic Graph)
-> 양방향이 아닌 단방향으로만 workflow가 이루어지는 환경을 의미한다.결론적으로 data pipeline은 다음과 같다.
- 데이터를 소스로부터 목적지로 복사하는 작업 (SQL로 대부분 이루어짐)
- 소스 예시는 다음과 같음
- 목적지 예시로는 다음과 같다.
(DW, 캐시시스템(Redis, Memcache, 프로덕션 DB, NoSQL, S3 etc.)🎈 ETL 작업과정은 다음과 같다. (Data Engineer의 role)
1. 외부, 내부 데이터소스로부터 데이터를 읽어옴 (API 등 활용)
2. 원하는 형태로 데이터 포맷 변환 (빅데이터의 경우 Spark 활용)
3. 데이터 웨어하우스 로드🎈 DW 구축 시 Best Practice
1. 가능하면 데이터가 작을 경우 매번 통채로 복사해 테이블 생성
2. Incremental-update만 가능한 경우 data가 갖춰져야 할 조건 존재
-> data source가 프로덕션 DB 테이블인 경우 (created, modified, deleted) 필드 필요
3. 멱등성(Idempotency)보장 중요
-> 동일 입력 데이터로 data pipeline을 다수 실행해도 최종 테이블 내용 변경 X (중복 X)
-> 원자성 보장 필요 (SQL - Transaction Isolation Level)
4. data pipeline 재실행은 물론 backfill이 쉬워야 함. (Airflow는 backfill에 용이)
5. pipeline 내 입출력 체크 및 명확화 및 문서화 필요 (data lineage 중요)
6. 주기적으로 불필요한 데이터 모니터링하여 삭제
7. urgency한 상황 발생 시 post-mortem(사고-리포트) 작성 필요
redshift에 연결해 ETL 처리하는 코드 - (Practice)
Redshift에 연결된 테이블 활용해 간단한 ETL 처리
파이썬으로 작성된 ETL Framework로 에어비앤비에서 시작한 Apache 오픈소스 프로젝트
데이터 파이프라인 스케줄링 지원 (정해진 시간에 ETL 순차 실행, 웹 UI 제공 등)
-> ETL 쉽게 생성 가능 (DAG로 부르며 하나의 DAG는 하나 이상의 태스크로 구성)
태스크란, Airflow의 Operator로 생성된다.
ex) Redshift Writing, Postgres query, S3 Read/Write, Hive query, Spark job, shell script🎈 Airflow 구성
- 총 5개의 component로 구성
- web server (python Flask로 구성)
- Scheduler
- Worker (태스크에 해당하는 코드 실행하는 부분)
- meta Data DB (Sqlite 기본)
- Queue (다수의 서버 구성인 경우에만 사용되며 이 경우 Executor가 달라짐)
- Scheduler는 DAG들을 워커에 배정하는 역할을 수행
- 웹 UI는 Scheduler, DAG 실행 상황 시각화
- Worker는 실제로 DAG 실행하는 부분
- Scheduler, DAG 실행한 결과는 별도 DB에 저장 (실제 프로덕션에서는 MySQL, Postgre사용 요망)
💯 장점
1. data pipeline 세밀하게 제어 가능
2. 다양한 data source, DW 지원
3. Backfill 쉬움
💯 단점
1. learning curve가 높고 개발환경 구성이 어려움
2. 직접 운영이 쉽지 않으며 cloud-version이 선호됌.