데브코스 43일차(1) - PythonOperator를 활용한 예제

Pori·2023년 12월 13일
0

데엔

목록 보기
34/47

PythonOperator를 사용한 DAG

: pythonOperator를 활용하여 쉽게 DAG를 작성 가능하다. 두가지 방법을 소개한다.

  • task decorator가 없는 방식
from airflow.operators.python import PythonOperator

t1 = PythonOperator(
	dag = dag,
	task_id='task_id',
	python_callable = python_func,
	params={
		'table':'test_table',
		'schema':'raw_data'
	},
)

# python function
def python_func(**cxt):
	table = cxt["params"]["table"]
	schema = cxt["params"]["schema"]
	ex_datge = cxt["execution_date"]
  • task decorator를 사용한 방식
from airflow.decorators import task

dag = DAG(
	dag_id = 'HelloWorld',
	...
)
# python functions
@task
def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

with DAG(
	dag_id = "HelloWorld",
	...
)as dag:

# taskID == function_name
print_hello() >> print_goodbye()

참고) 중요한 DAG 파라미터들

  • max_active_runs : 동시에 수행 가능한 DAG의 수
  • max_active_tasks : 동시에 수행되는 task들의 수
  • catchup : DAG의 start_date 이전에 밀린 작업들의 수행 여부 (default = True)

Connections & Variables

: 두 기능 모두 Airflow의 webUI를 통해 쉽게 설정 가능하다.

  • connections : 연결에 필요한 정보들을 환경설정의 형태로 코드 밖으로 빼내는 일을 수행한다.
    Connection 설정 후에 다음과 같이 연결 작업 수행이 가능하다. 아래 예시는PostgresHook을 사용하여 Redshift와 연결한 예제이다.
from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_Redshift_connection(autocommit=True):
		# Connection에서 수행한 연결의 id를 적어준다.
    hook = PostgresHook(postgres_conn_id='<Connection id>')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()
  • variables : 변수 역시 환경설정으로 뺄 수 있음, airflow를 key-value 스토리지 형태로 사용,
    from airflow.models import Variable코드로 사용가능하다.

Xcom

  • 태스크들간에 데이터를 주고 받기 위한 방식

  • Operator의 리턴값을 Operator에서 읽는 형태

  • 메타데이터 DB에 저장되기 때문에 큰 데이터는 불가능하다.

    • 큰 데이터는 S3등에 로드하고 위치를 넘기는 것이 일반적이다.
  • 예제 코드 (Extract->Transform->Load 순으로 진행된다.)

def extract(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)
    
# extract의 리턴값이 transform의 입력으로 받아진다.
def transform(**context):
    logging.info("Transform started")
		# xcom은 params가 아니다. task_instance를 통해서 가져옴
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    ...
    return records
    
# Operator 정의
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)
transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    ... 
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    ...
    dag = dag)

extract >> transform >> load

CLI 환경에서 DAG 확인하기

: 작업을 하다보면 CLI환경에서 DAG들을 테스트하고, 작업을 수행해야하는 경우가 발생한다.

  • airflow의 컨테이너 로그인
    1. docker ps로 로그인 하려는 컨테이너의 id를 찾는다.
    2. 쉘 실행 : docker exec -it <container_id> sh
  • 쉘에서 유용한 명령들 : Scheduler에서 사용
    • DAG 출력 : airflow dags list
    • DAG안의 동작 task들을 출력 : airflow tasks list <DAG_id>
    • 변수 리스트 출력 : airflow variables list
    • 변수의 내용을 출력 : airflow variables get <variable>
    • DAG의 task list 확인 : airflow tasks list UpdateSymbol (dag-id)
    • cli에서 dag 실행 : airflow dags test UpdateSymbol 2023-05-30

Task의 수

  • task를 너무 많이 만드는 경우 전체 DAG 실행에 있어 너무 오래걸리게 된다.
  • task를 적게만들면 모듈화가 안되고, 실패 시에 재실행 시간이 오래걸린다.
  • 적절한 수의 task로 분할해서 사용하는 것이 중요하다.

0개의 댓글