[TIL] 데이터 파이프라인, Airflow (4)

이원진·2023년 6월 8일
0

데브코스

목록 보기
44/54
post-thumbnail
post-custom-banner

학습내용


  1. Open Weathermap DAG 구현하기

  2. Primary Key Uniqueness 보장하기

  3. Backfill과 Airflow

1. Open Weathermap DAG 구현하기


  • Open Weathermap API

    • 위도 / 경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스

    • 무료 계정으로 API Key 받아서 호출 시 사용

    • 결과 JSON에서 daily라는 필드에 앞으로 8일 간의 날씨 정보가 들어있음

      • daily는 리스트이며, 각 레코드가 하나의 날짜에 해당

      • 날짜 정보는 dt라는 필드에 들어있으며, 아래와 같이 읽을 수 있는 날짜로 변경

        • datetime.fromtimestamp(d[”dt”].strftime(”%Y-%m-%d”))

      • temp 필드가 온도 정보를 나타냄


  • 제작하려는 DAG: 8일 간 서울의 낮 / 최소 / 최대 온도 읽기

    • API Key를 open_weather_api_key라는 Airflow Variable로 저장

    • 서울의 위도 / 경도 찾기

    • One-Call Api 사용

      • API Key와 서울의 위도 / 경도 정보를 사용해 위의 API를 requests 모듈로 호출

      • 응답 결과에서 오늘로부터 7일 간 온도 정보(평균 / 최대 / 최소)만 출력


  • DAG 구현

    • weather_forecast 테이블 생성

      CREATE TABLE weather_forecast(
              date DATE PRIMARY KEY,
              temp FLOAT,
              min_temp FLOAT,
              max_temp FLOAT,
              created_date TIMESTAMP DEFAULT GETDATE()
      );
      • created_date는 자동으로 채워지는 필드

    • One-Call API는 결과를 JSON 형태로 반환

      • 이를 읽기 위해 requests.get 결과의 text를 JSON으로 변환해야 함

      • 혹은, requests.get 결과 오브젝트가 제공하는 .json() 함수 사용

        • f_js = requests.get(link).json()

    @task
    def etl(schema, table):
            api_key = Variable.get("open_weather_api_key")
    
            # 서울의 위도, 경도
            lat = 37.5665
            lon = 126.9780
    
            url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={api_key}&units=metric"
            response = requests.get(url)
            data = json.loads(response.text)
    
            for d in data["daily"]:
                    day = datetime.fromtimestamp(d[”dt”].strftime(%Y-%m-%d”))
                    ret.append("('{}', {}, {}, {})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
    
            cur = get_DB_connection()
            drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
                    CREATE TABLE {schema}.{table} (
                            date date,
                            temp float,
                            min_temp float,
                            max_temp float,
                            created_date timestamp default GETDATE()
            );"""
    
            insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
            logging.info(drop_recreate_sql)
            logging.info(insert_sql)
    
            try:
                    cur.execute(drop_recreate_sql)
                    cur.execute(insert_sql)
                    cur.execute("COMMIT;")
    
            except Exception as e:
                    cur.execute("ROLLBACK;")
                    raise
    with DAG(
            dag_id = "Weather_to_Redshift",
            start_date = datetime(2023, 5, 30),
            schedule = "0 2 * * *",
            max_active_runs = 1,
            catchup = False,
            default_args = {
                    "retries": 1,
                    "retry_delay": timedelta(minutes = 3)
            }
    ) as dag:
    
            etl("schema_name", "weather_forecast")

2. Primary Key Uniqueness 보장하기


  • Primary Key Uniqueness

    • 관계형 DB 시스템이 PK값의 중복을 막아주는 것

    • 빅데이터 기반 DW들은 PK Uniqueness를 보장하지 않음

      • 보장하는데 메모리와 시간이 더 소모되므로 대용량 데이터 적재가 어려워지기 때문

      • 데이터 인력이 이를 보장하기 위한 작업을 해야 함

  • PK Uniqueness 보장 방법

    • 앞서 생성한 weather_forecast 테이블을 예로 설명

      CREATE TABLE weather_forecast(
              date DATE PRIMARY KEY,
              temp FLOAT,
              min_temp FLOAT,
              max_temp FLOAT,
              created_date TIMESTAMP DEFAULT GETDATE()
      );
      • PK인 date가 같은 레코드가 있다면, created_date를 기준으로 더 최근 정보 선택

    • PK가 중복될 경우, PK를 기준으로 Partition한 뒤 ROW_NUMBER를 사용

      • ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq

    • 임시 테이블(스테이징 테이블) 사용

      1. 임시 테이블을 만들고 현재 모든 레코드 복사

        • CREATE TEMP TABLE t AS SELECT * FROM weather_forecast;

      2. 임시 테이블에 데이터 소스에서 새롭게 읽은 레코드 복사

      3. 중복 제거

      4. 원본 테이블에서 레코드 삭제 후 임시 테이블 복사

        DELETE FROM weather_forecast;
        
        SELECT date, temp, min_temp, max_temp 
        FROM (
                SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
                FROM t
        )
        WHERE seq = 1;

  • Upsert란?

    • (PK를 기준으로) 존재하는 레코드라면 새 정보로 수정, 존재하지 않는 레코드라면 새 레코드 적재

    • 보통 DW에서 효율적인 Upsert 문법을 지원


3. Backfill과 Airflow


  • Incremental Update는 효율성은 더 좋을 수 있지만, 실수 등으로 인해 데이터가 누락될 수 있어 운영과 유지보수의 난이도가 올라감

    • 실패한 데이터 파이프라인을 재실행하거나, 읽어온 데이터에 문제가 있어 다시 모두 읽어오는 Backfill 기능이 중요

    • Full Refresh는 그냥 다시 실행하면 되지만, Incremental Update에서는 Backfill이 복잡하기 때문에 가능하면 Full Refresh를 사용하는 것이 좋음

  • Airflow의 Backfill

    • Airflow는 강력한 Backfill 기능을 제공

    • 접근 방식

      • ETL별로 실행 날짜와 결과를 메타데이터 DB에 저장

      • 모든 DAG에 지정되어있는 execution_date를 바탕으로 데이터를 갱신하도록 코드 작성

        • execution_date로 채워야하는 날짜와 시간이 전달됨

    • 변수

      • start_date: DAG가 처음 읽을 데이터의 날짜 / 시간

      • execution_date: DAG가 읽어와야하는 데이터의 날짜 / 시간

      • catchup: start_date보다 이후에 DAG를 활성화한 경우, 그 사이에 실행되지 않아 누락된 데이터를 처리할 방법을 결정

        • True(default): 누락된 데이터를 모두 채움

        • False: 누락된 데이터 무시

      • end_date: 보통은 필요하지 않으며, Backfill을 날짜 범위에 대해 수행하는 경우에만 필요

    • 비용이 비싼 쿼리가 Airflow에 의해 스케줄링될 경우, 의도치 않게 여러 번 수행되어 많은 비용이 발생할 수 있으니 주의


메모


  • Airflow와 타임존

    • airflow.cfg에는 default_timezone, default_ui_timezone 두 개의 타임존 관련 키가 존재

    • start_date, end_date, schedule: default_timezone에 지정된 타임존을 따름

    • execution_date, 로그 시간: 항상 UTC를 따름

      • 즉, execution_date를 사용할 때는 타임존을 고려해서 변환 후 사용

  • DAGS 폴더에서 코드 작성 시 주의할 점

    • airflow는 DAGS 폴더를 주기적으로(5분마다) 스캔함

    • 이때 DAG 모듈이 들어있는 모든 파일의 메인 함수가 실행됨

      • 개발 중인 테스트 코드도 의도치 않게 실행될 수 있음

post-custom-banner

0개의 댓글