학습주제
airflow Dag 개발
OLTP 복사와 ELT
학습내용
대그를 만들 때 제일 처음 하는게 프로덕션 DB를 DW로 복제 해옴 OLTP -> OLAP
대표적인 mysql, postgres 등이 있음.
처음엔 풀리프레쉬로 만들어보고, 이후 인크리멘탈로 바꿔봄.
앞서 배웠던 execution_date를 사용해보기로 함.
이렇게 execution date 썼다면 백필이 쉬워짐. 실제로 실습진행.
대그는 여러개의 task, 오퍼레이터로 구성됨.
start_date 대그가 처음 읽어와야하는 데이터의 날짜.
execution_date -> 2월 5일
스케줄 크론 표현식
30 * * * *
매 시 30분 마다 실행한다는 뜻
매일 0시 30분마다 실행은
30 0 * * *
일요일 마다 매시 30분에 실행
30 * * * 0
0 * * * *
는 hourly 이므로 다음은 start date의 다음 한 시간
catchup 파라미터는 기본 True로 설정되어 있음.
https://drive.google.com/file/d/18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY/view
중복이 완전하게 제거가 안된 모습. DISTINCT를 사용했을때
보면 약간씩 값이 다른경우 DISTINCT는 완벽하게 같을 경우에만 중복제거를 하기 때문에 살아남게됨.
미국 주식시장이 끝난 후에 실행되면 이런 이슈가 없음. 그러나 장이 열리고 있는 경우 값의 변동이 생김.
DISTINCT상황에 따라 동작할수도, 동작하지 않을수도 있음.
이에 ROW_NUMBER() 윈도우 함수를 사용해본다.
이렇게 SELECT를 두번쓰는 이유는, where은 하위 쿼리에서 못사용함. 또 seq 이거 빼려고.
INSERT INTO {schema}.{table}
SELECT 필요 열들 FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t)
WHERE seq = 1;
DISTINCT 대신 저거 넣어주면 된다.
그 전에는 SELECT DISTINCT * FROM t
장 중간에 호출된 경우, 종가, volume, 최고, 최저가 달라짐.
오토커밋을 True로 줘서 BEGIN;으로 트랜잭션을 열어줌
테이블을 DELETE FROM으로 스키마만 냄김.
INSERT INTO로
SELECT 어떤 열을 가져갈지 선택
이하 새로운 쿼리로 ROW_NUMBER() 사용
DESC를 해주면 생성날짜가 가장 최신인 값이 1 부여됨.
이에 마지막에 WHERE 에서 seq = 1로 레코드 1개만 가져감.
대그를 새로 만들 때
꼭 대그 ID 확인. (보통 개선 버전은 기존 코드를 복사할 것이기에)