DevCourse TIL Day4 Week10

김태준·2023년 6월 8일
0

Data Enginnering DevCourse

목록 보기
44/93
post-thumbnail

어제에 이어 윈도우 환경에서 도커 기반 airflow 설치 진행..
친구 MAC으로는 한번에 진행이 되던데 윈도우 환경에서는 왜인지 계속해서 VALUEERROR가 발생한다...

어제에 이어 오늘은 Airflow 설치하고 밀린 과제도 진행하려 한다.

✅ Airflow

airflow : airflow 관련 디렉토리와 설정 파일 등을 담을 workspace
dags : DAG 파일들이 위치할 곳. 즉 파이썬 스크립트 .py를 여기에 저장하면 된다.
logs : task 실행 및 스케줄러의 로그가 담겨진다.
plugins : 사용자 정의 플러그인을 여기에 넣을 수 있다.

협업으로 Airflow를 사용하기 위해선 AWS EC2를 활용해 Airflow를 설치해야 한다.

지웠다 깔았다 무한반복으로 안되던 와중 아래 코드를 실행하고 yaml.up 진행하니 해결했다.

pip3 install apache-airflow
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env

위 명령어는 echo -e 명령어로 텍스트를 출력하고 "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" 를 통해 AIRFLOW_GID 변수를 0으로 설정하여 User로 인식하도록 만든 후 .env파일을 생성해 env파일 내 현 사용자의 UID, GID를 설정하도록 해준다.

env파일로 환경 변수나 설정 파일 등을 사용할 수 있고 permission, processor문제 등은 위 명령어를 실행시켜 사용한다.

그 결과 드디어 해결 완료...!

추가로 찾아봤을 때 유용했던 자료
Airflow 설치

✅ DAG 설정

default_args {
}
와 같은 dic 형태로 모든 태스크에 공통으로 지정되는 설정을 처리해준다.
이때 사용되는 인자의 예시는 'owner', 'email', 'retries', 'retry_delay' 등이 있다.
추가로 on_failure_collback, on_success_callback 등 callback에 대해 인자 설정도 가능하다.

DAG 생성 예시는 다음과 같다.

from airflow import DAG

dag = DAG(
	"dag_name"=datetime(2020,8,7,hour=0,minute=00),
    # schedule은 unix.cron 형태
    schedule="0 * * * *",
    tags=["example"],
    # start_date, end_date 기준 dag 실행여부 catchup
    catchup=False,
    # common settings (앞서 설정한 모든 태스크에 적용되는 기본 default 인자들)
    default_args=default_args
)
    

🎈 Bash Operator 예제

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
	'owner':'taejun',
    'start_date':datetime(2023,6,8,hour=0,minute=00),
    'email':['taejun3305@gmail.com'],
    'retries':1,
    'retry_delay':timedelta(minutes=3),
}
test_dag=DAG(
	'dag_v1',
    schedule="0 9 * * *",
    tags=['test'],
    catchuUp=False,
    default_args=default_args
)
t1 = BashOperator(
	# 현재 시간 출력
	task_id='print_date',
    bash_command='date',
    dag=test_dag)
t2 = BashOperator(
	# 5초 대기 후 종료
    task_id='sleep',
    bash_command='sleep 5',
    dag=test_dag)
t3 = BashOperator(
	# 서버의 /tmp 디렉토리 내용 출력
	task_id='ls',
    bash_command='ls /tmp',
    dag=test_dag)
# t1 종료 후 t2, t3 병렬 진행
t1 >> [t2, t3]

🎈 How to Trigger a DAG (Terminal)

Airflow 서버에 들어가면 Grid를 통해 해당 DAG 실행여부 및 Success 여부를 확인할 수 있다.
또한, Graph를 통해 각 태스크의 실행 순서를 알 수 있고 code를 통해 각 task가 실행되는 python 코드를 확인할 수 있다.

🧨 아래는 command 터미널에서 명령어로 EC2 서버에 로그인 하는 과정

  • ssh -i airflow-dev.pem ubuntu@ec2-15-165-39-92.ap-northeast--2.compute.amazonaws.com
  • sudo su airflow

이후 Airflow 서버에서 로그인 후 다음 명령 실행
DAG 목록 확인 (paused : True여부로 활성화 확인)
-> airflow dags list
DAG 구성하는 TASK ID 확인 가능
-> airflow tasks list DAG_NAME
TASK 내부에서 확인
-> airflow tasks test DAG_NAME Task_NAME DATE

