try:
cur.execute(create_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
# raise 하여 에러 발생이 필요하다.
: 이 경우에 에러가 발생하면 아무것도 알 수 없기 때문에 주의해야한다.메타데이터베이스는 Postgres사용
Airflow 설치 경로 : /var/lib/airflow/
Airflow 서버에 3개의 어카운트가 사용된다.
이번 과정에서는 Docker에 설치 시 git에 준비된 레포를 활용하여 설치하였다. 이외에 설치 방법을 간단하게 정리하려한다.
apache-airflow 설치 : pip install "apache-airflow[celery]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt"
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.2/docker-compose.yaml'
환경 변수와 dags 폴더 생성
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
AIRFLOW_UID=50000
를 .env에 붙여넣는다.초기화 후 실행
docker compose up airflow-init
docker compose up
참고) Running Airflow in Docker — Airflow Documentation
ssh - i airflow-dev.pem ubuntu@<복사한 IPv4 address>
chmod 600 airflow-dev.pem
⇒ 권한을 줄여야한다.sudo apt-get update
를 통해 apt-get 업데이트 진행wget https://bootstarp.pypa.io/get-pip.py
sudo python3 get-pip.py
sudo apt-get install -y python3-pip
sudo pip3 install pyopenssl --upgrade
sudo apt-get install -y libmysqlclient-dev
pip install "apache-airflow[celery,amazon,mysql,postgres]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt"
위 코드를 활용해서 버전과 모듈을 설치한다.sudo groupadd airflow
sudo useradd -s /bin/bash airflow -g airflow -d /var/lib/airflow -m
sudo apt-get install -y postgresql postgresql-contrib
sudo su postgres
psql
로 접속CREATE USER airflow PASSWORD 'airflow';
CREATE DATABASE airflow;
sudo service postgresql restart
sudo su airflow
cd ~/
mkdir dags
로 dag들이 만들어질 폴더 생성AIRFLOW_HOME = /var/lib/airflow airflow db init
postgresql+psycopg2: //airflow:airflow@localhost:5432/airflow
로 변경한다.AIRFLOW_HOME = /var/lib/airflow airflow db init
웹서버를 서비스로 등록: sudo vi /etc/systemd/system/airflow-webserver.service
파일의 내용을 다음과 같이 채워준다.
[Unit]
Description=Airflow webserver
After=network.target
[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow webserver -p 8080
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.target
스케쥴러 서비스 등록
[Unit]
Description=Airflow scheduler
After=network.target
[Service]
Environment=AIRFLOW_HOME=/var/lib/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow scheduler
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.target
서비스 활성화
sudo systemctl daemon-reload
sudo systemctl enable airflow-webserver
sudo systemctl enable airflow-scheduler
# 서비스 시작
sudo systemctl start airflow-webserver
sudo systemctl start airflow-scheduler
sudo su airflow
AIRFLOW_HOME=/var/lib/airflow airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password adminpassword
AIRFLOW_HOME=/var/lib/airflow airflow users delete --username admin
from datetime import datetime,timedelta
default_args = {
'owner' : 'poriz',
'retries': 1,
...
}
dag = DAG(
"dag_v1" # DAG name
start_date= datetime(2020,1,1,hour=0,minute=00),
schedule = "0 * * * *"
catchup = False
)
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# default_args 설정
default_args{
'owner' : 'poriz',
'start_date' : datetime(2023,1,1,hour=0,minute=00),
...
}
# dag 정의
test_dag = DAG(
"dag_v1" # DAG name
start_date= datetime(2020,1,1,hour=0,minute=00),
schedule = "0 * * * *"
catchup = False
)
# task 생성
t1 = BashOperator(
task_id = 'print_date',
# bash 명령어 사용
bash_command='date',
dag = test_dag)
t2 = BashOperator(
task_id = 'sleep',
bash_command='sleep 5',
dag = test_dag)
t3 = BashOperator(
task_id = 'ls',
bash_command='ls /tmp',
dag = test_dag)
t1 >> [t2,t3]