
- 숙제 리뷰
- Airflow 설치
- Airflow 기본 프로그램 실행
저번 시간에 작성한 ETL 코드에서는 SQL Transaction을 고려하지 않아서 이 부분을 수정할 필요가 있다.
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
schema = "jwa4610"
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
cur.execute(f"DELETE FROM {schema}.name_gender;")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
# 에러 시 ROLLBACK
# except에서 raise 호출 시 발생한 원래 exception이 위로 전파, ETL 관리 입장에서 어떤 에러가 명확하게 드러나는 것이 더 좋음
cur.execute("ROLLBACK;")
raise
Airflow를 설치하고 활용하는 방법으로는 크게 2가지로 나눌 수 있다.
2번인 클라우드 사용은 기본적으로 서버를 3대이상 사용하기 때문에 개인이 사용하기엔 부담이 될 수 있다. 따라서 우리는 이번 실습에서 직접 설치하고 운영하는 방법으로 진행해보자.
직접 설치하는 방법도 다음과 같은 2가지 방법으로 진행할 수 있다.
EC2 사용 방법도 완전히 무료는 아니기 때문에 (한달에 18$ 정도?) 자신의 상황에 맞춰서 결정하도록 하자.
EC2설치 방법은 데브코스 한기용 강사님의 자료를 참고하자.
https://github.com/keeyong/airflow-setup/blob/main/docs/Airflow%202%20Installation.md
자세한 내용은 https://github.com/keeyong/airflow-setup/blob/main/docs/Airflow%20Docker%20Local%20Setup.md 여기서 확인하기

airflow-setup git repository를 clonegit clone https://github.com/keeyong/airflow-setup.git
cd airflow-setup
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
docker-compose 명령어로 pull한 뒤 up까지 완료. pull과 up은 꽤 많은 시간이 소요되기 때문에 천천히 기다리자.나는 up 명령어를 실행 시PermissionError: [Errno 13] Permission denied: '/usr/local/airflow/logs/scheduler 오류가 발생했는데, 오류를 해결하려 구글링하다 airflow 공식문서에서 다음과 같은 글을 찾아볼 수 있다.
On Linux, the mounted volumes in container use the native Linux filesystem user/group permissions, so you have to make sure the container and host computer have matching file permissions.
윈도우 환경에서 진행한다면 wsl을 이용한 linux 환경을 사용해야 될텐데 이 linux의 파일시스템의 권한을 사용하는 pc의 권한과 같게 설정해줘야 한다고 나와있다.
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
이 명령어를 실행한 후 다시 up 명령어를 실행하면 문제없이 실행될 것이다.
docker-compose -f docker-compose.yaml pull
docker-compose -f docker-compose.yaml up




이후 브라우저에서 localhost:8080 접속하면 airflow 웹 UI를 만날 수 있다.

초기 설정으로 airflow - airflow를 입력하고 Sign in 해서 문제없이 접속했다면 성공적으로 Airflow 설치가 완료된 것

Airflow 코드의 기본 구조는 다음과 같다.
DAG 대표하는 객체를 먼저 만들기
다음으로 DAG를 구성하는 태스크들을 만들기
최종적으로 태스크들간의 실행 순서를 결정
from datetime import datetime, timedelta
default_args = {
'owner' : 'sangwon',
'email' : 'jwa4610@gmail.com',
# 태스크 실패 시 재시도 횟수
'retries' : 1,
# 재시도할 대 얼마나 기다릴지 설정
'retry_delay' : timedelta(minutes = 3),
}
from airflow import DAG
test_dag = DAG(
"dag_v1", # DAG 이름
start_date = datetime(2024,8,7,hour=0,minute=00), # 시작 시간
schedule = "0 * * * *",
tags = ["example"],
catchup = False,
# common settings, 위에 만든 딕셔너리 사용
default_args = default_args
)
"0 * * * *" 은 모든 0분에 이 DAG를 실행한다는 의미이다. 즉, 한 시간에 한번 씩 실행된다는 뜻
예제로 만들어볼 것은 t1, t2, t3라는 3개의 태스크로 구성하려고 한다. 오퍼레이터는 BashOperator를 사용
t1 : 현재 시간 출력t2 : 5초간 대기 후 종료t3 : 서버의 /tmp 디렉토리의 내용 출력t1이 끝나고 t2와 t3를 병렬로 실행하도록 태스크를 만들어 보자.
from airflow.operators.bash import BashOperator
t1 = BashOperator(
task_id = 'print_date',
bash_command = 'date',
dag = test_dag
)
t2 = BashOperator(
task_id = 'sleep',
bash_command = 'sleep 5',
dag = test_dag
)
t3 = BashOperator(
task_id = 'ls',
bash_command = 'ls /tmp',
dag = test_dag
)
t1 >> [t2,t3]
- 전체코드
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner' : 'sangwon',
'email' : 'jwa4610@gmail.com',
# 태스크 실패 시 재시도 횟수
'retries' : 1,
# 재시도할 대 얼마나 기다릴지 설정
'retry_delay' : timedelta(minutes = 3),
}
test_dag = DAG(
"dag_v1", # DAG 이름
start_date = datetime(2024,8,7,hour=0,minute=00), # 시작 시간
schedule = "0 * * * *",
tags = ["example"],
catchup = False,
# common settings, 위에 만든 딕셔너리 사용
default_args = default_args
)
t1 = BashOperator(
task_id = 'print_date',
bash_command = 'date',
dag = test_dag
)
t2 = BashOperator(
task_id = 'sleep',
bash_command = 'sleep 5',
dag = test_dag
)
t3 = BashOperator(
task_id = 'ls',
bash_command = 'ls /tmp',
dag = test_dag
)
t1 >> [t2,t3]
위의 git repository를 클론해서 airflow를 실행했다면 이미 dag_v1이 추가가 되어있을 것이다. 이걸 실행해보자.

상세한 정보를 보기위해 dag_v1을 클릭해서 들어가보면 다음 화면에서 여러가지 정보를 확인할 수 있다.

각각의 태스크 들이 어떻게 실행되는 지 알고싶다면 TASK 실행 현황 안에 태스크 이름을 클릭하면 자세한 정보를 볼 수 있다.

먼저 docker ps 명령어를 이용하여 airflow scheduler의 컨테이너 id를 알아내자

알아낸 스케줄러의 id로 docker exec -it [id] sh 명령어를 실행하면 airflow 명령어를 사용할 수 있다.
airflow dags list : DAG 목록 출력
airflow tasks list dag_v1 : 특정 DAG의 TASK 출력

airflow tasks test dag_v1 print_date 2024-05-22 : 특정 TASK(print_date) 실행해보기
