[TIL] 데이터 파이프라인, Airflow (2)

이원진·2023년 6월 6일
0

데브코스

목록 보기
42/54
post-thumbnail
post-custom-banner

학습내용


  1. SQL 트랜잭션 이해하기

  2. Airflow 설치

  3. Airflow 기본 프로그램 실행

1. SQL 트랜잭션 이해하기


  • 트랜잭션

    • 하나의 기능처럼 실행되어야 하는 SQL을 묶어서 처리하는 방법

      • 하나라도 실패할 경우, 해당 트랜잭션의 모든 SQL은 DB에 반영되지 않음

    • BEGIN-END(or BEGIN-COMMIT) 블럭 사이에 SQL 작성

    • COMMIT은 변경사항을 DB에 반영, ROLLBACK은 BEGIN 이전 상태로 회귀

      • COMMIT 이전에는 다른 세션에서 변경사항을 확인할 수 없음

    • autocommit 파라미터를 사용해 자동 COMMIT 여부를 설정 가능

      • autocommit 사용은 개인/팀의 선택

      • Python에서는 일반적으로 try-catch를 사용해 에러 발생 시 ROLLBACK, 문제 없을 시 COMMIT

        • 이때, 어떤 에러가 발생했는지 파악하기 위해 ROLLBACK 이후 raise 사용

2. Airflow 설치


  1. 운영체제에 맞는 Docker Engine 설치

  2. Docker 실행 -> 설정 -> Resources -> 메모리 4GB 이상 할당

    • 윈도우의 경우, 6GB 할당

  3. airflow-autosetup Github repo 클론

    • git clone ‘https://github.com/keeyong/airflow-setup.git’

  4. airflow-setup 폴더로 이동해 airflow 2.5.1 이미지 관련 yaml 파일 다운로드

    • curl -LfO ‘https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yml’

  5. 도커 이미지 다운로드 및 컨테이너 실행

    • docker-compose -f docker-compose.yaml pull

    • docker-compose -f docker-compose.yaml up

    • Docker Desktop에서 아래와 같이 컨테이너 동작 상황 확인 가능


  6. localhost:8080으로 웹 UI 로그인


3. Airflow 기본 프로그램 실행


  • Airflow 코드 동작 방식

    1. DAG를 대표하는 객체 생성

      • DAG 이름, 실행 주기, 실행일, 오너 등

    2. DAG를 구성하는 태스크 생성

      • 태스크별로 적합한 오퍼레이터 선택

      • 태스크 ID를 부여하고, 해야할 작업의 세부사항 지정

    3. 최종적으로 태스크들 간 실행 순서 결정

  • DAG 설정 예제

    from airflow import DAG
    
    dag = DAG(
            "dag_v1",
            start_date = datetime(2020, 8, 7, hour = 0, minute = 00),
            schedule = "0 * * * *",
            tags = ["example"],
            catchup = False,
            default_args = default_args
    )

  • Bash Operator를 사용한 예제

    • 3개의 태스크로 구성

      • t1: 현재 시각 출력

      • t2: 5초 간 대기 후 종료

      • t3: 서버의 /tmp 디렉토리 내용 출력

    • t1이 끝나고 t2와 t3를 병렬로 실행


    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from datetime import datetime, timedelta
    
    default_args = {
            'owner': 'owner_name',
            'start_date': datetime(2023, 5, 27, hour = 0, minute = 00),
            'email': ['user@email.com'],
            'retries': 1,
            'retry_delay': timedelta(minutes = 3)
    }
    
    test_dag = DAG(
            "dag_v1",
            schedule = "0 9 * * *",
            tags = ["test"],
            catchup = False,
            default_args = default_args
    )
    
    t1 = BashOperator(
            task_id = 'print_date',
            bash_command = 'date',
            dag = test_dag
    )
    
    t2 = BashOperator(
            task_id = 'sleep',
            bash_command = 'sleep 5',
            dag = test_dag
    )
    
    t1 = BashOperator(
            task_id = 'ls',
            bash_command = 'ls /tmp',
            dag = test_dag
    )
    
    t1 >> [t2, t3]

  • 웹 UI에서 토글 버튼을 통해 DAG 활성화


  • OR, 터미널에서 DAG 실행

    airflow dags list
    airflow tasks list dag_name
    airflow tasks list dag_name task_name date

메모



post-custom-banner

0개의 댓글