[데이터 엔지니어링 데브코스 2기] TIL-10주차-파트03 데이터 파이프라인, Airflow(2)

이재호·2023년 12월 12일
0

1. 멱등성 보장 이슈 관련 예


  • 멱등성 보장 X 버전
def load(records):
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
    for r in records:
        name = r[0]
        gender = r[1]
        print(name, "-", gender)
        sql = "INSERT INTO jaeho.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
        cur.execute(sql)
  • 멱등성 보장 버전
def load(records):
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
    try:
      cur.execute("BEGIN;")
      cur.execute("DELETE FROM jaeho.name_gender;")
      for r in records:
          name = r[0]
          gender = r[1]
          print(name, "-", gender)
          sql = "INSERT INTO jaeho.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
          cur.execute(sql)
          cur.execute("COMMIT;") # 혹은 "END;"로 수정해도 괜찮.
    except Exception as e:
      print(e)
      cur.execute("ROLLBACK;")
      raise
  • BEGIN; ~ COMMIT; ~ ROLLBACK; 을 통해서 데이터 정합성을 만족 가능함.

트랜잭션이란?

  • 은행 계좌 입/출금과 같이 중간에 실패하면 불완전한 상황에 놓이는 작업에 활용함.
  • 따라서 여러 작업들을 하나의 작업처럼 (atomic)하게 처리하는 개념.
  • SQL에서는 "BEGIN END/COMMIT"과 ROLLBACK을 활용하여 구현 가능.
  • python에서는 try/catch로 구현 가능.
  • psycopg2에서, autocommit을 통해 자동으로 커밋하게 될 지 구현 가능. True일 경우 자동으로 커밋되며, False일 경우 .commit() 함수가 필요.
  • try/except 시에 raise를 추가해 주는 것이 좋음.

2. Airflow 설치


  • 직접 설치하고 운영하는 방법과 클라우드를 통해서 사용하는 방법이 있음.
  • 가장 좋은 건 EC2에서 docker를 설치하여 docker 컨테이너로 airflow를 실행하는 것이 가장 좋음.

2-1. 도커를 통한 Airflow 설치


  1. 터미널 실행 및 디렉토리 이동.
  2. git clone https://github.com/keeyong/airflow-setup.git 실행.
  3. cd airflow-setupcurl -LfO https://airflow.apache.org/docs/apache-airflow/{버전}/docker-compose.yaml 실행.
  4. docker-compose -f docker-compose.yaml pulldocker-compose -f docker-compose.yaml up 실행.
  5. http://localhost:8080으로 웹 UI 로그인. (id/pw : airflow/airflow)

2-2. 리눅스 서버에 직접 설치


리눅스 소개.

  • 우분투 : 리눅스 타입 중의 하나.
  • ssh : 리눅스 혹은 유닉스 서버에 로그인해 주는 프로그램(터미널).
  • sudo : 보통 슈퍼유저로서 프로그램을 구동할 수 있도록 하는 프로그램.
  • apt-get : 우분투/데비안 계열의 리눅스에서 프로그램 설치/삭제를 관리해 주는 프로그램.
  • su : substitute user의 약자로 현재 사용 중인 사용자 계정에서 다른 사용자의 권한을 얻을 때 사용.
  • vi : 텍스트 에디터.

3. Airflow 구조


  • DAG 객체를 먼저 만듦.

    • DAG 이름, 실행 주기, 실행 날짜, 오너 등
  • DAG 안에 task를 만듦.

    • 테스크별 적합한 오퍼레이터를 선택.
    • 테스크 id를 부여하고 해야될 작업의 세부사항 지정
  • 최종적으로 테스크 간의 실행 순서를 결정.

3-1. 예제


  • 먼저 모든 테스크에 공통적으로 적용되는 설정 파일을 dictionary로 생성.
from datetime import datetime, timedelta

default_args = {
	'owner': 'jaeho',
    'email': ['jaeho@gmail.com'],
    'retries': 1, # 실패 시 최대 재시도 횟수.
    'retry_delay': timedelta(minutes=3), # 재시도 시 딜레이 타임.
    'on_failure_callback': func, # 실패 시 다음 함수 실행.
    'on_success_callback': func, # 성공 시 다음 함수 실행.
}
from airflow import DAG
  dag = DAG(
  "dag_v1", # DAG name
  start_date=datetime(2020,8,7,hour=0,minute=00),
  schedule="0 * * * *",
  tags=["example"],
  catchUp=False, # start_date(과거)와 현재 날짜 사이에 대해서 실행이 안 된 작업 수행 X.
  # common settings
  default_args=default_args
)
  • 전체 코드(Bash Operator)
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
 'owner': 'jaeho',
 'start_date': datetime(2023, 5, 27, hour=0, minute=00),
 'email': ['jaeho@hotmail.com'],
 'retries': 1,
 'retry_delay': timedelta(minutes=3),
}
test_dag = DAG(
 "dag_v1", # DAG name
 schedule="0 9 * * *", 
 tags=['test'],
 catchUp=False,
 default_args=default_args 
)

# t1 : 날짜 출력 테스크
t1 = BashOperator(
 task_id='print_date',
 bash_command='date',
 dag=test_dag)
# t2 : 5초 간 슬립 테스크
t2 = BashOperator(
 task_id='sleep',
 bash_command='sleep 5',
 dag=test_dag)
# t3 : tmp 디렉토리 파일 조회 테스크
t3 = BashOperator(
 task_id='ls',
 bash_command='ls /tmp',
 dag=test_dag)
# t1 실행 후, t2와 t3 동시 실행.
t1 >> [ t2, t3 ]
  • Airflow 웹 UI에서 dag_v1을 활성화 한다.

  • 만약 또 실행시키고 싶다면 런 버튼을 클릭한다.

  • 각 테스크가 잘 실행된 것을 확인할 수 있다.

  • 테스크의 실행 절차를 확인 가능하다.

  • 특정 테스크의 상세 정보를 보고 싶으면, 녹색 아이콘을 클릭한다.

    • Clear를 누르면 해당 테스크를 초기화 및 다시 실행시킬 수 있다.
    • Log를 누르면 log 정보를 확인할 수 있다.

3-2. 커맨드 라인


  • airflow dags list : airflow 내의 모든 dag들의 정보(dag_id, filepath, owner, paused)를 출력.
  • airflow tasks list {dag 아이디} : 해당 dag의 모든 테스크 출력.
  • airflow tasks test {dag 아이디} {task 아이디} {날짜} : 특정 테스크 실행.

3-3. Docker에서 airflow 커맨드 라인 실행


  • docker ps 명령어로 컨테이너들의 아이디 조회.
    • 해당 컨테이너의 아이디 복사
  • docker exec -it 3199a1254b38 sh 명령어로 쉘 스크립트 실행.
  • 이제부터 위 커맨드 라인 실행 가능.
profile
천천히, 그리고 꾸준히.

0개의 댓글