데브코스 42일차 - 트랜잭션과 Airflow설치

Pori·2023년 12월 12일
0

데엔

목록 보기
32/47

트랜잭션이란

  • Atomic → 한번에 실행되어야 하는 SQL을 묶어 하나의 작업처럼 처리하는 방법
  • 구현 방법
    • autocommit = True
      • 변경 사항을 바로 적용시킴
    • autocommit = False
      • .commit이나 rollback이 나오기전에는 스테이징 상태로 존재하며, 커밋되지 않는다.
    • 유의 점
      try:
      	cur.execute(create_sql)
      	cur.execute("COMMIT;")
      except Exception as e:
      	cur.execute("ROLLBACK;")
       # raise 하여 에러 발생이 필요하다.
      : 이 경우에 에러가 발생하면 아무것도 알 수 없기 때문에 주의해야한다.
      → 에러가 명시적으로 보여야한다.

Airflow 설치 (Airflow 2.5.1)

직접 설치 (Docker)

  • 메타데이터베이스는 Postgres사용

  • Airflow 설치 경로 : /var/lib/airflow/

  • Airflow 서버에 3개의 어카운트가 사용된다.

    • ubuntu : 메인 어카운트
    • porstgres : root이외에 postgres 액세스를 위한 airflow 계정이 필요
    • airflow : airflow용 어카운트.
  • 이번 과정에서는 Docker에 설치 시 git에 준비된 레포를 활용하여 설치하였다. 이외에 설치 방법을 간단하게 정리하려한다.

  1. apache-airflow 설치 : pip install "apache-airflow[celery]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt"

  2. curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.2/docker-compose.yaml'

  3. 환경 변수와 dags 폴더 생성

    1. mkdir -p ./dags ./logs ./plugins
    2. echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
    3. 만약 수동으로 할 경우에 AIRFLOW_UID=50000 를 .env에 붙여넣는다.
  4. 초기화 후 실행

    1. docker compose up airflow-init
    2. docker compose up

참고) Running Airflow in Docker — Airflow Documentation

EC2를 활용하여 설치 (Mac 이용)

  • EC2 구축
    • Key pair 설정 → 서버에 설치된다.
      • airflow-dev.pem 다운로드
    • 보안 그룹 설정 (24, 8080포트 오픈이 필요)
      • SSH traffic 설정
  • Instance를 누르고 private IPv4 address를 복사.
  • 터미널 오픈 후 SSH 커맨드를 사용
    • ssh - i airflow-dev.pem ubuntu@<복사한 IPv4 address>
    • 키 권한에러 발생 시
      • chmod 600 airflow-dev.pem ⇒ 권한을 줄여야한다.
  • 접속 후 기본 세팅
    • sudo apt-get update 를 통해 apt-get 업데이트 진행
    • python3 세팅을 해주어야 한다.
      • wget https://bootstarp.pypa.io/get-pip.py
      • sudo python3 get-pip.py
      • sudo apt-get install -y python3-pip
    • openssl 버전 업그레이드
      • sudo pip3 install pyopenssl --upgrade
  • Airflow 모듈 설치
    • mySQL 모듈 업그레이드
      • sudo apt-get install -y libmysqlclient-dev
    • Airflow 모듈 설치 (airflow: 2.5.1, modules : celery,amazon,mysql,postgres)
      • pip install "apache-airflow[celery,amazon,mysql,postgres]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt" 위 코드를 활용해서 버전과 모듈을 설치한다.
  • ubuntu에서 airflow를 사용하기 위한 유저 생성
    • sudo groupadd airflow
    • sudo useradd -s /bin/bash airflow -g airflow -d /var/lib/airflow -m
  • Postgresql
    • sudo apt-get install -y postgresql postgresql-contrib
    • 설치 완료가 되면 postgres라는 계정이 생성된다.
    • 계정 변경 : sudo su postgres
    • postgresql에서 사용할 airflow 계정을 만들어야한다. psql 로 접속
      • CREATE USER airflow PASSWORD 'airflow';
      • 전용 db 생성 : CREATE DATABASE airflow;
    • postrgresql 재시작하기 : sudo service postgresql restart
  • Airflow 계정 변환 후 설치
    • sudo su airflow
    • Home directory로 이동 : cd ~/
    • mkdir dags 로 dag들이 만들어질 폴더 생성
    • DB 초기화 : AIRFLOW_HOME = /var/lib/airflow airflow db init
    • airflow.cfg 파일설정
      • dags_folder 위치를 맞추기
      • executor를 LocalExecutor로 변경해준다. → Postgresql용 최적의 executor
      • Postgresql 설정
        • [database]의 sql_alchemy_conn을 변경해야한다.
        • sqlite를 postgresql+psycopg2: //airflow:airflow@localhost:5432/airflow 로 변경한다.
    • cfg 재설정 완료 후 DB 초기화를 다시 실행한다. AIRFLOW_HOME = /var/lib/airflow airflow db init
  • Airflow가 백그라운드에서 돌아가게 하기.
    • 웹서버를 서비스로 등록: sudo vi /etc/systemd/system/airflow-webserver.service

    • 파일의 내용을 다음과 같이 채워준다.

      [Unit]
      Description=Airflow webserver
      After=network.target
      
      [Service]
      Environment=AIRFLOW_HOME=/var/lib/airflow
      User=airflow
      Group=airflow
      Type=simple
      ExecStart=/usr/local/bin/airflow webserver -p 8080
      Restart=on-failure
      RestartSec=10s
      
      [Install]
      WantedBy=multi-user.target
    • 스케쥴러 서비스 등록

      [Unit]
      Description=Airflow scheduler
      After=network.target
      
      [Service]
      Environment=AIRFLOW_HOME=/var/lib/airflow
      User=airflow
      Group=airflow
      Type=simple
      ExecStart=/usr/local/bin/airflow scheduler
      Restart=on-failure
      RestartSec=10s
      
      [Install]
      WantedBy=multi-user.target
    • 서비스 활성화

      sudo systemctl daemon-reload
      sudo systemctl enable airflow-webserver
      sudo systemctl enable airflow-scheduler
      
      # 서비스 시작
      sudo systemctl start airflow-webserver
      sudo systemctl start airflow-scheduler
  • Airflow webserver에 로그인 어카운트 생성하기
    • sudo su airflow
    • AIRFLOW_HOME=/var/lib/airflow airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password adminpassword
    • ubuntu에서 실행한 경우에 admin 계정을 지워야한다.
      • AIRFLOW_HOME=/var/lib/airflow airflow users delete --username admin
  • 웹서버 포트를 열어야한다.
    • EC2 > 보안그룹 > 인바운드 규칙에서 8080포트를 열어준다.

