SQL 트랜잭션 이해하기
Airflow 설치
Airflow 기본 프로그램 실행
트랜잭션
하나의 기능처럼 실행되어야 하는 SQL을 묶어서 처리하는 방법
BEGIN-END(or BEGIN-COMMIT) 블럭 사이에 SQL 작성
COMMIT은 변경사항을 DB에 반영, ROLLBACK은 BEGIN 이전 상태로 회귀
autocommit 파라미터를 사용해 자동 COMMIT 여부를 설정 가능
autocommit 사용은 개인/팀의 선택
Python에서는 일반적으로 try-catch를 사용해 에러 발생 시 ROLLBACK, 문제 없을 시 COMMIT
운영체제에 맞는 Docker Engine 설치
Docker 실행 -> 설정 -> Resources -> 메모리 4GB 이상 할당
airflow-autosetup Github repo 클론
git clone ‘https://github.com/keeyong/airflow-setup.git’
airflow-setup 폴더로 이동해 airflow 2.5.1 이미지 관련 yaml 파일 다운로드
curl -LfO ‘https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yml’
도커 이미지 다운로드 및 컨테이너 실행
docker-compose -f docker-compose.yaml pull
docker-compose -f docker-compose.yaml up
Docker Desktop에서 아래와 같이 컨테이너 동작 상황 확인 가능
localhost:8080으로 웹 UI 로그인
Airflow 코드 동작 방식
DAG를 대표하는 객체 생성
DAG를 구성하는 태스크 생성
태스크별로 적합한 오퍼레이터 선택
태스크 ID를 부여하고, 해야할 작업의 세부사항 지정
최종적으로 태스크들 간 실행 순서 결정
DAG 설정 예제
from airflow import DAG
dag = DAG(
"dag_v1",
start_date = datetime(2020, 8, 7, hour = 0, minute = 00),
schedule = "0 * * * *",
tags = ["example"],
catchup = False,
default_args = default_args
)
Bash Operator를 사용한 예제
3개의 태스크로 구성
t1: 현재 시각 출력
t2: 5초 간 대기 후 종료
t3: 서버의 /tmp 디렉토리 내용 출력
t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'owner_name',
'start_date': datetime(2023, 5, 27, hour = 0, minute = 00),
'email': ['user@email.com'],
'retries': 1,
'retry_delay': timedelta(minutes = 3)
}
test_dag = DAG(
"dag_v1",
schedule = "0 9 * * *",
tags = ["test"],
catchup = False,
default_args = default_args
)
t1 = BashOperator(
task_id = 'print_date',
bash_command = 'date',
dag = test_dag
)
t2 = BashOperator(
task_id = 'sleep',
bash_command = 'sleep 5',
dag = test_dag
)
t1 = BashOperator(
task_id = 'ls',
bash_command = 'ls /tmp',
dag = test_dag
)
t1 >> [t2, t3]
웹 UI에서 토글 버튼을 통해 DAG 활성화
OR, 터미널에서 DAG 실행
airflow dags list
airflow tasks list dag_name
airflow tasks list dag_name task_name date