DAG
내 구성요소, 사전 정의된 Task의 템플릿 (객체지향 프로그래밍에서의 Class와 객체의 관계와 비슷합니다)airflow.models.baseoperator.BaseOperator
또는 특정 Operator를 상속받아 Custom Operator를 작성할 수 있습니다.execute(self, context)
함수 내에 동작할 코드를 작성하며, on_kill(self)
과 같은 사전 정의된 Method를 Override하여 추가적인 동작을 설정할 수 있습니다.[주의사항] Custom Operator 작성 시,
__init__()
함수에 과도한 연산을 발생시킬 경우 해당 Scheduler에서 해당 Operator를 사용한 Task마다 연산이 발생할 수 있기 때문에 서버에 부담이 발생할 수 있으므로 가급적 가볍게 작성할 것
from typing import *
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults # Don't need to use this decorator in recent versions due to its deprecation
from airflow.utils.context import Context
class MyCustomOperator(BaseOperator):
# To use Jinja template, consider the field names present in the template_fields attribute.
template_fields: Sequence[str] = ("param_name", )
template_ext: Optional[str] = ".sql" # If the parameter contains file name, set the file extension here.
template_fields_renderers: Optional[dict] = {
"param_name": "py",
"another_param": "json"
} # This defines in what style the value from template field renders in Web UI
@apply_defaults
def __init__(
self,
param : str,
*args, **kwargs
) -> None
"""
Description
:param param: parameter description
:returns: None
:raises AssertionError: if param is not an str
"""
super().__init__(*args, **kwargs)
self.param = param
assert isinstance(self.param, str), f"param is not str, it is {type(self.param).__name__}."
# This function is called ahead of execute() function.
def pre_execute(self, context: Context) -> None:
# Statements here
super().pre_execute() # Optional
# Defines the operator's actual work.
def execute(self, context: Context) -> None:
# Statements here
super().execute(context) # If you want to execute the parent class's execute method, call it explicitly.
# This function is called when the task instance is killed.
def on_kill(self) -> None:
# Statements here
super().on_kill() # Optional
airflow.models.baseoperator.BaseOperator
airflow.utils.decorators.apply_defaults
template fields
: Jinja2
템플릿을 사용하려면 해당 클래스 변수에 변수명을 지정해야 하며, 지정된 변수명에 해당하는 매개변수에서 템플릿이 렌더링 됨template_ext
template_fields_renderers
: {'field_name': 'json'}
와 같이 설정하면 해당 필드가 JSON 형식으로 렌더링됨pre_execute(self, context: Context)
: execute(self, context: Context)
on_kill(self)
그 외 부가적인 속성 및 메소드는 공식 문서 참고