클라우드 사용

  • AWS : MWAA 사용
  • 구글 클라우드 : Cloud Composer
  • MS Azure : Azure Data Factory에 Airflow DAGs 기능이 존재한다.

Airflow 기본 구조

  1. DAG를 대표하는 객체를 먼저 만든다.
  2. DAG를 구성하는 태스크들을 생성
  3. 태스크들간의 실행 순서를 결정한다.

DAG 설정 예제

  • default_args 설정 : 이곳에 지정되는 인자들은 모든 태스크들에 공통으로 적용된다.
    • owner
    • email
    • retries
    • retry_delay 등등
      from datetime import datetime,timedelta
      
      default_args = {
      	'owner' : 'poriz',
      	'retries': 1,
      	...
      }
  • DAG 정의
    • catchup : 실행날짜 이전에 계획된 것이 있는 경우 True로 변경하면 실행된다.
    • schedule : cron 표현식을 사용한다.
dag = DAG(
 "dag_v1" # DAG name
	start_date= datetime(2020,1,1,hour=0,minute=00),
	schedule = "0 * * * *"
	catchup = False
)

Bash Operator 예제

  • 3개의 태스크
  • t1은 현재시간을 출력
  • t2는 5초간 대기 후 종료
  • t3는 서버의 /tmp 디렉토리의 내용 출력
  • t1이 끝나고 t2와 t3를 병렬로 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# default_args 설정
default_args{
	'owner' : 'poriz',
	'start_date' : datetime(2023,1,1,hour=0,minute=00),
...
}

# dag 정의
test_dag = DAG(
 "dag_v1" # DAG name
	start_date= datetime(2020,1,1,hour=0,minute=00),
	schedule = "0 * * * *"
	catchup = False
)

# task 생성
t1 = BashOperator(
	task_id = 'print_date',
	# bash 명령어 사용
	bash_command='date',
	dag = test_dag)

t2 = BashOperator(
	task_id = 'sleep',
	bash_command='sleep 5',
	dag = test_dag)

t3 = BashOperator(
	task_id = 'ls',
	bash_command='ls /tmp',
	dag = test_dag)

t1 >> [t2,t3]

0개의 댓글