[5/21] TIL - Airflow 설치 및 프로그래밍

Sangwon Jwa·2024년 5월 21일

데브코스 TIL

목록 보기
33/54
post-thumbnail

📖 학습 주제


  1. 숙제 리뷰
  2. Airflow 설치
  3. Airflow 기본 프로그램 실행

✏️ 주요 메모 사항 소개


숙제 리뷰

저번 시간에 작성한 ETL 코드에서는 SQL Transaction을 고려하지 않아서 이 부분을 수정할 필요가 있다.

def load(records):
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    schema = "jwa4610"
    
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        cur.execute(f"DELETE FROM {schema}.name_gender;")
       
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
            
        cur.execute("COMMIT;")   # cur.execute("END;")
        
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        
        # 에러 시 ROLLBACK
        # except에서 raise 호출 시 발생한 원래 exception이 위로 전파, ETL 관리 입장에서 어떤 에러가 명확하게 드러나는 것이 더 좋음
        cur.execute("ROLLBACK;")
        raise

Airflow 설치하기

Airflow를 설치하고 활용하는 방법으로는 크게 2가지로 나눌 수 있다.

  1. 직접 설치하고 운영
  2. 클라우드 사용 (프로덕션 환경에서 선호)
    • AWS - MWAA (Managed Workflows for Apache Airflow)
    • GCP - Cloud Composer
    • Azure - Azure Data Factory

2번인 클라우드 사용은 기본적으로 서버를 3대이상 사용하기 때문에 개인이 사용하기엔 부담이 될 수 있다. 따라서 우리는 이번 실습에서 직접 설치하고 운영하는 방법으로 진행해보자.

직접 설치하는 방법도 다음과 같은 2가지 방법으로 진행할 수 있다.

  1. Docker 설치 후 Airflow 설치
  2. AWS EC2 등의 리눅스 서버에 직접 설치 (Ubuntu)

EC2 사용 방법도 완전히 무료는 아니기 때문에 (한달에 18$ 정도?) 자신의 상황에 맞춰서 결정하도록 하자.


Airflow 설치 - EC2

EC2설치 방법은 데브코스 한기용 강사님의 자료를 참고하자.
https://github.com/keeyong/airflow-setup/blob/main/docs/Airflow%202%20Installation.md


Airflow 설치 - Docker

자세한 내용은 https://github.com/keeyong/airflow-setup/blob/main/docs/Airflow%20Docker%20Local%20Setup.md 여기서 확인하기

  1. 먼저 Docker를 설치하도록 하자. 자신의 운영체제에 맞는 docker desktop을 선택해서 설치
    https://docs.docker.com/desktop/

  1. 도커 설치가 완료되면 적당한 디렉토리에 airflow-setup git repository를 clone
git clone https://github.com/keeyong/airflow-setup.git

 

  1. airflow-setup 폴더로 이동하고 2.5.1 이미지 관련 yml 파일을 다운로드
cd airflow-setup
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

 

  1. docker 파일을 docker-compose 명령어로 pull한 뒤 up까지 완료. pull과 up은 꽤 많은 시간이 소요되기 때문에 천천히 기다리자.

나는 up 명령어를 실행 시PermissionError: [Errno 13] Permission denied: '/usr/local/airflow/logs/scheduler 오류가 발생했는데, 오류를 해결하려 구글링하다 airflow 공식문서에서 다음과 같은 글을 찾아볼 수 있다.

On Linux, the mounted volumes in container use the native Linux filesystem user/group permissions, so you have to make sure the container and host computer have matching file permissions.

윈도우 환경에서 진행한다면 wsl을 이용한 linux 환경을 사용해야 될텐데 이 linux의 파일시스템의 권한을 사용하는 pc의 권한과 같게 설정해줘야 한다고 나와있다.

echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env

이 명령어를 실행한 후 다시 up 명령어를 실행하면 문제없이 실행될 것이다.

docker-compose -f docker-compose.yaml pull
docker-compose -f docker-compose.yaml up

 

  1. 모두 완료되었다면 docker desktop에서 airflow-setup 컨테이너가 올라간 것을 볼 수 있을것이다.

이후 브라우저에서 localhost:8080 접속하면 airflow 웹 UI를 만날 수 있다.

초기 설정으로 airflow - airflow를 입력하고 Sign in 해서 문제없이 접속했다면 성공적으로 Airflow 설치가 완료된 것


Airflow 기본 프로그램 실행

Airflow 코드의 기본 구조는 다음과 같다.

  1. DAG 대표하는 객체를 먼저 만들기

    • DAG 이름, 실행주기, 실행날짜, 오너 등등
  2. 다음으로 DAG를 구성하는 태스크들을 만들기

    • 태스크별로 적합한 오퍼레이터를 선택
    • 태스크 ID를 부여하고 해야할 작업의 세부사항을 지정
  3. 최종적으로 태스크들간의 실행 순서를 결정


