Airflow
Full Refresh
란 데이터가 작을 경우 가능하면 통으로 복사해서 테이블을 만드는 것을 말한다.Incremental Update
를 한다.Incremental Update
라고 한다.Incremental Update
를 할 때 갖추어져야 하는 것은 created(timestamp), modified(timestamp), deleted(boolean) 필드가 필요하다. Incremental Update
가 불필요한 경우 Full Refresh
를 하는 게 더 효율적이다.데이터 엔지니어로 데이터 품질에 신경 쓰고 데이터 활용이란 측면에서 셀프 서비스화 다른 영역으로 기술과 경험을 확장하는 것이 중요하다.
📘 과제 - 생성한 ETL의 문제 개선
- 먼저
NAME_GENDER
라는 테이블에ETL
을 통해 데이터를 적재한 후 다음과 같은 쿼리로 데이터를 조회해 보았다.SELECT GENDER , COUNT(1) GENDER_CNT FROM SSONG_JI_HY.NAME_GENDER GROUP BY 1
- 이때 두 가지 문제가 발생하게 되는데 이 문제를 개선하는 것이 과제였다.
❓ 문제 1
- GENDER 1의 경우 올바른 데이터가 아니라 헤더이다. 현재 헤더 역시 데이터로 읽혀 적재가 되었다. 이 문제를 어떻게 해결할 수 있을까?
🔑 해결 방법
- 인덱스 슬라이싱을 통해
Transform 과정
에서 헤더의 경우 제외하고 리스트 처리가 이루어지게 해 준다.def transform(text): lines = text.strip().split("\n") records = [] for l in lines: (name, gender) = l.split(",") # l = "Jihye,F" -> [ 'Jihye', 'F' ] records.append([name, gender]) return records
- 이전의
transform
코드가 다음과 같았다면 처음split
시 아예 첫 번째 라인을 제외 처리되도록 해 주는 것이다.def transform(text): lines = text.strip().split("\n")[1:] records = [] for l in lines: (name, gender) = l.split(",") # l = "Jihye,F" -> [ 'Jihye', 'F' ] records.append([name, gender]) return records
- 이제 transform을 실행해 적재할 데이터에 헤더가 들어가지 않는지 확인해 보자.
lines = transform(data)
- 다음과 같이 헤더인 [NAME, GENDER]이 사라졌음을 알 수 있다.
❓ 문제 2
- 해당
ETL
을 두 번 하게 되면 이전에 한 데이터가 삭제되지 않아 중복 데이터가 발생하게 된다. 이런 경우멱등성
이 깨지게 되는데 이 문제는 어떻게 해결할 수 있을까?🔑 해결 방법
- 적재해 주는
Load 함수
의 수정이 필요하다. 두 번 ETL 과정을 돌릴 시 중복 데이터 문제가 발생하는 오류 해결을 위해서DELETE
쿼리문을 추가해 준다.- 이 과정을
Full refresh
라고 부른다. 매번 전체 데이터를 새로 가지고 오는 것이다.- 다만 이런 경우
DELETE
는 수행되고,INSERT
는 수행되지 않는다거나DELETE
가 수행되지 않고INSERT
가 수행되면 데이터의 정합성이 깨지게 되므로SQL Transaction
개념으로BEGIN
END
를 통해 각각의 쿼리가 하나의 플로우로 같이 돌아갈 수 있게 구현해 준다.- 이전 코드가 다음과 같았다면,
def load(records): # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음 cur = get_Redshift_connection() # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태 for r in records: name = r[0] gender = r[1] print(name, "-", gender) sql = "INSERT INTO SSONG_JI_HY.NAME_GENDER VALUES ('{n}', '{g}')".format(n=name, g=gender) cur.execute(sql)
- 각각 주석을 따라 추가적인 코드를 작성해 주도록 한다.
def load(records): schema = "SSONG_JI_HY" # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음 cur = get_Redshift_connection() try: cur.execute("BEGIN;") cur.execute(f"DELETE FROM {schema}.name_gender;") # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태 for r in records: name = r[0] gender = r[1] print(name, "-", gender) sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')" cur.execute(sql) cur.execute("COMMIT;") # cur.execute("END;") except (Exception, psycopg2.DatabaseError) as error: print(error) cur.execute("ROLLBACK;") #롤백을 하게 되면 BEGIN 이전의 상태로 돌아가며 BEGIN-END 사이의 변경들은 다른 쪽에서는 보이지 않는다 COMMIT이 완료되는 순간 물리적 테이블에 반영된다
#변환 작업까지 끝낸 lines를 load(적재)해 줌 load(lines)
%%sql SELECT GENDER , COUNT(1) GENDER_CNT FROM SSONG_JI_HY.NAME_GENDER GROUP BY 1
- 이제 다시 조회해 두 번 돌렸을 때 101 개의 데이터가 한 번 더 쌓이는 문제가 해결되었는지를 확인해 보자.
- 총 헤더 데이터 제외 100 개의 데이터가 다시 적재된 것을 볼 수 있고, 이전 데이터가 삭제되었음을 확인할 수 있다.