from airflow.operators.python import PythonOperator
"""
context가 가지고 있는 key
params : Operator에서 지정해주는 인자
task_instance : 해당 task의 unique한 id
execution_date : 실행 날짜
등등...
"""
# PythonOperator에서 사용할 Python 함수
def python_ft(**context):
context["params"]['param1']
context["task_instance"]
context["execution_date"]
...
# PythonOperator로 task 생성
task = PythonOperator(
dag = dag,
task_id = "task_id",
python_callble = python_ft, # 위에서 작성한 함수
params = {'parma1' : '', ...}, # python_ft에서 사용할 인자들
)
from airflow.decorators import task
# task decorator를 사용하여 함수 자체가 task로 변환
@task
def python_ft():
...
# DAG 정의와 함께 task 정렬도 같이
with DAG(...) as dag:
# 함수 이름이 task id가 된다.
python_tf1() >> python_tf2() ...
max_active_runs = int
max_active_tasks = int
catchup = boolean
Airflow를 key-value storage처럼 사용하는 것
Web UI에서 설정할 때 지정한 'Key'를 코드에서 사용하여 'Value'를 불러온다.
from airflow.models import Varialbe
Variable.get('key') # key에 해당하는 Value를 불러온다.
def python_ft(**context):
context["task_instance"].xcom_pull(key='return_value', task_ids='불러와야할 task id')
오늘은 일정이 있어서 수업을 조금밖에 듣지 못했다.
오늘 공부한 내용은 내일 다시 복습하면서 자세하게 서치해봐야겠다.