어제에 이어 윈도우 환경에서 도커 기반 airflow 설치 진행..
친구 MAC으로는 한번에 진행이 되던데 윈도우 환경에서는 왜인지 계속해서 VALUEERROR가 발생한다...
어제에 이어 오늘은 Airflow 설치하고 밀린 과제도 진행하려 한다.
airflow : airflow 관련 디렉토리와 설정 파일 등을 담을 workspace
dags : DAG 파일들이 위치할 곳. 즉 파이썬 스크립트 .py를 여기에 저장하면 된다.
logs : task 실행 및 스케줄러의 로그가 담겨진다.
plugins : 사용자 정의 플러그인을 여기에 넣을 수 있다.협업으로 Airflow를 사용하기 위해선 AWS EC2를 활용해 Airflow를 설치해야 한다.
지웠다 깔았다 무한반복으로 안되던 와중 아래 코드를 실행하고 yaml.up 진행하니 해결했다.
pip3 install apache-airflow
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env위 명령어는 echo -e 명령어로 텍스트를 출력하고 "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" 를 통해 AIRFLOW_GID 변수를 0으로 설정하여 User로 인식하도록 만든 후 .env파일을 생성해 env파일 내 현 사용자의 UID, GID를 설정하도록 해준다.
env파일로 환경 변수나 설정 파일 등을 사용할 수 있고 permission, processor문제 등은 위 명령어를 실행시켜 사용한다.
그 결과 드디어 해결 완료...!
추가로 찾아봤을 때 유용했던 자료
Airflow 설치
default_args {
}
와 같은 dic 형태로 모든 태스크에 공통으로 지정되는 설정을 처리해준다.
이때 사용되는 인자의 예시는 'owner', 'email', 'retries', 'retry_delay' 등이 있다.
추가로 on_failure_collback, on_success_callback 등 callback에 대해 인자 설정도 가능하다.DAG 생성 예시는 다음과 같다.
from airflow import DAG
dag = DAG(
"dag_name"=datetime(2020,8,7,hour=0,minute=00),
# schedule은 unix.cron 형태
schedule="0 * * * *",
tags=["example"],
# start_date, end_date 기준 dag 실행여부 catchup
catchup=False,
# common settings (앞서 설정한 모든 태스크에 적용되는 기본 default 인자들)
default_args=default_args
)
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner':'taejun',
'start_date':datetime(2023,6,8,hour=0,minute=00),
'email':['taejun3305@gmail.com'],
'retries':1,
'retry_delay':timedelta(minutes=3),
}
test_dag=DAG(
'dag_v1',
schedule="0 9 * * *",
tags=['test'],
catchuUp=False,
default_args=default_args
)
t1 = BashOperator(
# 현재 시간 출력
task_id='print_date',
bash_command='date',
dag=test_dag)
t2 = BashOperator(
# 5초 대기 후 종료
task_id='sleep',
bash_command='sleep 5',
dag=test_dag)
t3 = BashOperator(
# 서버의 /tmp 디렉토리 내용 출력
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
# t1 종료 후 t2, t3 병렬 진행
t1 >> [t2, t3]
Airflow 서버에 들어가면 Grid를 통해 해당 DAG 실행여부 및 Success 여부를 확인할 수 있다.
또한, Graph를 통해 각 태스크의 실행 순서를 알 수 있고 code를 통해 각 task가 실행되는 python 코드를 확인할 수 있다.🧨 아래는 command 터미널에서 명령어로 EC2 서버에 로그인 하는 과정
- ssh -i airflow-dev.pem ubuntu@ec2-15-165-39-92.ap-northeast--2.compute.amazonaws.com
- sudo su airflow
이후 Airflow 서버에서 로그인 후 다음 명령 실행
DAG 목록 확인 (paused : True여부로 활성화 확인)
-> airflow dags list
DAG 구성하는 TASK ID 확인 가능
-> airflow tasks list DAG_NAME
TASK 내부에서 확인
-> airflow tasks test DAG_NAME Task_NAME DATEdocker로 확인하는 방법
컨테이너 ID 확인
-> docker ps
id cpoy&paste 후 컨테이너 내부 진입
-> docker exec -it ID sh
이후 위에서 실행한 명령어 그대로 진행
만일, root로 로그인을 원한다면 docker exec -u 0 -it ID sh를 실행한다
이때 MySQLdb에러 발생하면 아래 코드 입력
sudo apt-get update
sudo apt-get install default-libmysqlclient-dev build-essential
이후 airflow 유저로 로그인 후 아래 명령 실행
pip install mysqlclient
Airflow Decorators : 프로그래밍을 더 단순하게 진행
@task 데코레이터 지정 후 아래 줄부터 def 함수명을 진행한다면 함수명이 TASK_ID가 된다.🎈 DAG_PARAMETER
- max_active_runs : # of DAGs instance
- max_active_tasks : # of tasks that can run in parallel
- catchup : whether to backfill past runs
-> DAG parameter와 TASK parameter의 차이를 이해하는 것이 중요하다.
코드 실행은 다음과 같이 진행
with DAG(
dag_id = '파이썬 파일명',
start_date = datetime(년, 월, 일),
catchup여부
tags
schedule=unix.cron형태
) as dag:
로 with 파이썬 문 활용
- 🎈 Xcom?
task(operator)간 데이터를 주고 받기 위한 방식
보통 한 operator의 리턴값을 타 operator에서 읽어가는 형태가 됨
이 값들은 Airflow 메타 데이터 DB에 저장되므로 큰 데이터 주고받기엔 사용불가🎈 DAG에서 task를 어느 정도로 분리?
-> 오래 걸리는 DAG에 한해 실패 시 재실행이 쉽도록 쉽게 다수의 task로 분리하는 것이 좋음💡 Airflow서버 상에서 DAG Import Error 문제 발생 시 admin -> variable (redshift에 연결하려는 csv 링크 입력), connection (host, user, password 입력해 연결)
-> 코드 상 from airflow import DAG 문제 해결 완료💯 airflow.cfg Quiz
- DAGs 폴더는 어디에 지정?
-> 실습 상에선 airflow-setup 내 하위폴더
-> 기본적으로 Airflow가 설치된 디렉토리 밑의 dags 폴더에 저장되며 dags_folder 키에 저장됌.- DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가?
-> Airflow 서버에 접속해 DAGs 상에서 앞서 새로만든 Dag가 업로드되었나 확인, 만일 Import Error발생 시 Admin 내 Variables, Connection 연결해주기(기본적으로 5분 주기로 업데이트되며 dag_dir_list_interval 변수로 주기 변경도 가능), 스캔 주기를 결정해주는 키의 이름은 Scheduler- 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어떤 섹션을 변경해야 하는가?
-> airflow.cfg파일에서 api 섹션을 찾아 구성 옵션 중 auth_backends를 airflow.api.auth.backend.basic_auth로 변경해 API 요청 승인- Variable에서 변수 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까?
-> passwd, secret, password, apikey, api_key, access_token- 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해 해야 하는 일은?
-> Airflow webserver, scheduler restart
-> sudo systemctl restart airflow-webserver
-> sudo systemctl restart airflow-scheduler- Metadata DB의 내용을 암호화하는데 사용되는 키는?
-> fernet_key
Airflow를 API 형태로 외부에서 조작하는 방법 예제로 학습!
-> Yahoo Finance
- Full Refresh로 구현 : 매번 테이블을 새로 만들어 구성하는 방법
- cd dags로 접근
- docker ps로 worker id 알아낸 후 docker exec -it worker_id sh진행
2-1. root user로 접근? : docker exec --user -it worker_id sh진행
DAG 코드 직접 작성하여 git에 clone후 Airflow에 올리기
링크텍스트
ex) DAG를 파이썬 파일로 생성이 끝났다면, Windows에서 wsl2로 파일을 업로드 하기 위해선 다음 과정을 거친다.
1. 윈도우 로컬 환경 내 옮기고자 하는 파일 위치로 이동
-> $ /mnt/c/~~
2. wsl 디렉토리로 파일 복사 진행
cp -f 파일명 /home/taejun/~~/저장하고자 하는 파일명
이때 cp명령어 옵션은 다음과 같다.
- -r : 디렉토리 복사, 하위 디렉토리와 파일 모두 복사
- -p : 원본파일 소유주, 그룹, 권한, 시간 정보 보존하면서 복사
- -i : 복사 대상파일 있는 경우 복사 실행여부를 사용자에게 물음
- -f : 복사 대상파일 있는 경우 사용자 확인없이 강제 복사