def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO jaeho.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
try:
cur.execute("BEGIN;")
cur.execute("DELETE FROM jaeho.name_gender;")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO jaeho.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
cur.execute("COMMIT;") # 혹은 "END;"로 수정해도 괜찮.
except Exception as e:
print(e)
cur.execute("ROLLBACK;")
raise
트랜잭션이란?
- 은행 계좌 입/출금과 같이 중간에 실패하면 불완전한 상황에 놓이는 작업에 활용함.
- 따라서 여러 작업들을 하나의 작업처럼 (atomic)하게 처리하는 개념.
- SQL에서는 "BEGIN END/COMMIT"과 ROLLBACK을 활용하여 구현 가능.
- python에서는 try/catch로 구현 가능.
- psycopg2에서, autocommit을 통해 자동으로 커밋하게 될 지 구현 가능. True일 경우 자동으로 커밋되며, False일 경우 .commit() 함수가 필요.
- try/except 시에 raise를 추가해 주는 것이 좋음.
git clone https://github.com/keeyong/airflow-setup.git
실행.cd airflow-setup
와 curl -LfO https://airflow.apache.org/docs/apache-airflow/{버전}/docker-compose.yaml
실행. docker-compose -f docker-compose.yaml pull
와 docker-compose -f docker-compose.yaml up
실행.리눅스 소개.
- 우분투 : 리눅스 타입 중의 하나.
- ssh : 리눅스 혹은 유닉스 서버에 로그인해 주는 프로그램(터미널).
- sudo : 보통 슈퍼유저로서 프로그램을 구동할 수 있도록 하는 프로그램.
- apt-get : 우분투/데비안 계열의 리눅스에서 프로그램 설치/삭제를 관리해 주는 프로그램.
- su : substitute user의 약자로 현재 사용 중인 사용자 계정에서 다른 사용자의 권한을 얻을 때 사용.
- vi : 텍스트 에디터.
DAG 객체를 먼저 만듦.
DAG 안에 task를 만듦.
최종적으로 테스크 간의 실행 순서를 결정.
from datetime import datetime, timedelta
default_args = {
'owner': 'jaeho',
'email': ['jaeho@gmail.com'],
'retries': 1, # 실패 시 최대 재시도 횟수.
'retry_delay': timedelta(minutes=3), # 재시도 시 딜레이 타임.
'on_failure_callback': func, # 실패 시 다음 함수 실행.
'on_success_callback': func, # 성공 시 다음 함수 실행.
}
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date=datetime(2020,8,7,hour=0,minute=00),
schedule="0 * * * *",
tags=["example"],
catchUp=False, # start_date(과거)와 현재 날짜 사이에 대해서 실행이 안 된 작업 수행 X.
# common settings
default_args=default_args
)
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'jaeho',
'start_date': datetime(2023, 5, 27, hour=0, minute=00),
'email': ['jaeho@hotmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
test_dag = DAG(
"dag_v1", # DAG name
schedule="0 9 * * *",
tags=['test'],
catchUp=False,
default_args=default_args
)
# t1 : 날짜 출력 테스크
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
# t2 : 5초 간 슬립 테스크
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=test_dag)
# t3 : tmp 디렉토리 파일 조회 테스크
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
# t1 실행 후, t2와 t3 동시 실행.
t1 >> [ t2, t3 ]
Airflow 웹 UI에서 dag_v1을 활성화 한다.
만약 또 실행시키고 싶다면 런 버튼을 클릭한다.
각 테스크가 잘 실행된 것을 확인할 수 있다.
테스크의 실행 절차를 확인 가능하다.
특정 테스크의 상세 정보를 보고 싶으면, 녹색 아이콘을 클릭한다.
airflow dags list
: airflow 내의 모든 dag들의 정보(dag_id, filepath, owner, paused)를 출력.airflow tasks list {dag 아이디}
: 해당 dag의 모든 테스크 출력.airflow tasks test {dag 아이디} {task 아이디} {날짜}
: 특정 테스크 실행.docker ps
명령어로 컨테이너들의 아이디 조회.docker exec -it 3199a1254b38 sh
명령어로 쉘 스크립트 실행.