Custom 오퍼레이터

yoon__0_0·2024년 6월 18일
0

이어드림 수업

목록 보기
69/103
  • airflow는 필요한 오퍼레이터를 직접 만들어 사용할 수 있도록 확장성을 지원함.
  • BaseOperator을 상속하여 원하는 기능은 파이썬으로 직접 구현이 가능함
    공식문서

오퍼레이터를 custom 하는 방법

• BaseOperator 상속시 두 가지 메서드를 재정의해야 함(Overriding)

1) def init
→ 클래스에서 객체 생성시 객체에 대한 초기값 지정하는 함수

2) def execute(self, context) → 실제 로직을 담은 함수

3) Template 적용이 필요한 변수는 class 변수 template_fields에 지정 필요


예시코드
from airflow.models.baseoperator import BaseOperator


class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message
        

실습

이 실습의 기능 강화

  • 서울시 공공자전거의 전체 row를 가져오고 결과를 csv로 저장할 수 있는 오퍼레이터 만들기
    • 한번에는 1000건만 가져올 수 있음.
    • 근데 몇건이 있는지 모르기 때문에 몇번 돌려야하는지 모르고 우리가 다 해줄 수 없기 때문에 이를 해결하기 위해 custom operator 을 해주는 것

  • seoul_api_to_csv_operator.py
from airflow.models.baseoperator import BaseOperator
from airflow.hooks.base import BaseHook
import pandas as pd


class SeoulApiToCsvOperator(BaseOperator):
    # template 문법 적용하고 싶은 변수 허용 지정
    template_fields = ('endpoint', 'path', 'file_name', 'base_dt')

    def __init__(self, dataset_nm, path, file_name, base_dt=None, **kwargs):
        super().__init__(**kwargs)
        self.http_conn_id = 'openapi.seoul.go.kr'
        self.path = path
        self.file_name = file_name
        self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm
        self.base_dt = base_dt

    def execute(self, context):
        import os
		
        # baskhook : airflow web에서 정의했던 connection 정보를 가져올 수 있음.
        connection = BaseHook.get_connection(self.http_conn_id)
        self.base_url = f'http://{connection.host}:{
            connection.port}/{self.endpoint}'

        total_row_df = pd.DataFrame()
        start_row = 1
        end_row = 1000
        while True:
            self.log.info(f'시작:{start_row}')
            self.log.info(f'끝:{end_row}')
            row_df = self._call_api(self.base_url, start_row, end_row)
            total_row_df = pd.concat([total_row_df, row_df])
            if len(row_df) < 1000:
                break
            else:
                start_row = end_row + 1
                end_row += 1000
		
        # 폴더가 없으면 생성해주는데 -p 옵션으로 상위 폴더까지 생성 
        if not os.path.exists(self.path):
            os.system(f'mkdir -p {self.path}')
        total_row_df.to_csv(self.path + '/' + self.file_name,
                            encoding='utf-8', index=False)

    def _call_api(self, base_url, start_row, end_row):
        import requests
        import json

        headers = {'Content-Type': 'application/json',
                   'charset': 'utf-8',
                   'Accept': '*/*'
                   }

        request_url = f'{base_url}/{start_row}/{end_row}/'
        if self.base_dt is not None:
            request_url = f'{base_url}/{start_row}/{end_row}/{self.base_dt}'
        response = requests.get(request_url, headers)
        contents = json.loads(response.text)

        key_nm = list(contents.keys())[0]
        row_data = contents.get(key_nm).get('row')
        row_df = pd.DataFrame(row_data)

        return row_df
  • dags_seoul_bikelist.py
from operators.seoul_api_to_csv_operator import SeoulApiToCsvOperator
from airflow import DAG
import pendulum

with DAG(
    dag_id='dags_seoul_bikelist',
    schedule='0 7 * * *',
    start_date=pendulum.datetime(2024, 6, 16, tz='Asia/Seoul'),
    catchup=False
) as dag:
    '''서울시 공공자전거 실시간 대여 현황'''
    seoul_api2csv_bike_list = SeoulApiToCsvOperator(
        task_id='seoul_api2csv_bike_list',
        dataset_nm='bikeList',
        path='/opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}',
        file_name='bikeList.csv'
    )
  • docker container와 ec2 서버를 연동해주기 ( docker가 down 되더라도 다시 키면 데이터는 있을 수 있도록)
    • ec2 서버 home 경로에 ingest 폴더 생성
    • docker-compose.yaml 파일 수정
      : volumes : 에 경로 추가
    • docker-compose.yaml 다시 compose up
  • DAG 실행해주기
    • 실행시 home 경로에 csv가 만들어짐.
profile
신윤재입니다

0개의 댓글