• BaseOperator 상속시 두 가지 메서드를 재정의해야 함(Overriding)
1) def __init__
→ 클래스에서 객체 생성 시 객체에 대한 초기값을 지정하는 함수
2) def execute(self, context) → 실제 로직을 담은 함수
3) Template 적용이 필요한 변수는 class 변수 template_fields에 지정 필요
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
    """Custom operator that greets *name* when the task runs."""

    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        # Greeting target, interpolated into the message at execute time.
        self.name = name

    def execute(self, context):
        """Build the greeting, print it, and return it."""
        greeting = f"Hello {self.name}"
        print(greeting)
        return greeting
from airflow.models.baseoperator import BaseOperator
from airflow.hooks.base import BaseHook
import pandas as pd
class SeoulApiToCsvOperator(BaseOperator):
    """Fetch a Seoul OpenAPI dataset page by page and save it as one CSV file.

    The API serves at most 1,000 rows per request, so ``execute`` pages
    through the dataset until a short (final) page is returned.
    """

    # Variables on which Jinja template rendering is allowed.
    template_fields = ('endpoint', 'path', 'file_name', 'base_dt')

    def __init__(self, dataset_nm, path, file_name, base_dt=None, **kwargs):
        """
        :param dataset_nm: Seoul OpenAPI dataset name (appended to the endpoint).
        :param path: directory the CSV file is written into (templatable).
        :param file_name: name of the CSV file to create (templatable).
        :param base_dt: optional base date suffix for the request URL (templatable).
        """
        super().__init__(**kwargs)
        # Connection id registered in the Airflow UI for openapi.seoul.go.kr.
        self.http_conn_id = 'openapi.seoul.go.kr'
        self.path = path
        self.file_name = file_name
        # The API key is resolved from an Airflow Variable at render time.
        self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm
        self.base_dt = base_dt

    def execute(self, context):
        import os

        # BaseHook resolves the connection defined in the Airflow UI.
        connection = BaseHook.get_connection(self.http_conn_id)
        self.base_url = f'http://{connection.host}:{connection.port}/{self.endpoint}'

        total_row_df = pd.DataFrame()
        start_row = 1
        end_row = 1000
        while True:
            self.log.info(f'시작:{start_row}')
            self.log.info(f'끝:{end_row}')
            row_df = self._call_api(self.base_url, start_row, end_row)
            total_row_df = pd.concat([total_row_df, row_df], ignore_index=True)
            # A page shorter than 1,000 rows is the last one.
            if len(row_df) < 1000:
                break
            start_row = end_row + 1
            end_row += 1000

        # Create the target directory (including parents) if missing.
        # os.makedirs replaces the previous os.system('mkdir -p ...') call,
        # which shelled out with an unquoted, interpolated path.
        os.makedirs(self.path, exist_ok=True)
        total_row_df.to_csv(os.path.join(self.path, self.file_name),
                            encoding='utf-8', index=False)

    def _call_api(self, base_url, start_row, end_row):
        """Request rows [start_row, end_row] from the API as a DataFrame."""
        import requests

        headers = {'Content-Type': 'application/json',
                   'charset': 'utf-8',
                   'Accept': '*/*'}
        request_url = f'{base_url}/{start_row}/{end_row}/'
        if self.base_dt is not None:
            request_url = f'{base_url}/{start_row}/{end_row}/{self.base_dt}'
        # Bug fix: requests.get()'s second positional argument is ``params``,
        # not ``headers`` — the headers were silently never sent before.
        response = requests.get(request_url, headers=headers)
        contents = response.json()
        key_nm = list(contents.keys())[0]
        # ``row`` may be absent on an error payload; DataFrame(None) is empty,
        # which cleanly terminates the paging loop in execute().
        row_data = contents.get(key_nm, {}).get('row')
        row_df = pd.DataFrame(row_data)
        return row_df
from operators.seoul_api_to_csv_operator import SeoulApiToCsvOperator
from airflow import DAG
import pendulum
# DAG definition: runs once a day at 07:00 Asia/Seoul, no backfill.
dag = DAG(
    dag_id='dags_seoul_bikelist',
    schedule='0 7 * * *',
    start_date=pendulum.datetime(2024, 6, 16, tz='Asia/Seoul'),
    catchup=False,
)

with dag:
    '''서울시 공공자전거 실시간 대여 현황'''
    # Ingest the real-time public-bike rental dataset into a dated folder.
    seoul_api2csv_bike_list = SeoulApiToCsvOperator(
        task_id='seoul_api2csv_bike_list',
        dataset_nm='bikeList',
        path='/opt/airflow/ingest/bikeList/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}',
        file_name='bikeList.csv',
    )