230605 - 데이터 파이프라인과 Airflow 소개

김지석·2023년 6월 6일
0

ETL 소개

데이터의 흐름과 데이터 팀의 발전 단계

  • 온라인 서비스에서 직접 생기는 데이터와 써드파티를 통해 생기는 간접 데이터
    -> 1. 데이터 인프라(데이터 웨어하우스)에 저장
    -> 2. 이 데이터로 분석 및 시각화
    -> 3. 데이터 과학을 적용하여 사용자 경험 개선(추천, 검색 등의 개인화)

데이터 웨어하우스의 구성 예

데이터 파이프라인

ETL 용어설명

  • ETL : Extract, Transform, Load
  • Data Pipeline, ETL, Data Workflow, DAG
    • ETL (Extract, Transform, and Load)
    • Called DAG (Directed Acyclic Graph) in Airflow
      • 한방향으로만 간다 / 각 task 간에는 루프가 존재하지 않음.

ELT

  • ETL vs ELT
    • ETL: 데이터를 데이터 웨어하우스 외부에서 내부로 가져오는 프로세스
      • 보통 데이터 엔지니어들이 수행함 (요청에 의해 수행)
    • ELT: 데이터 웨어하우스 내부 데이터를 조작해서 (보통은 좀더 추상화되고 요약된) 새로운 데이터를 만드는 프로세스
      • 보통 데이터 분석가들이 많이 수행
      • 이 경우 데이터 레이크 위에서 이런 작업들이 벌어지기도 함
      • 이런 프로세스 전용 기술들이 있으며 dbt가 가장 유명: Analytics Engineering
        • dbt: Data Build Tool

Data Lake vs. Data Warehouse

  • 데이터 레이크 (Data Lake)
    • 구조화 데이터 + 비구조화 데이터
    • 보존 기한이 없는 모든 데이터를 원래 형태대로 보존하는 스토리지에 가까움
    • 보통은 데이터 웨어하우스보다 몇배는 더 큰 스토리지
  • 데이터 웨어하우스 (Data Warehouse)
    • 보존 기한이 있는 구조화된 데이터를 저장하고 처리하는 스토리지
    • 보통 BI 툴들(룩커, 태블로, 수퍼셋, …)은 데이터 웨어하우스를 백엔드로 사용함

Data Lake & ELT

  • Data Sources -> Data Lake = ETL
  • Data Lake -> Data Transforms = ELT
  • 이 때 다양한 데이터 파이프라인의 스케줄러와 관리 툴이 필요 -> Airflow

Data Pipeline의 정의

  • 데이터를 소스로부터 목적지로 복사하는(옮기는) 작업
    • 이 작업은 보통 코딩 (파이썬 혹은 스칼라) 혹은 SQL을 통해 이뤄짐
    • 대부분의 경우 목적지는 데이터 웨어하우스가 됨(외부 시스템일 수도 있음)
  • 데이터 소스의 예:
    • Click stream, call data, ads performance data, transactions, sensor data, metadata, …
    • More concrete examples: production databases, log files, API, stream data (Kafka topic)
  • 데이터 목적지의 예:
    • 데이터 웨어하우스, 캐시 시스템 (Redis, Memcache), 프로덕션 데이터베이스, NoSQL, S3, ...

데이터 파이프라인의 종류

Raw Data ETL Jobs (데이터 엔지니어의 업무)

  1. 외부(회사 밖)와 내부 데이터 소스에서 데이터를 읽어다가 (많은 경우 API를 통하게 됨)
  2. 적당한 데이터 포맷 변환 후 (데이터의 크기가 커지면 Spark등이 필요해짐)
  3. 데이터 웨어하우스 로드