docker로 확인하는 방법
컨테이너 ID 확인
-> docker ps
id cpoy&paste 후 컨테이너 내부 진입
-> docker exec -it ID sh
이후 위에서 실행한 명령어 그대로 진행
만일, root로 로그인을 원한다면 docker exec -u 0 -it ID sh를 실행한다
이때 MySQLdb에러 발생하면 아래 코드 입력
sudo apt-get update
sudo apt-get install default-libmysqlclient-dev build-essential
이후 airflow 유저로 로그인 후 아래 명령 실행
pip install mysqlclient

✅ Airflow 코드 작성

Airflow Decorators : 프로그래밍을 더 단순하게 진행
@task 데코레이터 지정 후 아래 줄부터 def 함수명을 진행한다면 함수명이 TASK_ID가 된다.

🎈 DAG_PARAMETER

  • max_active_runs : # of DAGs instance
  • max_active_tasks : # of tasks that can run in parallel
  • catchup : whether to backfill past runs
    -> DAG parameter와 TASK parameter의 차이를 이해하는 것이 중요하다.
    코드 실행은 다음과 같이 진행
    with DAG(
    dag_id = '파이썬 파일명',
    start_date = datetime(년, 월, 일),
    catchup여부
    tags
    schedule=unix.cron형태
    ) as dag:
    로 with 파이썬 문 활용
  • 🎈 Xcom?
    task(operator)간 데이터를 주고 받기 위한 방식
    보통 한 operator의 리턴값을 타 operator에서 읽어가는 형태가 됨
    이 값들은 Airflow 메타 데이터 DB에 저장되므로 큰 데이터 주고받기엔 사용불가

🎈 DAG에서 task를 어느 정도로 분리?
-> 오래 걸리는 DAG에 한해 실패 시 재실행이 쉽도록 쉽게 다수의 task로 분리하는 것이 좋음

💡 Airflow서버 상에서 DAG Import Error 문제 발생 시 admin -> variable (redshift에 연결하려는 csv 링크 입력), connection (host, user, password 입력해 연결)
-> 코드 상 from airflow import DAG 문제 해결 완료

💯 airflow.cfg Quiz

  1. DAGs 폴더는 어디에 지정?
    -> 실습 상에선 airflow-setup 내 하위폴더
    -> 기본적으로 Airflow가 설치된 디렉토리 밑의 dags 폴더에 저장되며 dags_folder 키에 저장됌.
  2. DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나? 이 스캔 주기를 결정해주는 키의 이름이 무엇인가?
    -> Airflow 서버에 접속해 DAGs 상에서 앞서 새로만든 Dag가 업로드되었나 확인, 만일 Import Error발생 시 Admin 내 Variables, Connection 연결해주기(기본적으로 5분 주기로 업데이트되며 dag_dir_list_interval 변수로 주기 변경도 가능), 스캔 주기를 결정해주는 키의 이름은 Scheduler
  3. 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어떤 섹션을 변경해야 하는가?
    -> airflow.cfg파일에서 api 섹션을 찾아 구성 옵션 중 auth_backends를 airflow.api.auth.backend.basic_auth로 변경해 API 요청 승인
  4. Variable에서 변수 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까?
    -> passwd, secret, password, apikey, api_key, access_token
  5. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해 해야 하는 일은?
    -> Airflow webserver, scheduler restart
    -> sudo systemctl restart airflow-webserver
    -> sudo systemctl restart airflow-scheduler
  6. Metadata DB의 내용을 암호화하는데 사용되는 키는?
    -> fernet_key

✅ DAG 실습

Airflow를 API 형태로 외부에서 조작하는 방법 예제로 학습!
-> Yahoo Finance

  • Full Refresh로 구현 : 매번 테이블을 새로 만들어 구성하는 방법
  1. cd dags로 접근
  2. docker ps로 worker id 알아낸 후 docker exec -it worker_id sh진행
    2-1. root user로 접근? : docker exec --user -it worker_id sh진행

DAG 코드 직접 작성하여 git에 clone후 Airflow에 올리기
링크텍스트
ex) DAG를 파이썬 파일로 생성이 끝났다면, Windows에서 wsl2로 파일을 업로드 하기 위해선 다음 과정을 거친다.
1. 윈도우 로컬 환경 내 옮기고자 하는 파일 위치로 이동
-> $ /mnt/c/~~
2. wsl 디렉토리로 파일 복사 진행
cp -f 파일명 /home/taejun/~~/저장하고자 하는 파일명
이때 cp명령어 옵션은 다음과 같다.

  • -r : 디렉토리 복사, 하위 디렉토리와 파일 모두 복사
  • -p : 원본파일 소유주, 그룹, 권한, 시간 정보 보존하면서 복사
  • -i : 복사 대상파일 있는 경우 복사 실행여부를 사용자에게 물음
  • -f : 복사 대상파일 있는 경우 사용자 확인없이 강제 복사
profile
To be a DataScientist

0개의 댓글