[데이터 엔지니어링 데브코스] TIL 38일차 - 데이터 파이프라인과 Airflow(3)

박단이·2023년 12월 13일
0

데브코스 TIL

목록 보기
38/56

오늘 배운 것🤓

Python Operator

  • 다양한 Operator들 중 적합한 Operator를 찾지못해 직접 작성하는 경우 사용한다.
  • 자유도가 높고, 설정해줘야하는 내용이 많다.
from airflow.operators.python import PythonOperator


"""
 context가 가지고 있는 key
     params : Operator에서 지정해주는 인자
     task_instance : 해당 task의 unique한 id
     execution_date : 실행 날짜
     등등...
"""
# PythonOperator에서 사용할 Python 함수
def python_ft(**context):
	context["params"]['param1']
	context["task_instance"]
	context["execution_date"]
    ...

# PythonOperator로 task 생성
task = PythonOperator(
	dag = dag,
    task_id = "task_id",
    python_callble = python_ft,		# 위에서 작성한 함수
    params = {'parma1' : '', ...},	# python_ft에서 사용할 인자들    
)

Airflow Decorators

  • 위의 코드를 좀더 단순하게 Operator를 직접 지정하지 않고 task Decorator를 사용하여 파이썬 함수 자체를 operator처럼 사용한다.
from airflow.decorators import task

# task decorator를 사용하여 함수 자체가 task로 변환
@task
def python_ft():
	...
    
# DAG 정의와 함께 task 정렬도 같이
with DAG(...) as dag:
	# 함수 이름이 task id가 된다.
	python_tf1() >> python_tf2() ...

DAG 파라미터 (task에 적용 X)

  • max_active_runs = int
    • 동시에 실행할 수 있는 DAG의 수
    • backfill을 위해 필요하다.
  • max_active_tasks = int
    • 동시에 실행할 수 있는 task의 수
    • 병렬 구조일 경우 의미가 있다.
  • 위의 2가지 파라미터는 아무리 큰 값을 지정하도라도 worker가 할당받을 수 있는 최대의 cpu 개수를 넘어갈 수 없다.
  • catchup = boolean
    • full refresh일 경우 의미가 없다. incrumental update일 경우 사용
    • default = True

Connections & Variables

  • Conncetions
    • DB 접속 정보를 코드 내에서 하드 코딩하지 않고, 환경 변수처럼 코드 밖에 저장하고 끌어오는 방법
    • Web UI에서 설정할 때 지정한 'conn id'를 코드에서 사용
  • Varialbes
    • Airflow를 key-value storage처럼 사용하는 것

    • Web UI에서 설정할 때 지정한 'Key'를 코드에서 사용하여 'Value'를 불러온다.

      from airflow.models import Varialbe
      
      Variable.get('key')  # key에 해당하는 Value를 불러온다.

Xcom

  • task(Operator)들 사이에 데이터를 주고 받기 위한 방식
  • Operator의 return 값은 메타 데이터 DB에 저장되고, 이 값을 다른 Operator에서 읽어간다.
  • return 값은 자동으로 저장되며, 해당 task_id를 사용하여 불러온다.
  • 단, 큰 데이터에는 사용할 수 없으므로 큰 데이터는 S3에 저장하고 그 위치를 return 값으로 넘긴다.
def python_ft(**context):
	context["task_instance"].xcom_pull(key='return_value', task_ids='불러와야할 task id')

느낀 점😊

오늘은 일정이 있어서 수업을 조금밖에 듣지 못했다.
오늘 공부한 내용은 내일 다시 복습하면서 자세하게 서치해봐야겠다.

profile
데이터 엔지니어를 꿈꾸는 주니어 입니다!

0개의 댓글