[데이터 엔지니어링 데브코스] TIL 37일차 - 데이터 파이프라인과 Airflow(2)

박단이·2023년 12월 12일
0

데브코스 TIL

목록 보기
37/56

오늘 배운 것🤓

트랜잭션(Transaction)

  • Atomic하게 실행되어야 하는 SQL들을 묶어서 하나의 작업처럼 처리하는 방법
  • 중간에 실패하면 불완전한 상황에 놓이는 일련의 작업들이 있을 때 사용한다.
  • BEGIN; 부터 END; 또는 COMMIT; 사이에 놓은 작업들을 하나의 작업처럼 사용한다.
  • ROLLBACK;BEGIN; 이전으로 돌아가라는 명령어
  • BEGIN;이 실행되면 END;/COMMIT;을 만나기 전까지 SQL은 임시 상태에 놓이고 모두 성공하면 최종 상태가 된다.
BEGIN;
  INSERT ~~~;
  DELETE ~~~;
END;   # COMMIT과 같음
  • autocommit=true : 기본적으로 모든 SQL statement가 바로 테이블에 커밋
  • autocommit=false : 모든 SQL statement가 커밋되지 않아 명시적으로 선언해야한다. 모두 staging 상태를 유지하고 있다.
  • Python의 경우 try/catch문과 같이 사용한다.
    • 에러가 나면 ROLLBACK;을, 에러가 안나면 COMMIT;을 명시적으로 작성해야 한다.
    • 에러가 생기면 외부에 명시적으로 표출해야하기 때문에 except문에서 raise를 사용하자.

Airflow 설치

  • 직접적으로 설치하는 방법과 클라우드를 사용하는 방법이 있다.
  • 클라우드를 사용하려면 최소 서버 3대를 운용해야하기 때문에 비용적으로 큰 무리가 있을 수 있다.
  • 직접 설치하는 방법에는 docker의 이미지를 사용하는 방법과 AWS의 EC2에 linux 서버를 구축하여 사용하는 방법이 있다.
  • 개인이 프로젝트를 진행한다면 docker에 이미지를 사용하면 되지만, 다수의 인원과 프로젝트를 진행할 때는 EC2를 사용하도록 하자.(물론 비용이 든다 ㅠ 최소 t3, t3a여야한다.)
  • 가장 best는 EC2 위에 docker engine을 구동하여 airflow 이미지를 실행하는 것이지만 최소 8GB의 메모리와 그 절반을 docker에 사용해야한다는 부담이 있다.

Airflow 코드 기본 구조

  1. DAG 객체 생성
    • DAG 이름, 실행 주기, 실행 날짜, 담당자 등을 기입
    • 모든 task의 공통 설정을 dictionary를 만들어서 지정한다.
from airflow import DAG

# task 공통 설정
default_args = {
  'owner' : 'owner_id',    	# 담당자 id
  'email' : 'owner_email', 	# 담당자 email
  'retries' : 1,			# 재시도 횟수
  'retry_delay': timedelta(minutes=3),	# 재시도 사이에 기다리는 시간
  'on_failure_callback' : fail_ft,		# 실패 시 실행되는 함수
  'on_success_Callback' : success_tf	# 성공 시 실행되는 함수
}

# DAG 생성
dag = DAG(
		'dag_name',
		start_date = datetime(  ),
        schedule = '0 * * * *',		# dag를 실행하는 주기
        tags = ['tag_nm1', 'tag_nm2', ...], # 하나만 있어도 된다.
        catchup = False, 	# start_date로 생성하는 날보다 과거로 설정시 그 gap을 따라잡을지(True), 따라잡지 않을 지(False)
        default_args = default_args
	  )
  1. DAG를 구성하는 task 생성
    • task 별로 적합한 operator 선택
    • task id를 부여
    • 해야할 작업의 세부 사항 지정
from airflow.operators.bash import BashOperator

task1 = BashOperator(
			task_id = 'task_id',
            bash_command = 'command',
            dag = dag
)
  1. task 사이의 실행 순서 결정
    • 직렬, 병렬 모두 가능하다.
    • 직렬일 때 >>, 병렬이면 [ ] 사이에 task 작성
task1 >> [task2, task3]

느낀 점😊

아직까지는 큰 어려움 없이 열심히 따라가고 있다.
설치하는 방법에 대해서는 따로 더 자세하게 게시글로 작성해봐야지!

profile
데이터 엔지니어를 꿈꾸는 주니어 입니다!

0개의 댓글