Apache Airflow Custom Operator Template

graphy-young·2024년 6월 24일
0

Apache Airflow

목록 보기
3/3
post-thumbnail
post-custom-banner

Apache Airflow Custom Operator

01. Apache Airflow의 Operator란 무엇인가?


  • Apache Airflow의 DAG 내 구성요소, 사전 정의된 Task의 템플릿 (객체지향 프로그래밍에서의 Class와 객체의 관계와 비슷합니다)
  • DAG 내에서 선언적으로 정의할 수 있으며, 파이프라인에서 데이터가 어떻게 처리되는지에 대한 로직을 포함하고 있습니다.
  • 기본적으로 제공되는 Operator 외에 Provider Package를 통해 확장할 수 있으며, 필요에 따라서 airflow.models.baseoperator.BaseOperator 또는 특정 Operator를 상속받아 Custom Operator를 작성할 수 있습니다.
  • Custom Operator는 상속 후 execute(self, context) 함수 내에 동작할 코드를 작성하며, on_kill(self)과 같은 사전 정의된 Method를 Override하여 추가적인 동작을 설정할 수 있습니다.

[주의사항] Custom Operator 작성 시, __init__() 함수에 과도한 연산을 발생시킬 경우 해당 Scheduler에서 해당 Operator를 사용한 Task마다 연산이 발생할 수 있기 때문에 서버에 부담이 발생할 수 있으므로 가급적 가볍게 작성할 것

02. Custom Operator Template


from typing import *

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults # Don't need to use this decorator in recent versions due to its deprecation
from airflow.utils.context import Context


class MyCustomOperator(BaseOperator):


    # To use Jinja template, consider the field names present in the template_fields attribute.
    template_fields: Sequence[str] = ("param_name", )
    template_ext: Optional[str] = ".sql" # If the parameter contains file name, set the file extension here.
    template_fields_renderers: Optional[dict] = {
        "param_name": "py",
        "another_param": "json"
    } # This defines in what style the value from template field renders in Web UI

    @apply_defaults
    def __init__(
            self,
            param : str,
            *args, **kwargs
    ) -> None
        
        """
        Description
        
        :param param: parameter description
        :returns: None
        :raises AssertionError: if param is not an str
        """

        super().__init__(*args, **kwargs)
        
        self.param = param
        
        assert isinstance(self.param, str), f"param is not str, it is {type(self.param).__name__}."

    # This function is called ahead of execute() function.
    def pre_execute(self, context: Context) -> None:
        # Statements here
        super().pre_execute() # Optional

    # Defines the operator's actual work.
    def execute(self, context: Context) -> None:
        # Statements here
        super().execute(context) # If you want to execute the parent class's execute method, call it explicitly.

    # This function is called when the task instance is killed.
    def on_kill(self) -> None:
        # Statements here
        super().on_kill() # Optional

설명

Class

  • airflow.models.baseoperator.BaseOperator
    • 모든 Airflow의 Operator의 기초 모델링이 포함된 클래스
    • 일반적으로 Airflow Operator를 개발할 때 해당 클래스를 상속받아 작성
    • 이 클래스는 작업의 예약, 실행, 상태 업데이트 등의 기본적인 동작을 정의하고 있음
  • airflow.utils.decorators.apply_defaults
    • Airflow 1.X 버전에서 Operator 작성 시, 클래스 초기화 메소드(init)의 기본 매개변수 값을 설정하고 상속된 클래스의 기본값을 포함한 인수들을 자동으로 초기화하는데 사용되었으며, 현재는 사용하지 않아도 상관없음
    • 현재는 사용 중단(deprecated)되었으며, 대신 명시적인 매개변수 초기화를 사용하는 것이 권장됨

Class variable

  • template fields:
    • Airflow에서 지원하는 Jinja2 템플릿을 사용하려면 해당 클래스 변수에 변수명을 지정해야 하며, 지정된 변수명에 해당하는 매개변수에서 템플릿이 렌더링 됨
    • 이를 통해 실행 시점에 동적으로 값을 설정할 수 있음
  • template_ext
    • 템플릿 파일의 확장자를 지정할 수 있음
    • 예시: ['.sql', '.hql']와 같이 설정하면 SQL 및 HiveQL 템플릿 파일을 사용할 수 있음
  • template_fields_renderers:
    • 특정 템플릿 필드에 대한 렌더링 방식을 지정
    • 예시: {'field_name': 'json'}와 같이 설정하면 해당 필드가 JSON 형식으로 렌더링됨

Method

  • pre_execute(self, context: Context):
    • Operator가 실행되기 전에 호출되는 메소드로, 주로 실행 전 준비 작업을 수행
    • 작업 실행 전후의 작업 흐름을 조절할 때 유용함
  • execute(self, context: Context)
    • Operator가 동작할 때 실행되는 실제 메소드로, 작업의 주요 로직이 구현됨
    • 이 메소드 내에서 작업이 수행되고, 작업 결과가 반환
  • on_kill(self)
    • 작업이 강제 종료될 때 호출되는 메소드로종료 작업이나 클린업 작업을 수행
    • 작업이 중단될 때 필요한 정리 작업을 정의하는 데 사용됨

그 외 부가적인 속성 및 메소드는 공식 문서 참고

참고자료


profile
키보드 한 자루로 시작하는 데이터 엔지니어링 삽질기
post-custom-banner

0개의 댓글