[Programmers]실리콘밸리에서 날아온 DE 스타터 키트5️⃣주차

포동동·2022년 10월 26일
0
post-thumbnail

우선, 6주차 강의가 올라간지 한참 지나서 5주차 리뷰를 쓰는 자신을 반성하며 글을 시작합니다...🥲

🈯 복습 및 숙제 해설

4주차 복습

질문 리뷰

  • 대용량 데이터 처리 할 때 airflow worker 노드 안에서 spark 처리? 아니면 airflow에서 emr 구동해서 emr에서 spark 처리?
    • 즉, "airflow 안에서 worker 노드들을 늘려서 대용량 데이터를 처리하도록 하는게 맞는지, 아니면 큰 데이터를 처리할 수 있는 기술 스택을 airflow 밖에 설치하고 airflow에서 조작하는 게 맞나"라는 질문인데 일반적으로 후자. 여기에 kubernetes와 같은 컨테이너 가상환경을 구축해 on demand로 할 수도 있고 spark를 미리 깔아서 그냥 계속 구동될 수 있게 환경을 만들어 둘 수도 있고.
  • Incremental update에서 CDC 구현할 때 timestamp나 version 말고 트랜잭션 로그를 통해 구현한다고 하던데 실제로도?
    • 여기서 얘기하는 incremental update는 DW가 아니라 production db를 말함. 할 수는 있음. 기업이 돈이 많거나 하면 구현 가능. 난이도도 있음.
  • redshift에서 쿼리 실행하기 전에 퍼포먼스를 확인할 수 있는 방법?
    • 쿼리 전에 Explain 을 붙이면 됨. 좀 복잡한 형태로 반환함.
  • incremental update할 때 created, modified, deleted 등의 컬럼이 없으면 어떻게?
    • 못 함.

강조하고 또 강조해도 부족한 start_date, execution_date, backfill

  • 2022/08/10부터 2022/08/14일까지 데이터를 읽어오는 DAG를 실행한다고 예를 들면,
    • 실제로 DAG가 실행되는 건 8/10 데이터가 전부 다 쌓인 8/11
    • DAG를 쓸 때는 start_date는 8/10
    • 실제로 DAG가 실행되는 8/11가져와야 하는 데이터의 날짜로 전달받는 execution_date8/10
    • 만약 다음 날인 8/12에 또 DAG를 시행하면, execution_date8/11
    • 만약 오늘이 8/14인데, 데이터를 뜯어보니 8/10~8/12까지 데이터가 잘못들어갔다. 이 것을 다시 시행하는 게 backfill

try/except 사용시 유의할 점

try :
	cur.execute(<sql>)
    cur.execute("COMMIT.")
except Exception as e :
	cur.execute("ROLLBACK")
    raise
  • 요런 식으로 에러를 발생시키고 fail시키는 게 나음.

autocommit

  • default는 False
  • 이 경우에는 BEGIN해도 영향 없음.


🈯 복습 및 숙제 해설

숙제 해설

  • Full Refresh ver. (etl 함수만)
# 내 버전
def load(**context):
    schema = context["params"]["schema"]
    table = context["params"]["table"]
    
    cur = get_Redshift_connection()
    lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    sql = """CREATE TABLE {schema}.{table} (
                                            date date primary key,
                                            temp float,
                                            min_temp float,
                                            max_temp float,
                                            created_date timestamp default GETDATE()
                                            );
           """.format(schema=schema, table=table)
    sql += "BEGIN; DELETE FROM {schema}.{table};".format(schema=schema, table=table)
    for line in lines:
        if line != "":
            (date, temp, min_temp, max_temp) = (datetime.fromtimestamp(line.get("dt")).strftime('%Y-%m-%d'), 
                                                line.get("temp").get("day"), 
                                                line.get("temp").get("min"), 
                                                line.get("temp").get("max"))
            sql += f"""INSERT INTO {schema}.{table} VALUES ('{date}', '{temp}', '{min_temp}', '{max_temp}');"""
    sql += "END;"
    logging.info(sql)
    cur.execute(sql)

# 모범 답안
def etl(**context):
    api_key = Variable.get("open_weather_api_key")
    # 서울의 위도/경도
    lat = 37.5665
    lon = 126.9780

    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)
    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    insert_sql = """DELETE FROM keeyong.weather_forecast;INSERT INTO keeyong.weather_forecast VALUES """ + ",".join(ret)
    logging.info(insert_sql)
    try:
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception as e:
        cur.execute("Rollback;")
        raise

"""
CREATE TABLE <db>.<table> (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    updated_date timestamp default GETDATE()
);
"""
  • 차이점
    • (나) : Create table(함수 안에서) → Delete from → for문에서 line별로 Insert
    • (모범 답안) : Create table(함수 밖에서) → for문에서 line 별로 빈 array에 데이터 저장 → Delete from → Insert
    • 그리고 모범 답안은 try/execpt를 써서 에러 처리를 해주었다.

  • Incremental Update ver.(etl 함수만)
