MWAA (Managed Workflows for Apache Airflow)
사용Cloud Composer
사용Azure Data Factory
에 Airflow DAGs
기능 존재substitue user
의 약자로 현재 사용 중인 사용자 계정을 로그아웃하지 않고 다른 사용자의 권한을 얻을 때 사용하며 sudo와 함께 사용된다. sudo su postgres 혹은 sudo su aws 명령어가 사용된다.1. EC2 인스턴스 생성
EC2
인스턴스를 Ubuntu Server 20.04
를 선택해 준다.t3.small
이나 t3a.small
을 선택해 준다.Key pair
를 생성해 주어야 한다. 만약 기존 사용 중인 Key pair
가 존재한다면 해당 Key pair
를 사용해도 된다.Security group
은 Port 번호 24와 8080가 오픈되어야 한다. 2. SSH 로그인
EC2
의 DNS
서버 주소를 복사해서 다음과 같은 명령어에 붙인다.ssh -i airflow-dev.pem DNS_SERVER
chmod
, 윈도우는 file property
를 수정해 준다. ppk의 경우 putty key generator를 사용해 준다. 이건 같은 상황을 겪었을 때 Trouble Shooting에 대해 포스팅을 한 적이 있다. 🔑 chmod 명령어 Window에서 사용 시 해결 방법sudo apt-get update
3. python 3.0 설치
wget https://bootstrap.pypa.io/get-pip.py
sudo python3 get-pip.py
sudo apt-get install -y python3-pip
sudo pip3 install pyopenssl --upgrade
4. airflow 설치
sudo apt-get install -y libmysqlclient-dev
sudo pip3 install --ignore-installed "apache-airflow[celery,amazon,mysql,postgres]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt"
sudo pip3 install oauth2client gspread numpy pandas
5. airflow 계정 생성
sudo groupadd airflow
sudo useradd -s /bin/bash airflow -g airflow -d /var/lib/airflow -m
6. Postgres 설치
Postgres
를 사용하기 위해 Postgres
를 설치해 준다.sudo apt-get install -y postgresql postgresql-contrib
Postgres
사용자로 재로그인해 준 후에 Postgres shell
을 실행한 다음 airflow라는 이름의 계정을 생성한다. $ sudo su postgres
$ psql
psql (10.12 (Ubuntu 10.12-0ubuntu0.18.04.1))
Type "help" for help.
postgres=# CREATE USER airflow PASSWORD 'airflow';
CREATE ROLE
postgres=# CREATE DATABASE airflow;
CREATE DATABASE
postgres=# \q
$ exit -- 이를 사용하면 postgres shell에서 나갈 수 있다
Postgres
재시작해 준다.sudo service postgresql restart
7. Airflow 기본 환경 세팅
sqlite
기 때문에 다음과 같이 Postgres
로 변경해 주어야 한다.SequentialExecutor
인데 Sequential Executor
지만 제약이 많기 때문에 주로 LocalExecutor
로 수정한다.sudo su airflow
$ cd ~/
$ mkdir dags
$ AIRFLOW_HOME=/var/lib/airflow airflow db init
$ ls /var/lib/airflow
airflow.cfg airflow.db dags logs unittests.cfg
Airflow
환경 파일에 접근해 주어 앞서 말했던 executor
과 DB
연결 스트링인 sql_alchemy_conn
을 수정해 보자.[core]
...
executor = LocalExecutor
...
[database]
...
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
...
AIRFLOW_HOME=/var/lib/airflow airflow db init
Docker Desktop
을 설치해 주어야 한다. 현재 링크는 windows로 되어 있지만 OS 환경에 맞추어 설치가 가능하다.cd airflow-setup
이라는 명령어를 통해 폴더를 이동해 준다.yaml
파일을 다운로드한다. docker-compose.yaml
파일은 Docker
의 세팅 환경이다.curl -LfO "https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml"
해당 명령어로 yaml 파일을 다운받는다.docker-compose -f docker-compose.yaml pull
명령어를 통해 이미지를 다운로드 받을 수 있다.docker-compose -f docker-compose.yaml up
명령어를 통해 컨테이너를 실행해 준다.default_arg
를 먼저 설정해 주어야 한다. 이때 dictionary 구조로 구현한다.from datetime import datetime, timedelta
default_args = {
'owner': 'ssong',
'email': ['ssong@naver.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
나올 수 있는 설정 값이 많은데 on_failure_callback
과 on_success_callback
을 통해 실패, 성공 여부에 따라 행할 이벤트(이메일로 알림을 보내는 등)를 설정해 줄 수 있다.
메인 DAG를 설정해 준다.
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date=datetime(2020,8,7,hour=0,minute=00),
schedule="0 * * * *",
tags=["example"],
catchup=False,
# common settings
default_args=default_args
)
catchup
이란? catchup
이 Truecatchup
이 True이면 start_date와 오늘 사이에 실행이 안 된 날짜에 대해 DAG를 실행Full Refresh
DAG라면 False로 설정print_date
를 누르면 print_date
이 어떻게 실행되었는지 확인할 수 있다. (여기서 print_date는 태스크이다.)Clear
를 선택해 주면 DAG를 재실행하게 된다.Log
를 선택하여 확인할 수 있다.docker ps
라는 명령어를 통해 docker
의 CONTAINER ID
를 확인해야 한다.scheduler
가 들어간 CONTAINER ID
를 확인한 후 docker exec -it CONTAINER ID sh
를 실행해 준다.(airflow)
가 뜨면 뒤에 필요한 코드를 입력해 주면 된다. 만약 DAG_V1의 태스크들을 알고 싶다면 airflow tasks list dag_v1
을 입력해 준다.airflow tasks test dag_v1 print_date 2023-06-08
을 해 준다.1. Executor 개념, 왜 Sequential Executor은 제약이 많을까?
- 개인적으로
Executor
의 개념이 잘 이해가 가지 않아 추가적으로 공부해 보았다.Executor
란airflow
가 일을 처리하는 방식 즉, 작업 방식을 결정 짓는 것이다.- 예를 들어, 이런 DAG가 있다고 가정해 본다면 task_0에서 task_3까지는 의존 관계가 없기 때문에 병렬적으로 수행되는 게 더 효율적일 것이다. 물론 start >> task_0 >> task_1 >> task_2 >> task_3 >> end로 간다면 덜 리소스를 사용한다는 장점은 있겠지만 시간적인 측면에서는 더 오랜 시간이 걸리게 될 것이다.
- 이와 같이 어떻게 작업을 수행할지 방법을 지정해 주는 것이
Executor
이다.- 그렇다면 왜 default인
Sequential Executor
은 제약이 많을까?Sequential Executor
는 병렬 프로세스를 적용할 수 없다. 병렬적으로 수행이 가능한 항목들도 순차적으로 진행이 되기 때문에 속도가 느리다.
2. Docker Engine Stopped
- 아직 실습도 안 했고 그냥 airflow만 실행하는데 갑자기 Docker Engine Stopped 상태가 돼서 많이 당황했다.
- 분명 실행됐었는데 다시 처음부터 진행해 보려 하니 다음과 같은 오류가 발생하였다.
error during connect: in the default daemon configuration on Windows, the docker client must be run with elevated privileges to connect
- 다른 블로그 글들을 찾았는데 WSL 업데이트를 하며 해결했다고 하길래 WSL를 다시 다운로드 하려고 했더니 역시나 어제 최신 버전을 받아서 그냥 가장 최근 버전이라는 안내만 떴다.
- 해당 에러 검색해 봤는데 도커 커넥션 문제니까 도커를 켜고 재실행하면 된다고 했다.
- 그냥 로딩만 계속 돌아가고 RESTART 버튼이 활성화가 안 되는데요.
- 해당 문제에 대해 대부분 대책으로 Docker Desktop부터 새로 받길래 최종 보류로 두고 혹시나 하는 마음에 PC 재부팅 하니까 다시 RESTART가 됐다.
- 또 같은 현상이 일어나면 원인을 찾아 봐야 할 것 같다.
- 아무튼 C 드라이브 다 정리해 주고 20 GB 확보한 뒤에 드디어 만난 airflow localhost 화면.
- 이거는 트러블 슈팅이라고 할 수도 없어서 따로 빼 두었다.