1. airflow-day3-1

data_hamster·2023년 6월 9일
0

학습주제
Airflow Dag 작성

학습내용


파이썬 오퍼레이터로 헬로월드를 만들어봄
오퍼레이터는 클래스라고 일단 생각
앞서 구글 콜랩, 네임 젠터 레드쉬프트 테이블 적재 했었음
이걸 에어플로 환경으로 코딩

야후 파이낸스 api 애플 지난 30일 주가정보를 풀리프레쉬
이어 이 대그를 인크리맨탈 업데이트
매일 지난 30일 정보를 불러 추가


더미, 배쉬 오퍼레이터로 구조를 살펴보았다.
본격적인 프로그래밍 시작.

from airflow.operators.python import PythonOperator

load_nps = PythonOperator(
	dag=dag,
    task_id='task_id',
    python_callable=python_func,
    params={
    	'table': 'delighted_nps',
        'schema': 'raw_data'
        },
        )
def python_func(**cxt):
	table = cxt["params"]["table"]
    schema = cxt["params"]["schema"]
    
    ex_date = cxt["excution_date"]
    

cxt의 예시

cxt = {
    "params": {
        "table": "테이블1",
        "color": "빨강",
        "size": "중간"
    },
    "user": {
        "name": "홍길동",
        "age": 30,
        "email": "hong@example.com"
    }
}

파이썬 오퍼레이터를 호출하며 인스턴스 생성
dag, task_id 지정
실행해야되는 파이썬 함수의 이름을 파이썬 콜러블에 지정.
이 태스크가 실행될 때 지정 함수가 지정.
이때 함수에 인자를 넘기고 싶다면 params에 지정하는데 params는 딕셔너리임.
ctx라는 인자. 이것도 딕셔너리.
파람스 키 밑에 딕셔너리가 놓임.

밑에 코드는 파이썬 코드랑 똑같음.
굉장히 자유도가 높음.
내가 원하는 어떤 기능이든 파이썬이라면 다 쓸 수 있음.
다만 해야될 일을 많음.

테스크 데코레이터를 쓰면, 파이썬 오퍼레이터를 지정을 하고 그 오퍼레이터가 실행해야하는 함수를 별도로 지정해줘야하는게 아니라, 파이썬 함수를 바로 태스크로 정의할 수 있음. 이어서 본다
https://github.com/learndataeng/learn-airflow/blob/main/dags/HelloWorld.py

dag = DAG(
	dag_id = "helloWorld",
    start_date = datetime(2021,8,26),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *',
    default_args=default_args
    )

두개로 태스크 구성 대그
print_hello
print_goodbye
각각 스트링을 출력해보는 pythonoperator

그걸하기전 DAG를 세팅함. DAG id 주고, 시작일, 스케줄, 하루에 한번 2시 0분에 실행. 디폴드 아규먼트는 딕셔너리, 파라미터들은 태스크에 일반적으로 적용
소스코드를 본다

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id = 'HelloWorld',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *')

def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

print_hello = PythonOperator(
    task_id = 'print_hello',
    #python_callable param points to the function you want to run 
    python_callable = print_hello,
    #dag param points to the DAG that this task is a part of
    dag = dag)

print_goodbye = PythonOperator(
    task_id = 'print_goodbye',
    python_callable = print_goodbye,
    dag = dag)

#Assign the order of the tasks in our DAG
print_hello >> print_goodbye

learn-airflow 아래 있는 코드

다른 에어플로우 코드처럼
DAG 모듈 임포트
PythonOperator 임포트

DAG 인스턴스 생성

파이썬 함수 2개 생성
print 로 되어 있음

각 오퍼레이터를 보면
태스크 아이디, 파이썬콜러블로 실행이되어야 할 함수를 갖는 파라미터를 지정.
그리고 대그 지정.

마지막으로 실행 순서를 세팅
print_hello >> print_goodbye
순서를 정하지않으면 동시에 독립적으로 실행되고 끝.

굿바이는 헬로우가 끝나야지만 실행. 일렬로 실행.

다시 뒤로 돌아와

이번에는 에어플로우에서 제공해주는 태스크 데코레이터를 써서 프로그래밍
보니까, 태스크 id, dag 지정을 안해도 되는거 같음.

from airflow.decorators import task

@task
def print_hello():
	print("hello!")
    return "hello!"
    
@task
def print_goodbye():
	print("goodbye!")
    return "goodbye!"

with DAG(
	dag_id = 'HelloWorld_v2',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *'
    ) as dag:
    
print_hello() >> print_goodbye()

with as는 예전에 책에서 리소스의 할당을 해제하기 편하기 때문에 쓰는 것으로 배웠음

파이썬 콜러블로 지정했던 엔트리함수를 태크스로 어노테이트, 데코레이트를 했음
각각의 파이썬 오퍼레이터다 라는 뜻. 오퍼레이터의 엔트리함수는 저 자체

전에는 오퍼레이터를 따로 정의, 엔트리 함수 따로 정의했음

현재는 간단하게 구현된
대그 세팅은 크게 달라지지 않음
순서를 보면 함수이름을 가지고 순서를 정함
함수 이름이 태스크 ID가 됨.

저 태스크 데코데이터가 자체적으로 맞는 오퍼레이터를 씌워주는 것같음.
https://github.com/learndataeng/learn-airflow/blob/main/dags/HelloWorld_v2.py

각 함수 위에 태스크가 어노테이트 된 상황. 각각이 오퍼레이터가 됨.
태스크 아이디가 함수의 이름이 됨.
간략한 형태로 대그를 파이썬 코드로 작성


이 섹션 마무리 전에
대그에 지정할 수 있는 파라미터 몇개
대그에 직접 지정해주는 파라미터들이 있고
default_args -> 태스크에 일반적으로 적용할 파라미터값. 대그 파라미터 아님!

max_active_runs -> 한번에 동시에 실행될 수 있는 대그의 수, 보통 한번에 한번만 실행. 만약 백필을 한다, 데일리 인크리멘탈 업테이트를 한다면, 지난 1년간의 데이터를 업데이터 365번 실행되어야 함. 한번에 한번만 대그가 실행되어야 한다면 시간이 오래 걸릴 수 있음. 만약 max_active_runs가 30이면 대그가 30개 동시실행. 30개씩 비슷하게 끝난다면 12번 정도에 1년이 업데이트 될 것임. 백필을 할 때 의미가 있음. 아무리 큰값을 지정해도 upper bound는 cpu의 총 합이 됨. 워커 노드가 1개밖에 없고, cpu가 4개로 할당이면, 최대는 4개로 할당할 수 있음. 100을 세팅해도 실제로 적용되는 한계점은 4가 됨. 워커에 할당되어 있는 cpu의 총 합이 한계점. max active tasks에도 적용됨.
max_active_tasks -> 동시가 태스크 몇개 실행, 대그가 1열로 실행이라면 1이나 100을 주나 상관 없음.
catchup -> 이 대그의 시작 날짜가 과거. 활성화 시켰을 때 밀린 날짜들을 따라잡을지 여부 디폴트는 True, 풀리프레쉬인 경우 아무 의미가 없음. 어차피 다 읽어오기에, 인크리멘탈 업데이트에서만 중요.

profile
반갑습니다 햄스터 좋아합니다

0개의 댓글