Summary/Report Jobs (데이터 분석가의 업무)

  1. DW(혹은 DL)로부터 데이터를 읽어 다시 DW에 쓰는 ETL
  2. Raw Data를 읽어서 일종의 리포트 형태나 써머리 형태의 테이블을 다시
    만드는 용도
  3. 특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재
  • 요약 테이블의 경우 SQL (CTAS를 통해)만으로 만들고 이는 데이터 분석가가 하는
    것이 맞음. 데이터 엔지니어 관점에서는 어떻게 데이터 분석가들이 편하게 할 수
    있는 환경을 만들어 주느냐가 관건
    -> Analytics Engineer (DBT)

Production Data Jobs

  1. DW로부터 데이터를 읽어 다른 Storage(많은 경우 프로덕션 환경)로 쓰는 ETL
    a. 써머리 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
    b. 혹은 머신러닝 모델에서 필요한 피쳐들을 미리 계산해두는 경우
  2. 이 경우 흔한 타켓(외부) 스토리지:
    a. Cassandra/HBase/DynamoDB와 같은 NoSQL
    b. MySQL과 같은 관계형 데이터베이스 (OLTP)
    c. Redis/Memcache와 같은 캐시
    d. ElasticSearch와 같은 검색엔진

데이터 파이프라인을 만들 때 고려할 점

이상과 현실간의 괴리

  • 이상 혹은 환상
    • 내가 만든 데이터 파이프라인은 문제 없이 동작할 것이다
    • 내가 만든 데이터 파이프라인을 관리하는 것은 어렵지 않을 것이다
  • 현실 혹은 실상
    • 데이터 파이프라인은 많은 이유로 실패함
      • 버그 :)
      • 데이터 소스상의 이슈: What if data sources are not available or change its data format
      • 내가 컨트롤 할 수 없는 데이터
      • 데이터 파이프라인들간의 의존도에 이해도 부족
    • 데이터 파이프라인의 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어남
      • 데이터 소스간의 의존도가 생기면서 이는 더 복잡해짐. 만일 마케팅 채널 정보가 업데이트가 안된다면 마케팅 관련 다른 모든 정보들이 갱신되지 않음
      • More tables needs to be managed (source of truth, search cost, …)

Best Practices

  1. 가능하면 데이터가 작을 경우 매번 통채로 복사해서 테이블을 만들기 (Full
    Refresh)
  • Full Refresh의 소요시간이 오래 걸린다면 사용하기 힘듬
  • Incremental update(새로 생긴 데이터만 적재)만이 가능하다면, 대상 데이터소스가 갖춰야할 몇 가지 조건이 있음
    • 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요:
      • created (데이터 업데이트 관점에서 필요하지는 않음)
      • modified
      • deleted
    • 데이터 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야함
  1. 멱등성(Idempotency)을 보장하는 것이 중요
  • 멱등성이란?
    • 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 말아야함
      • 실패할거면 깔끔하게 실패해야함.
      • 예를 들면 중복 데이터가 생기지 말아야함
    • 중요한 포인트는 critical point들이 모두 one atomic action으로 실행이 되어야 한다는 점
      • SQL의 transaction이 꼭 필요한 기술
  1. 실패한 데이터 파이프라인을 재실행(Backfill)이 쉬워야함
  • 과거 데이터를 다시 채우는 과정(Backfill)이 쉬워야 함
  • Airflow는 이 부분(특히 backfill)에 강점을 갖고 있음
  1. 데이터 파이프라인의 입력과 출력을 명확히 하고 문서화
  • 비지니스 오너 명시: 누가 이 데이터를 요청했는지를 기록으로 남길 것!
  • 나중에 데이터 카탈로그(데이터의 데이터)로 들어가 데이터 디스커버리에 사용 가능
    • 데이터 리니지가 중요해짐 -> 이걸 이해하지 못하면 온갖 종류의 사고 발생
  1. 주기적으로 쓸모없는 데이터들을 삭제
  • Kill unused tables and data pipelines proactively
  • Retain only necessary data in DW and move past data to DL (or storage)
  • 먼저 테이블들을 살피고 삭제를 진행
  1. 데이터 파이프라인 사고시 마다 사고 리포트(post-mortem) 쓰기
  • 목적은 동일한 혹은 아주 비슷한 사고가 또 발생하는 것을 막기 위함
  • 사고 원인(root-cause)을 이해하고 이를 방지하기 위한 액션 아이템들의 실행이 중요해짐
  • 기술 부채의 정도를 이야기해주는 바로미터
  1. 중요한 데이터 파이프라인이라면 입력과 출력을 체크하기
  • 아주 간단하게 입력 레코드의 수와 출력 레코드의 수가 몇개인지 체크하는 것부터 시작
  • 써머리 테이블을 만들어내고 Primary key가 존재한다면 Primary key uniqueness가 보장되는지 체크하는 것이 필요함
  • 중복 레코드 체크
  • 데이터 대상 유닛 테스트