DAG 설정 예제

  1. DAG 객체를 만들기 전에 모든 태스크들에 공통으로 적용되는 설정을 딕셔너리를 이용해서 먼저 만들어 주자.
from datetime import datetime, timedelta

default_args = {
	'owner' : 'sangwon',
    'email' : 'jwa4610@gmail.com',
    # 태스크 실패 시 재시도 횟수
    'retries' : 1,
    # 재시도할 대 얼마나 기다릴지 설정
    'retry_delay' : timedelta(minutes = 3),
}

 

  1. DAG 객체 만들기
from airflow import DAG

test_dag = DAG(
	"dag_v1", # DAG 이름
    start_date = datetime(2024,8,7,hour=0,minute=00), # 시작 시간
    schedule = "0 * * * *",
    tags = ["example"],
    catchup = False,
    
    # common settings, 위에 만든 딕셔너리 사용
    default_args = default_args
)
  • schedule 값은 다음과 같은 표를 보고 설정하자. "0 * * * *" 은 모든 0분에 이 DAG를 실행한다는 의미이다. 즉, 한 시간에 한번 씩 실행된다는 뜻
  • catchup은 만약 start_date가 현재보다 과거의 시간일 경우, airflow는 start_date부터 현재까지의 시간 GAP 만큼 이 DAG를 실행하려고 한다(catchup). 기본값은 True이기 때문에 만약 이를 원치 않는다면 False로 세팅해줘야 한다. (권장)

 

  1. Task 만들기

예제로 만들어볼 것은 t1, t2, t3라는 3개의 태스크로 구성하려고 한다. 오퍼레이터는 BashOperator를 사용

  • t1 : 현재 시간 출력
  • t2 : 5초간 대기 후 종료
  • t3 : 서버의 /tmp 디렉토리의 내용 출력

t1이 끝나고 t2t3를 병렬로 실행하도록 태스크를 만들어 보자.

from airflow.operators.bash import BashOperator

t1 = BashOperator(
	task_id = 'print_date',
    bash_command = 'date',
    dag = test_dag
)

t2 = BashOperator(
	task_id = 'sleep',
    bash_command = 'sleep 5',
    dag = test_dag
)

t3 = BashOperator(
	task_id = 'ls',
    bash_command = 'ls /tmp',
    dag = test_dag
)

t1 >> [t2,t3]
  • 전체코드
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
	'owner' : 'sangwon',
    'email' : 'jwa4610@gmail.com',
    # 태스크 실패 시 재시도 횟수
    'retries' : 1,
    # 재시도할 대 얼마나 기다릴지 설정
    'retry_delay' : timedelta(minutes = 3),
}

test_dag = DAG(
	"dag_v1", # DAG 이름
    start_date = datetime(2024,8,7,hour=0,minute=00), # 시작 시간
    schedule = "0 * * * *",
    tags = ["example"],
    catchup = False,
    
    # common settings, 위에 만든 딕셔너리 사용
    default_args = default_args
)

t1 = BashOperator(
	task_id = 'print_date',
    bash_command = 'date',
    dag = test_dag
)

t2 = BashOperator(
	task_id = 'sleep',
    bash_command = 'sleep 5',
    dag = test_dag
)

t3 = BashOperator(
	task_id = 'ls',
    bash_command = 'ls /tmp',
    dag = test_dag
)

t1 >> [t2,t3]

 

  1. web ui에서 실행하기

위의 git repository를 클론해서 airflow를 실행했다면 이미 dag_v1이 추가가 되어있을 것이다. 이걸 실행해보자.

상세한 정보를 보기위해 dag_v1을 클릭해서 들어가보면 다음 화면에서 여러가지 정보를 확인할 수 있다.

각각의 태스크 들이 어떻게 실행되는 지 알고싶다면 TASK 실행 현황 안에 태스크 이름을 클릭하면 자세한 정보를 볼 수 있다.

  1. CLI 환경에서 로그인하고 명령 실행하기

먼저 docker ps 명령어를 이용하여 airflow scheduler의 컨테이너 id를 알아내자

알아낸 스케줄러의 id로 docker exec -it [id] sh 명령어를 실행하면 airflow 명령어를 사용할 수 있다.

  1. airflow dags list : DAG 목록 출력
  1. airflow tasks list dag_v1 : 특정 DAG의 TASK 출력

  2. airflow tasks test dag_v1 print_date 2024-05-22 : 특정 TASK(print_date) 실행해보기


0개의 댓글