airflow 2.0에서는 plugin system에 operator 를 추가해야한다.
존재하는 operator를 확장, 수정해서
Airflow 장점이 operator, views, hooks 등 모든 것을 커스텀할 수 있다는 점이다.
생성 방법
AirflowPlugin 클래스를상속하는 View, Operator,hook... 클래스를 생성한다
plugin 이름 등 속성을 설정한다
생성 후 Lazy Loaded 이기때문에 airflow 인스턴스를 재시작해야한다.
from airflow.plugins_manager import AirflowPlugin
# 모든 hook은 BaseHook 상속해서 메서드, property 등
from airflow.hooks.base import BaseHook
from elasticsearch import Elasticsearch
class ElasticHook(BaseHook):
def __init__(self, conn_id='elastic_default', *args, **kwargs):
super().__init__(*args, **kwargs)
conn = self.get_connection(conn_id)
conn_config = {}
hosts = []
if conn.host:
hosts = conn.host.split(',')
if conn.port:
conn_config['port'] = int(conn.port)
if conn.login:
conn_config['http_auth'] = (conn.login, conn.password)
self.es = Elasticsearch(hosts, **conn_config)
self.index = conn.schema
def info(self):
return self.es.info()
def set_index(self, index):
self.index = index
def add_doc(self, index, doc_type, doc):
self.set_index(index)
res = self.es.index(index=index, doc_type=doc_type, doc=doc)
return res
class AirflowElasticPlugin(AirflowPlugin):
name = 'elastic'
hooks = [ElasticHook]