간단한 ETL 작성해보기

ETL, 데이터 파이프라인

  • Airflow에서는 DAG라고 불림
  • Extract:
    • 데이터를 데이터 소스에서 읽어내는(추출하는) 과정. 보통 API 호출
    • 이번 실습은 csv를 바로 호출할 예정
  • Transform:
    • 필요하다면 그 원본 데이터의 포맷을 원하는 형태로 변경시키는 과정. 굳이 변환할 필요는 없음(생략 가능)
  • Load:
    • 최종적으로 Data Warehouse에 테이블로 집어넣는 과정
  • 각각이 하나의 task가 될 수도 있으며 ETL 전체가 하나의 task가 될 수도 있음.

실습 ETL 개요

  • 웹상(S3)에 존재하는 이름 성별 CSV 파일을 Redshift에 있는 테이블로 복사
  • Python으로 Google Colab에서 작성

Redshift에 테이블 생성

  • 웹상에 존재하는 이름 성별 CSV 파일을 Redshift에 있는 테이블로 복사

1. 각자에게 할당된 schema밑에 아래 테이블을 생성

CREATE TABLE (스키마 이름).name_gender (
 name varchar(32) primary key, -- D/W는 primary key uniqueness를 보장하지 않음.
 gender varchar(8)
);

  • 최종적으로 멱등성까지 보장해줘야 함.

2. 데이터 소스 다운

  • name & gender 두가지 필드가 존재하는 csv파일

3. 파이썬으로 Colab에서 작성: 세 개의 함수로 구성

  • extract, transform, load
  • 3개의 함수를 각각 별개의 태스크로 구성할 수도 있고 하나의 태스크 안에서 3개의 함수를 모두 호출하게 구성도 가능

Airflow 소개

Airflow 소개

  • Airflow는 파이썬으로 작성된 데이터 파이프라인 (ETL) 프레임웍
    • Airbnb에서 시작한 아파치 오픈소스 프로젝트
    • 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임웍
  • 데이터 파이프라인 스케줄링 지원
    • 정해진 시간에 ETL 실행 혹은 한 ETL의 실행이 끝나면 다음 ETL 실행
    • 웹 UI를 제공하기도 함
  • 데이터 파이프라인(ETL)을 쉽게 만들 수 있도록 해줌
    • 다양한 데이터 소스와 데이터 웨어하우스를 쉽게 통합해주는 모듈 제공
      https://airflow.apache.org/docs/
    • 데이터 파이프라인 관리 관련 다양한 기능을 제공해줌: 특히 Backfill
  • Airflow에서는 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름
    • 하나의 DAG는 하나 이상의 태스크로 구성됨
    • 실행 순서에 맞춰서 태스크 별로 스케줄링을 해야함
  • 2020년 12월에 Airflow 2.0이 릴리스됨
  • Airflow 버전 선택 방법: 큰 회사에서 사용하는 버전이 무엇인지 확인.

Airflow 구성

