학습주제
Airflow 설치와 프로그래밍
실제로 설치 및 코딩
학습내용
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
튜플을 바로 리스트 원소로 넣을 수 있음.
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
schema = "keeyong"
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
파이프라인 실행 때마다 중복이 생성.
멱등성 일부. 풀리프레쉬 구현함에 있어, 테이블 레코드 삭제 후 새로운 레코드 적재. 중간 에러가 나 정합성이 깨졌음.
풀리프레쉬
가장 선호되는 방식. 데이터 커지면 못씀
인크리멘탈
복잡도가 증가
데이터 소스단, 특정 날짜 기준으로 변경 레코드를 읽을 수 있는 방법을 지원해줘야 함.
백필 이슈가 생김.
엑시큐션 데이트 이용 -> 수업 때 설명
에어플로우가 인크리멘탈 업테이트를 쉽게 하기 위해 부여한 시스템 변수
DELETE FROM, TRUNCATE
TRUNCATE -> 조건 없고, 모든 레코드를 날림.
DELETE FROM -> WHERE을 걸어서 삭제 가능.
만약 컬럼을 날리고 싶으면
ALTER TABLE, DROP COLUMNS 이용
SQL 트랜젝션 유무 상관없이 TRUNCATE 바로 실행
DELETE FROM은 트랜잭션 준수함.
나의 경우 여전히 ipython-sql==0.3.9 설치해야 원활하게 돌아간다.
set_session이 autocommit=True로 되어있다. 기억해두기.
이 경우 한줄씩 cur.execute 때마다 커밋이 된다.
헤더를 제거하기 위해, 한줄 추가됨.
def transform(text):
lines = text.strip().split("\n")[1:] # 첫번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",")
records.append([name, gender])
return records
트랜잭션 처리
def load(records):
schema = "kjw9684k"
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH를 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;")
except (EXception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
COMMIT과 END는 동의어
try, except로
에러가 나면 except로 이동해 ROLLBACK BEGIN 하기 전 상태로 돌아감. BEGIN이후 COMMIT 전 수행은 다른 사용자들에게 안보임. 물리적 테이블에 적용되지 않음. COMMIT, END 부른 후에 물리적 테이블에 반영됨.
이렇게 데이터의 정합성을 보장해줌.
끝까지 잘 성공하면 물리적 테이블에 반영. 실패시 ROLLBACK으로 BEGIN 전으로 돌아감.
헤더가 빠진 것을 볼 수 있다.
비밀번호를 잘못 입력함. 그러나 load 함수 불러올 때에는 별다른 에러 메세지 내지 않음.
비밀번호 고치니까 정상 작동.
DELETE, INSERT INTO가 깨졌음에도 정합성이 깨지지 않는 것을 시연.
실수로 문장에 오타를 냄.
DELETE는 수행되었으나, INSERTT로 INSERT는 수행되지 않는 상황
뒤에 load함수를 다시 호출
트랜잭션을 걸지 않으면 DELETE만 수행되어 테이블 내용이 없는 상황으로 끝남.
정확히는 AUTOCOMMIT 이 TRUE이므로 DELETE가 수행
에러가 났지만
테이블 레코드 수를 카운트해보면
여전히 100개인 것을 확인
DELETE을 했지만 트랜잭션을 열고 except로 넘어가 ROLLBACK했기 때문
뭔가 연달아서 실행되는 작업.
중간과정 실패하면 불완전.
은행 이체 과정
내 계좌에서 돈만 빠지고, 송금 실패하면 안됨.
다 같이 성공하던지, 아니면 완전히 실패하던지.
https://postgresql.kr/docs/9.2/tutorial-transactions.html
BEGIN - COMMIT, END
ROLLBACK
1번부터 N번까지 하나라도 실패하면 데이터의 정합성이 깨지는 수행이 있다면
BEGIN 먼저 실행
이후 COMMIT
ROLLBACK으로 분기
파이썬이라면 try, except를 붙임
try 처음과 끝에 begin, commit
이게 가능한건 autocommit=True인 경우임.
트랜잭션은
autocommit True
자동 커밋. 변경사항이 있으면 물리 테이블에 바로 반영
이를 막고 싶으면 BEGIN으로 명시적으로 시작. 그 사이에 ROLLBACK을 부르면 BEGIN 전으로 돌아감
FALSE인 경우
자동 커밋하지 않음
모든 오퍼레이션이 COMMIT, END 부르기 전까진 변경사항들이 나한테만 보임. 세션의 스테이징 테이블에만 보임.
writing 작업이 트랜잭션 상태
개인의 선호, 팀의 선호. 여러 사람과 같이 일한다고 하면 공통의 정책을 정함. 그 결정사항을 따라가면 됨.
롤백 뒤에 아무것도 없기에
롤백되었다는 사실이 누구에게도 알려지지 않음. 아무일도 없었던 것처럼 쭉 진행됨.
만약, 외부에 명시적으로 보이지 않고 계속 롤백을 하면, 사실 try 부분의 코드가 실행되지 않음.
데이터 엔지니어들이 모르고 넘어감. try 블록이 실행되지 않는다는걸 모르고 넘어감.
업데이트가 안되는걸 나중에 알게됨.
불안정한 에러처리는 굉장히 위험함.
raise를 사용하면 프로그램이 결국 fail함
에어플로우가 바로 알게됨.
이렇게 되면, 슬랙, 이메일로 보내지기 때문에 데이터 엔지니어가 인식할 수 있음.