5. airflow-day2-5

data_hamster·2023년 6월 9일
0

학습주제
Airflow 기본 프로그램 실행
일반적인 구조 확인

학습내용

ec2 다시 만들어서 했더니 됨.
vpc를 새로 만들어서 연결함.

대그 모듈 임포트
오브젝트 하나 만들고 파라미터 지정
대그의 이름, 자주 실행될 스케줄 파라미터, 언제부터 실행될지 지정, 끝나는 날짜 지정 없으면 지정 안해도 됨.
태스크 일반 설정 딕셔너리로 만듦
대그를 구성하는 태스크들을 오퍼레이터라고 함
ID 지정하고 어떤 오퍼레이터 쓸지 지정, 파라미터 채워주면 됨.
내가 몇개의 태스크로 대그를 구성할 것이며
각 태스크의 할일 지정
지금까지 우린 배쉬 오퍼레이터 봤었음

from datetime import datetime, timedelta

default_args = {
	'owner': 'keeyong',
    'email': ['keeyonghan@hotmail.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=3),
    }

공통이 되는 설정을 함
주인 누구
이메일 뭐고
실패하면 재시도를 몇번?
몇분 기다릴지?
다른 아규먼트 더 적용 가능
on_failure_callback, 실패, 성공시 어떤 함수.
예 슬랙으로 이메일로 보냄.
on_success_callback
태스크가 성공적으로 끝났을 때 무슨 일을 이어서 하고 싶다.
함수를 이 파라미터에 키의 값으로 지정.

from airflow import DAG

dag = DAG(
	"dag_v1", # DAG name
    start_date=datetime(2020,8,7,hour=0,minute=00),
    schedule="0 * * * *",
    tags=["example"],
    catchup=False,
    # common setting
    default_args=default_args
    )

대그
시작날짜
스케줄 0 첫번째 자리가 분, 한시간에 0분에 hourly
tag로는 example을 부여
catchup 좀더 자세히 설명 예정
앞에 만들었던 default_args딕셔너리 집어넣음
스케줄 같은 경우, 실행을 주기적으로 안한다면 None, @once로 세팅 따른 대그가 끝났을 때 트리거하는 형태로 쓸 수 있음.
그냥 매시 영분, 매일 0시0분0초 @daily 이런식으로 세팅 가능

catchup?
시작날짜를 과거로 세팅. 그 사이에 엄청난 갭이 있음. 아무런 세팅을 안하면, 지난 거의 2년 9개월간 실행이 안된 기간이 있는데, 매 시간마다 실행시키려고 함. False로 세팅을 하면 그 갭에 대해 아무 행동을 안함.
기본은 Catchup이 True.
시작날짜가 과거. 오늘 처음 대그를 활성화 했다면 airflow는 실행이 안된 날짜 시간들에 대해서 실행을 하나씩 할 수 있음.
그게 아니라면 catchup은 False
만일 내가 풀리프레쉬로 데이터를 가져온다면 False로 해두는 것이 좋음
2년이 밀렸다고 해서 지금이나, 그때 각각 실행해주나 똑같은
인크리멘탈 업데이트에서 의미가 있음
그 이외의 경우 False

나중에 에어플로우 백필 방식 설명할 때 추가


세개의 태스크로 구성
t3는 ls 명령어를 수행

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
	'owner': 'keeyong',
    'start_date': datetime(2023,5,27, hour=0, minute=00),
    'email':['keeyonghan@hotmail.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=3),
    }
test_dag = DAG(
	"dag_v1", # DAG name
    schedule = "0 9 * * *",
    tags=['test'],
    catchUp=False,
    default_args=default_args
    )
t1 = BashOperator(
	task_id='print_date',
    bash_command='date',
    dag=test_dag)

t2 = BashOperator(
	task_id='sleep',
    bash_command='sleep 5',
    dag_test_dag)

t3 = BashOperator(
	task_id='ls',
    bash_command='ls /tmp',
    dag=test_dag)
    
t1 >> [t2, t3]

배시오퍼레이터(BashOperator)는 Apache Airflow에서 제공하는 오퍼레이터 중 하나로, Bash 명령어 또는 Bash 스크립트를 실행할 수 있게 해줍니다. Airflow는 작업의 흐름(DAG, Directed Acyclic Graph)을 관리하고 스케줄링하는 도구로, 여기서 각 작업(task)은 '오퍼레이터(operator)'를 통해 정의됩니다.
오퍼레이터는 일련의 작업을 수행하는 파이썬 클래스를 의미하며, 여러 가지 유형의 오퍼레이터가 있습니다. 예를 들어 PythonOperator는 파이썬 함수를 실행하고, MySqlOperator는 MySQL 쿼리를 실행하며, BashOperator는 배시 스크립트를 실행하는 등의 기능을 수행합니다.
배쉬오퍼레이터를 이용하면, 배시 스크립트를 사용해 커맨드 라인에서 수행할 수 있는 거의 모든 작업을 Airflow의 작업으로 스케줄링하고 실행할 수 있습니다. 예를 들어, 특정 시간에 서버에서 파일을 다운로드하고 압축을 풀고, 데이터를 처리하는 작업을 자동화하고 싶다면, 이러한 명령을 배시 스크립트로 작성하고 BashOperator를 사용해 Airflow DAG에 추가할 수 있습니다.
Apache Airflow에서 오퍼레이터(Operator)는 단일 작업을 나타내는 클래스입니다. 즉, 오퍼레이터는 일련의 작업을 수행하는 도구라고 할 수 있습니다.
배쉬(Bash)는 주로 Unix와 Linux 환경에서 사용되는 셸 스크립트 언어입니다. 운영체제와의 상호 작용을 위한 간단한 명령어 실행, 파일 시스템 조작, 프로세스 관리 등의 작업에 주로 사용됩니다. 배쉬 스크립트는 일련의 커맨드 라인 명령어를 순서대로 실행하는 간단한 작업을 자동화하는 데 유용합니다.

bash_command 뒤에 말 그대로 명령어를 실행시켜줌
배쉬는 쉘 스크립트 언어

웹 UI 커맨드라인에서 실행
https://github.com/keeyong/airflow-setup/blob/main/dags/TestDAG.py

파이프라인을 활성화 시켜봄.

대그가 실행이 될것임
지금 시간 시작날짜 catchup에 따라 실행횟수가 정해질것

CLI로 로그인해서 실행할수도 있음

과거라고 해서 자동실행 X, 활성화 후 스케줄링, 얼마나 스케줄링은 catchup 파라미터에 따라 결정.
TRUE면 밀린것들을 실행.
false라면 한번만 실행


터미널로 들어가면
모든 대그를 화면에 뿌려줌
그 대그에 속한 태스크가 리스트로 뜸
특정 태스크를 실행하고 싶다면, 날짜를 입력하는 이유는 백필
test, run 동일. run은 실행결과가 메타데이터 DB 기록, test는 아님


웹 UI를 통해 실행시켜본다
터미널도 명령어 몇개 실행해본다

ec2, 도커 로그인만 설명, 실제는 다 똑같

실행이 되고있다
들어가본다

이렇게 실행 과정을 볼수있고 오토 리프레쉬 켜주면 실시간 반영됨
코드를 볼 수 도 있음
우측에 코드


다시 그리드로 돌아와서

태스크들 실행 순서를 알기위해 그래프를 눌러보면

시각화가 가능
각 태스크가 얼마나 걸렸는지 알고싶으면 태스크 듀레이션을 들어가면 됨

현재 별다른 정보는 없음
그리드로 돌아와서


껐다가 키니까 서버가 정지. docker-compose up 을 했는데 안됨.

docker-compose -f docker-compose.yaml up
으로 시도함


일단 다른 예제 대그들도 모두 실패하는걸로봐서
airflow.cfg를 수정해야할 것 같았다.
그러나 도커 컨테이너 내의 airflow.cfg에 접근하는건 다소 번거로워
볼륨에 추가해서 직접 접근을 한다.
단점으로는 변경사항은 컨테이너를 재시작해야한다고 한다

환경변수로 Localexecutor로 변환해본다


일단 aws는 성공해서
aws로 수업을 계속 진행한다

노노
윈도우 포기하고

우분투 wsl 깔아서 도커 airflow 컨테이너 구동까지 하고 healthy상태 확인.
그러나 로컬호스트:8080으로 윈도우에서 접속 실패
이에, wsl 우분투에서 크롬 깔아서 접속 시도.

중요 compose up 할 때 sudo 붙여야함

일단 윈도우꺼 포기하고

aws 리눅스 기반으로 시도.
쿠팡에서 ssd 주문했음. 500G. 내일 오는대로 우분투 20.04 설치해서 멀티부팅 시도 예정.


윈도우가 답이 없는게
로그도 안뜸.
위화면은 aws 서버에서 실행한 화면
원래 AWS도 속도가 느리고 응답이 끊기는 이슈가 있었는데, VPC를 새로 팠더니 잘됨.

CLI로도 실행 예정
대그.
기본적으로 시작날짜가 과거여도 자동 실행 X. 내가 명시적으로 활성화 시켜야 스케줄링이 됨. 얼마나 스케줄링될지는, catchup이 True, False여부.(false는 단 한번)

풀리프레쉬 데이터 파이프라인의 경우 catchup False


test, run 동일
지난번 배웠던 내용 복습.

로그인 방법만 다르고, 실행은 동일함.


dag_v1 정상 실행.
들어가본다

나의 경우두번 돌아감

코드에도 접근할 수 있음

다시 그리드로 돌아와서



테두리로 실행 성공여부 확인

정상적이면 상당히 빨리 끝남
어떻게 실행됐는지 확인

재실행은 우측 클리어
해당 print_date 부터 재시작
사실상 대그를 다시 실행과 동일해짐.
log을 확인할 수 있음 - 디버깅할 때 도움


좀전에 배쉬 커맨드의 date의 결과가 찍힌 모습

비슷하게 슬립은 아무것도 안찍힘
ls 로그 보면

oupt 아래에
/tmp 디렉토리 밑의 내용들을 프린트 함

대그의 내용들을 살펴본다
대그스로 이동해서

나중에 헬로월드도 살펴볼 예정

커맨드라인, 로그인, 실행

ec2 서버부터

이미 ssh 로그인을 해놓은
airflow 유저로 변신
sudo su airflow

모든 대그리스트를 본다
airflow dags list


paused가 True면 아직 실행 안됨
False면 지금 활성화 됨
dag_v1만 활성화 확인

활성 대그의 태스크들을 본다
airflow tasks list dag_v1

태스크 아이디를 보여준다

그중 태스크 하나를 실행시켜본다. 주의! 날짜를 적어주어야 한다
airflow tasks test dag_v1 ls 2023-05-27

로그식으로 뜬다
output으로

디렉토리 목록이 뜬 것을 확인함

이어서

도커 로그인

정확하게는 에어플로우 스케줄러 안으로 로그인 해야함
docker ps로 컨테이너 아이디를 알아내야함

6개의 컨테이너가 돌고
이미지의 이름에 스케줄러가 들어간 것을 확인한다

왼쪽이 컨테이너 아이디가 된다
이 컨테이어 안으로 로그인
docker exec -it "컨테이너ID" sh
sh 쉘 스크립트를 안에서 띄운다

스케줄러 안으로 들어옴

이제부턴 아까 ec2의 에어플로우의 작업과 같음
airflow dags list

대그들이 잘 나오는 것을 확인

오너도 확인할 수 있음

airflow tasks list dag_v1

airflow tasks test dag_v1 2023-05-27
으로 태스크 실행

output 이후 날짜가 프린트 된 것을 확인했다.

요약

꼭 다음에 우분투 환경에서 도커, 에어플로우 설치해보기.
일단 윈도우 안된다고 좌절하지 말고, 다행이 aws 서버는 잘 돌아가니까 이걸로 수업을 밀자.

profile
반갑습니다 햄스터 좋아합니다

0개의 댓글