Airflow - 총 5개의 컴포넌트로 구성

  • 웹 서버 (Web Server) - Python Flask로 구현
  • 스케줄러 (Scheduler) - 데이터 파이프라인 전반적인 실행을 위해 필요한 모듈 (마스터)
  • 워커 (Worker) - task 실행에 해당하는 모듈 (일꾼)
  • 메타 데이터 데이터베이스
    • Sqlite가 기본으로 설치됨 - MySQL, Postgresql로 사용하는 것이 일반적
  • 큐 (다수서버 구성인 경우에만 사용됨)
    • 이 경우 Executor가 달라짐

Airflow 구성

  • 스케줄러는 DAG들을 워커들에게 배정하는 역할을 수행
    • DAG는 Task의 집합이라고 생각하면 됨.
  • 웹 UI는 스케줄러와 DAG의 실행 상황을 시각화해줌
  • 워커는 실제로 DAG를 구성하는 Task들을 실행하는 역할을 수행
  • 스케줄러와 각 DAG의 실행결과는 별도 DB에 저장됨
    • 기본으로 설치되는 DB는 SQLite
    • 실제 프로덕션에서는 MySQL이나 Postgres를 사용해야함

Airflow 구조: 서버 한대

  • 서버 한대로는 돌아가기 힘들 수도 있기 때문에 Worker를 별도의 서버에 세팅하고 Worker의 서버 수를 늘린다.(스케일링한다.)

Airflow 스케일링 방법

  • Scale Up (더 좋은 사양의 서버 사용) - 언젠가는 한계에 부딪힐 것이다.
    • 이를 선호한다. (비용이 덜 듬)
  • Scale Out (서버 추가) - 워커의 서버 수를 늘린다.
  • 위는 보통 빅데이터 시스템에서 사용하는 용어들이다.

Airflow 구조: 다수 서버

  • Executor의 종류에 따라 Queue를 사용하지 않을 수도 있음.

전반적인 Airflow 구조

  • 여러종류의 Executor들
    • Sequential Executor (default, 제약이 많기에 선호하진 않음)
    • Local Executor
    • Celery Executor
    • Kubernetes Executor
    • CeleryKubernetes Executor
    • Dask Executor

Airflow 개발의 장단점

  • 장점
    • 데이터 파이프라인을 세밀하게 제어 가능
    • 다양한 데이터 소스와 데이터 웨어하우스를 지원
    • 백필(Backfill)이 쉬움
  • 단점
    • 배우기가 쉽지 않음
    • 상대적으로 개발환경을 구성하기가 쉽지 않음
    • 직접 운영이 쉽지 않음. 클라우드 버전 사용이 선호됨
      • GCP provides “Cloud Composer”
      • AWS provides “Managed Workflows for Apache Airflow”
      • Azure provides “Data Factory Managed Airflow”

DAG

  • Directed Acyclic Graph의 줄임말
  • Airflow에서 ETL을 부르는 명칭
  • DAG는 태스크의 집합
    • 예를 3개의 태스크로 구성된다면 Extract, Transform, Load로 구성
  • 태스크란? - Airflow의 오퍼레이터(Operator)로 만들어짐
    • Airflow에서 이미 다양한 종류의 오퍼레이터를 제공함
    • 경우에 맞게 사용할 오퍼레이터를 결정하거나 필요하다면 직접 개발
    • Redshift writing, Postgres query, S3 Read/Write, Hive query, Spark job, shell scrip
  • Airflow 코딩을 한다
    -> DAG 인스턴스 생성
    -> DAG에 대한 정보 지정
    -> DAG를 구성하는 task가 무엇인지 오퍼레이터 형태로 구현

DAG의 구성 예제

  • 3개의 Task로 구성된 DAG.
  • 먼저 t1이 실행되고 t2, t3의 순으로 일렬로 실행
  • 3개의 Task로 구성된 DAG.
  • 먼저 t1이 실행되고 여기서 t2와 t3로 분기
profile
초짜에요...

0개의 댓글