TIL - Day 47

김혁·2024년 1월 8일
0

Full Refresh
● 단순해서 좋지만 데이터가 커지면 사용불가능
● 이 숙제는 Full Refresh하는 예라고 보면 됨

Incremental Update
● 데이터가 클 경우 효과적이지만 복잡도 증가
● 보통타임스탬프혹은일련번호등의필드필요
● execution_date 활용
● backfill에 대해서 문제가 생김

테이블에서 record 삭제하는 방법
● DELETE FROM vs. TRUNCATE
truncate는 조건 없이 모든 데이터 삭제 - 트랜잭션 무시
delete form은 조건을 where로 걸어서 부분만 삭제 - 트랜잭션 포함

전 코드의 문제

  • 멱등성 보장 x, 중복이 계속 생김
  • header 가 데이터로 포함됨
  • full refresh로 구현되어 있음 - 원래 데이터를 다 삭제하고 다시 채우는 형태인데, 불러오는 과정에서 오류가 나면 데이터의 정합성에 문제

트랜잭션

작업을 하나로 묶음으로서 모든 동작이 하나로

Atomic 하게 실행되어야 하는 sql들을 묶어서 하나의 작업처럼 처리하는 방법
begin과 end 혹은 begin과 commit 사이에 해당 sql들을 사용
rollback은 begin이전의 상태로 돌아가라는 sql 명령

트랜잭션 내의 sql 과정 중의 sql 결과는 임시 상태가 됨. 다른 세션에서는 커밋 전까지 보이지 않음. 트랜잭션에 들어가는 sql을 최소화하는 것이 좋음
위의 경우 auto commit을 사용하는 경우임

두 가지 종류의 트랜잭션이 존재

  • 레코드 변경/삭제/추가를 바로 반영하는지 여부

autocommit = True
기본적으로 모든 sql statement가 바로 물리 테이블에 커밋됨
이를 바꾸고 싶다면 begin end, begin commit을 사용

autocommit=False
기본적으로 모든 sql statement가 커밋되지 않음. 즉 모두 스테이징 상태로 존재. 커넥션 객체의 commit()과 rollback() 함수로 커밋할지 말지 결정

auto commit을 사용할 것인지 말지?
이는 개인이나 팀의 선택

python의 경우 try/catch와 같이 사용하는 것이 일반적
try/catch로 에러가 나면 명시적으로 rollback을 실행 에러가 안 나면 commit을 실행

여기서 문제는 외부에 alert가 되지 않음. rollback이 아무일도 없는 것처럼 바로 되버림. 그래서 오류를 찾기 힘듬
대부분 raise를 해서 exception이 위로 전파됨

ETL을 관리하는 입장에서 어떤 에러가 감춰지는 것보다 명확하게 드러나는 것이 . 더좋음

리눅스 이해하기

ssh: 리눅스 혹은 유닉스 서버에 로그인해주는 프로그램 (터미널)
○ private key와 public key를 사용
● sudo: 보통 슈퍼유저로서 프로그램을 구동할 수 있도록 하는 프로그램이다.
● apt-get: 우분투/데비안 계열의 리눅스에서 프로그램 설치/삭제를 관리해주는
프로그램
○ apt-get update, apt-get install
● su: substitue user의 약자로 현재 사용 중인 사용자 계정을 로그아웃하지 않고
다른 사용자의 권한을 얻을 때 사용한다
● vi: 텍스트 에디터. https://withcoding.com/112

airflow 설치

ec2로 설치

https://github.com/keeyong/airflow-setup/blob/main/docs/Airflow%202%20Installation.md

도커 설치

https://github.com/keeyong/airflow-setup/blob/main/docs/Airflow%20Docker%20Local%20Setup.md

참고)
docker compose : 단일 서버에서 여러개의 컨테이너를 하나의 서비스로 정의해 컨테이너의 묶음으로 관리할 수 있는 작업 환경을 제공하는 관리 도구

여러 개의 컨테이너가 하나의 어플리케이션으로 동작할 때 한번에 실행시키기 위해서

airflow 설치 같은 경우는 도커로 로컬에 설치했을 때는 개발하기에는 좋지만 공동 작업에서는 효율적이지 않다.. 뭐 당연한 것이 겠지만

Airflow 코드의 기본 구조

Dag 대표하는 객체를 먼저 만듬
dag 이름, 실행주기, 실행 날짜, 오너 등등

다음으로 Dag를 구성하는 태스크들을 만듬
태스크별로 적합한 오퍼레이터를 선택
태스크 ID를 부여하고 해야할 작업의 세부사항 지정

최종적으로 태스크들간의 실행 순서를 결정

catchup 파라미터
start_date가 과거라면, 실행 안된 데그들을 airflow는 실행하려고함. 그래서 false로 한다면, 과거 실행이 안된 시간들에 대해서 실행하지 않음.

how to trigger DAG

airflow web ui 에서 dag 체크 버튼 클릭 or 플레이버튼

먼저 airflow 서버에 로그인하고 다음 명령 실행

  • airflow dags list
    airflow tasks list DAG 이름
    airflow tasks test DAG 이름 Task 이름 날짜 vs run 도 가능(메타데티이터에 기록됨)
    날짜는 YYYY-MM-DD
    start_date보다 과거인 경우는 실행이 되지만 오늘 날짜보다 미래인 경우 실행 안됨
    이게 바로 execution_date의 값이 됨

웹 UI 살펴보기
터미널로 연결해서 커맨드라인 툴 사용해보기

웹 UI 살펴보기
● 터미널로 연결해서 커맨드라인 툴 사용해보기 (EC2와 Docker 버전 모두)
○ airflow dags list - dag list
○ airflow tasks list dag_v1 - dag 내 task
○ airflow tasks test dag_v1 ls 2020-08-09 - ls task 실행
○ airflow dags test dag_v1 2019-12-08
○ airflow dags backfill dag_v1 -s 2019-01-01 -e 2019-12-31

만약 t1 -> t2,t3 라면
t1을 web ui에서 clear 해준다면 병렬로 다시 아래 작업이 실행됨.

터미널)
docker exec -it 6b7c5225d59e sh (스케쥴러 id)
좌측 sh에 (airflow)가 나온다면
○ airflow dags list
○ airflow tasks list dag_v1
○ airflow tasks test dag_v1 ls 2020-08-09
○ airflow dags test dag_v1 2019-12-08
○ airflow dags backfill dag_v1 -s 2019-01-01 -e 2019-12-31
실행 가능

profile
군도리

0개의 댓글