# 내 버전
def load(**context):
    schema = context["params"]["schema"]
    table = context["params"]["table"]
    
    cur = get_Redshift_connection()
    lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    sql = "CREATE TABLE {schema}.temp_{table} AS SELECT * FROM {schema}.{table};".format(schema=schema, table=table)
    for line in lines:
        if line != "":
            (date, temp, min_temp, max_temp) = (datetime.fromtimestamp(line.get("dt")).strftime('%Y-%m-%d'), 
                                                line.get("temp").get("day"), 
                                                line.get("temp").get("min"), 
                                                line.get("temp").get("max"))
            sql += f"""INSERT INTO {schema}.temp_{table} VALUES ('{date}', '{temp}', '{min_temp}', '{max_temp}');"""
    sql += "BEGIN; DELETE FROM {schema}.{table};".format(schema=schema, table=table)
    sql += """
                INSERT INTO {schema}.{table}
                SELECT date, temp, min_temp, max_temp, created_date
                FROM (
                        SELECT *, 
                        ROW_NUMBER() OVER (
                                            PARTITION BY date 
                                            ORDER BY created_date DESC
                                            ) seq
                        FROM {schema}.temp_{table}
                )
                WHERE seq = 1;
           """.format(schema=schema, table=table)
    sql += "END;"
    logging.info(sql)
    cur.execute(sql)

# 모범 답안
def etl(**context):
    schema = context["params"]["schema"]
    table = context["params"]["table"]
    lat = context["params"]["lat"]
    lon = context["params"]["lon"]
    api_key = Variable.get("open_weather_api_key")

    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)

    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    
    # 임시 테이블 생성
    create_sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};
    CREATE TABLE {schema}.temp_{table} (LIKE {schema}.{table} INCLUDING DEFAULTS);INSERT INTO {schema}.temp_{table} SELECT * FROM {schema}.{table};"""
    logging.info(create_sql)
    try:
        cur.execute(create_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

    # 임시 테이블 데이터 입력
    insert_sql = f"INSERT INTO {schema}.temp_{table} VALUES " + ",".join(ret)
    logging.info(insert_sql)
    try:
        cur.execute(insert_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

    # 기존 테이블 대체
    alter_sql = f"""DELETE FROM {schema}.{table};
      INSERT INTO {schema}.{table}
      SELECT date, temp, min_temp, max_temp FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY updated_date DESC) seq
        FROM {schema}.temp_{table}
      )
      WHERE seq = 1;"""
    logging.info(alter_sql)
    try:
        cur.execute(alter_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

"""
CREATE TABLE keeyong.weather_forecast (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    updated_date timestamp default GETDATE()
);
"""
  • 내 코드의 문제점

    • load 함수 실행시 이미 임시테이블이 있다고 시행이 안 됨.
    • 알아서 값을 받아와야하는 created_date 컬럼값을 못 받아옴.
  • 차이점

    • Create Table시 전에 Drop if exist 사용해서 임시테이블 먼저 삭제.
    • 그리고 Create table시 Like {schema}.{table} INCLUDING DEFAULTS(기존에 있던 테이블 속성 그대로 받아옴)를 사용.
    • (나) : Create (임시) table → for문에서 line별로 Insert → Delete from 원본테이블 → 임시테이블에서 row_number로 가장 최신것만 저장
    • (모범 답안) : Create (원본) table(함수 밖에서) → for문에서 line 별로 빈 array에 데이터 저장 → Drop 임시테이블 → Create 임시테이블 → 임시테이블에 Insert → Delete from 원본테이블 → 임시테이블을 원본테이블로 alter(이 때 row_number를 사용해서 가장 최신것만 저장.
    • 임시테이블 생성시 Drop if exitstLike INCLUDING DEFAULTS를 사용하지 않은게 패인.

  • Airflow 환경 설정 변경

    • 환경설정이 들어있는 파일은?
      • airflow.cfg
    • airflow를 API 형태로 외부에서 조작하고 싶다면?
      • api 섹션에서 auth_backend를 airflow.api.auth.backend.basic_auth로
    • variable에서 web ui상 별표(*)로 표시하기 위해 포함되어야 할 단어?
      • password, secret, passwd, authorization, api_key, apikey, access_token
    • 환경 설정 파일이 수정되었다면 실제로 반영시키기 위해 입력해야하는 커맨드는?
      • sudo systemctl restart airflow-webserver
      • b. sudo systemctl restart airflow-scheduler
    • Dags폴더에 새로운 DAG들을 airflow가 스캔하는 주기가 결정되는 키는?
      • dag_dir_list_interval = 300(default)
      • dag 폴더 안에 파일을 그냥 테스트로 짰는데 from airflow import DAG 쓰면 이걸 airflow가 스캔해서 실행해버림. → 테스트 코드도 돌아가버려서 비용 Boom🤯


🧐 5주차

production db(MySQL) → DW(Redshift)

기본적인 구조

  • MySQL → S3 → Redshift

순서

  1. airflow에서 connections에 MySQL 등록
    • 이 때, production db에 영향을 주는 것을 막기 위해 보통 login 유저에게는 굉장히 제한적인 기능만을 허락한다(read만).
  2. 똑같이 airflow connections에서 S3 등록
  3. 그 다음 DAG로 MySQL에서 레코드들 읽어와서 S3에 적재하고 그걸 다시 Redshift로 복사
profile
완료주의

0개의 댓글