https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html
BaseOperator
클래스를 상속받아 직접 만든 Operator를 사용 가능BaseOperator
상속 시 두 가지 메서드를 재정의(Overriding)해야 함def __int__(self, *args, **kwargs)
kwargs
에 포함)def execute(self, context)
template_fields: Sequence[str] = ("name",)
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
이전에 만들었던 네이버 쇼핑 검색 API를 활용한 HttpOperator의 기능을 개선
start
값을 다르게 한 여러 개의 HttpOperator를 만들어야 함execute()
메서드 안에서 전체 요청을 100개 단위로 나누고, 반복문을 통해 여러 번 요청 및 응답 결과를 합칠 수 있게 로직을 작성endpoint
를 전부 입력해야 했는데, API 종류를 구분할 수 있는 문자열만 전달해서 endpoint
를 자동으로 만들어주면 더 편리할 것이라 생각함endpoint
변경이 용이하기 때문에 쇼핑 검색 뿐 아니라 블로그 검색도 같이 수행할 것을 고려네이버 쇼핑 및 블로그 검색 API를 호출하여 전체 데이터를 받은 후 CSV 파일로 저장하기
BaseOperator
를 상속받는 NaverSearchToCsvOperator
를 구현plugins/
경로 아래에 추가# plugins/operators/nshopping.py
from airflow.models import BaseOperator
from airflow.hooks.base import BaseHook
from typing import Dict, List
class NaverSearchToCsvOperator(BaseOperator):
template_fields = ("file_path", "client_id", "client_secret")
def __init__(self,
search_type: str,
file_path: str,
client_id: str,
client_secret: str,
keyword: str,
display: int = 10,
start: int = 1,
sort = "sim",
**kwargs):
super().__init__(**kwargs)
self.http_conn_id = "openapi.naver.com"
self.endpoint = f"/v1/search/{search_type}.json"
self.method = "GET"
self.file_path = file_path if file_path.startswith("/") else ("/opt/airflow/files/"+file_path)
self.client_id = client_id
self.client_secret = client_secret
self.keyword = keyword
self.display = min(display, 1000)
self.start = min(start, 1000)
self.sort = sort
def execute(self, context):
connection = BaseHook.get_connection(self.http_conn_id)
url = connection.host + self.endpoint
rows = list()
for i, start in enumerate(range(self.start, self.display, 100)):
display = min(self.display + self.start - start, 100)
self.log.info(f"시작: {start}")
self.log.info(f"끝: {start+display-1}")
kwargs = dict(keyword=self.keyword, display=display, start=start, sort=self.sort)
rows = rows + self._request_api(url, show_header=(i == 0), **kwargs)
self._mkdir(self.file_path)
self._to_csv(rows, self.file_path, encoding="utf-8", sep=',')
def _request_api(self, url: str, keyword: str, display: int=10, start: int=1, sort="sim", show_header=True) -> List[List]:
import requests
params = self._get_params(keyword, display, start, sort)
headers = self._get_headers(self.client_id, self.client_secret)
with requests.request(self.method, url, params=params, headers=headers) as response:
return self._parse_api(response, start, show_header)
def _get_params(self, keyword: str, display: int=10, start: int=1, sort="sim") -> Dict:
return {"query": keyword, "display": min(display, 100), "start": min(start, 1000), "sort": sort}
def _get_headers(self, client_id: str, client_secret: str) -> Dict[str,str]:
return {
"Content-Type": "application/json",
"X-Naver-Client-Id": client_id,
"X-Naver-Client-Secret": client_secret
}
def _parse_api(self, response, start: int=1, show_header=True) -> List[List]:
contents = response.json()["items"]
if contents:
header = ([["rank"] + list(contents[0].keys())]) if show_header else []
return header + [[start+i]+list(content.values()) for i, content in enumerate(contents)]
else: return list()
def _mkdir(self, file_path: str):
import os
dir_path = os.path.split(file_path)[0]
if not os.path.exists(dir_path):
os.system(f"mkdir -p {dir_path}")
def _to_csv(self, rows: List[List], file_path: str, encoding="utf-8", sep=','):
def clean(value: str) -> str:
return str(value).replace(sep, '')
with open(file_path, 'w', encoding=encoding) as file:
for row in rows:
file.write(sep.join(map(clean, row))+'\n')
search_type: str
endpoint
는 /v1/search/shop.json
인데, 블로그 검색 또는 뉴스 검색과 같은 다른 API를 확인했을 때, URL에서 {{host}}/v1/search/{{search_type}}.json
와 같은 패턴이 있음을 확인 가능endpoint
대신에 search_type
을 입력으로 받고 endpoint
를 자동으로 생성file_path: str
naverSearch/
경로 아래 실행 날짜별로 폴더를 만들어서 아래에 CSV 파일을 저장할 계획인데, data_interval_end
변수를 사용하기 위해 template_fields
에 해당 파라미터를 등록/
) 경로를 지정하지 않았을 경우 /opt/airflow/files
경로를 Root 경로로 사용하도록 지정docker-compose.yaml
파일에 컨테이너와 로컬 컴퓨터 경로를 연결짓는 구문을 입력# docker-compose.yaml
x-airflow-common:
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/files:/opt/airflow/files
client_id: str
, client_secret: str
template_fields
에 등록keyword: str
, display: int
, start: int
, sort: str
display
는 최대 100까지 허용되지만, 반복문을 통해 나눠서 요청할 수 있기 때문에 1000까지 확장start
는 API 문서에 명세된 것과 같이 최대 1000까지의 값으로 제한sort
는 API 종류에 따라 허용되는 값이 다르지만, 공통적으로 정확도순 정렬(sim
)을 사용할 것이기에 별도로 검증하지 않음template_fields = ("file_path", "client_id", "client_secret")
def __init__(self,
search_type: str,
file_path: str,
client_id: str,
client_secret: str,
keyword: str,
display: int = 10,
start: int = 1,
sort = "sim",
**kwargs):
super().__init__(**kwargs)
self.http_conn_id = "openapi.naver.com"
self.endpoint = f"/v1/search/{search_type}.json"
self.method = "GET"
self.file_path = file_path if file_path.startswith("/") else ("/opt/airflow/files/"+file_path)
self.client_id = client_id
self.client_secret = client_secret
self.keyword = keyword
self.display = min(display, 1000)
self.start = min(start, 1000)
self.sort = sort
execute()
endpoint
를 조합하여 API 요청 URL을 생성start
부터 display
까지의 범위를 100개 단위로 구분하여 반복문을 구성하고, 반복문마다 파라미터를 다르게 하여 API 요청 메서드 _request_api()
에 전달def execute(self, context):
connection = BaseHook.get_connection(self.http_conn_id)
url = connection.host + self.endpoint
rows = list()
for i, start in enumerate(range(self.start, self.display, 100)):
display = min(self.display + self.start - start, 100)
self.log.info(f"시작: {start}")
self.log.info(f"끝: {start+display-1}")
kwargs = dict(keyword=self.keyword, display=display, start=start, sort=self.sort)
rows = rows + self._request_api(url, show_header=(i == 0), **kwargs)
...
_request_api()
requests.Response
객체를 그대로 파싱 메서드 _parse_api()
로 전달def _request_api(self, url: str, keyword: str, display: int=10, start: int=1, sort="sim", show_header=True) -> List[List]:
import requests
params = self._get_params(keyword, display, start, sort)
headers = self._get_headers(self.client_id, self.client_secret)
with requests.request(self.method, url, params=params, headers=headers) as response:
return self._parse_api(response, start, show_header)
_get_params()
def _get_params(self, keyword: str, display: int=10, start: int=1, sort="sim") -> Dict:
return {"query": keyword, "display": min(display, 100), "start": min(start, 1000), "sort": sort}
_get_headers()
def _get_headers(self, client_id: str, client_secret: str) -> Dict[str,str]:
return {
"Content-Type": "application/json",
"X-Naver-Client-Id": client_id,
"X-Naver-Client-Secret": client_secret
}
execute()
rows
를 만들고, 반복문마다 _request_api()
메서드가 반환하는 각각의 리스트 객체를 하나로 결합show_header
파라미터를 활용def execute(self, context):
...
rows = list()
for start in range(self.start, self.display, 100):
...
rows = rows + self._request_api(url, show_header=(i == 0), **kwargs)
...
_parse_api()
requests.Response
객체를 인수로 전달받아 JSON 파싱한 후 items
키에서 배열을 꺼냄execute()
에서 전달한 show_header
파라미터를 통해 첫 번째 반복문에서만 CSV 헤더를 추가해 반환pd.DataFrame
을 활용할 수도 있지만, 단순하게 배열 타입으로 파싱def _parse_api(self, response, start: int=1, show_header=True) -> List[List]:
contents = response.json()["items"]
if contents:
header = ([["rank"] + list(contents[0].keys())]) if show_header else []
return header + [[start+i]+list(content.values()) for i, content in enumerate(contents)]
else: return list()
execute()
_mkdir()
메서드를 통해 file_path
검증rows
객체를 _to_csv()
메서드에 전달해 CSV 파일로 저장def execute(self, context):
...
self._mkdir(self.file_path)
self._to_csv(rows, self.file_path, encoding="utf-8", sep=',')
_mkdir()
file_path
가 있는지 확인하고, 없으면 해당 경로를 생성하는 메서드def _mkdir(self, file_path: str):
import os
dir_path = os.path.split(file_path)[0]
if not os.path.exists(dir_path):
os.system(f"mkdir -p {dir_path}")
_to_csv()
file_path
를 열고, 전달받은 rows
객체를 한 줄씩 기록하는 메서드,
가 있을 경우 쌍따옴표 " "
로 감싸는 것과 같은 조치가 필요하지만, 내용이 중요한건 아니기 때문에 단순히 값에서 구분자 ,
를 모두 제거하는 방식으로 가공하여 저장def _to_csv(self, rows: List[List], file_path: str, encoding="utf-8", sep=','):
def clean(value: str) -> str:
return str(value).replace(sep, '')
with open(file_path, 'w', encoding=encoding) as file:
for row in rows:
file.write(sep.join(map(clean, row))+'\n')
plugins/
에 정의한 NaverSearchToCsvOperator
를 가져와서 두 개의 Task를 생성search_shopping_task
, 다른 하나는 네이버 블로그 검색 API를 사용하는 search_blog_task
HttpOperator
를 사용했으면 길어졌을 내용을 NaverSearchToCsvOperator
로 구조화하여 DAG 선언을 단순화# dags/nsearch_operator.py
from airflow.sdk import DAG
from operators.nshopping import NaverSearchToCsvOperator
import pendulum
with DAG(
dag_id="nsearch_operator",
schedule="0 9 * * *",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
catchup=False,
tags=["example", "http"],
) as dag:
api_keys = dict(
client_id="{{var.value.client_id_openapi_naver_com}}",
client_secret="{{var.value.client_secret_openapi_naver_com}}")
common_params = dict(keyword="노트북", display=1000, start=1, sort="sim")
"""네이버 쇼핑 검색"""
search_shopping_task = NaverSearchToCsvOperator(
task_id="search_shopping_task",
search_type="shop",
file_path="naverSearch/{{data_interval_end.in_timezone(\"Asia/Seoul\") | ds_nodash }}/shop.csv",
**api_keys,
**common_params
)
"""네이버 블로그 검색"""
search_blog_task = NaverSearchToCsvOperator(
task_id="search_blog_task",
search_type="blog",
file_path="naverSearch/{{data_interval_end.in_timezone(\"Asia/Seoul\") | ds_nodash }}/blog.csv",
**api_keys,
**common_params
)
search_shopping_task >> search_blog_task
start
1부터 display
1000까지 총 1000개의 검색 결과를 100개 단위로 나눠서 요청하는 과정을 로그로 남겨서 조회
[2025-06-07, 19:44:00] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-07, 19:44:00] INFO - Filling up the DagBag from /opt/airflow/dags/nsearch_operator.py: source="airflow.models.dagbag.DagBag"
[2025-06-07, 19:44:00] INFO - Secrets backends loaded for worker: count=1: backend_classes=["EnvironmentVariablesBackend"]: source="supervisor"
[2025-06-07, 19:44:00] INFO - Secrets backends loaded for worker: count=1: backend_classes=["EnvironmentVariablesBackend"]: source="supervisor"
[2025-06-07, 19:44:00] INFO - Secrets backends loaded for worker: count=1: backend_classes=["EnvironmentVariablesBackend"]: source="supervisor"
[2025-06-07, 19:44:00] INFO - Connection Retrieved 'openapi.naver.com': source="airflow.hooks.base"
[2025-06-07, 19:44:00] INFO - 시작: 1: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:00] INFO - 끝: 100: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:00] INFO - 시작: 101: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:00] INFO - 끝: 200: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:01] INFO - 시작: 201: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:01] INFO - 끝: 300: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:01] INFO - 시작: 301: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:01] INFO - 끝: 400: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:01] INFO - 시작: 401: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:01] INFO - 끝: 500: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:02] INFO - 시작: 501: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:02] INFO - 끝: 600: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:02] INFO - 시작: 601: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:02] INFO - 끝: 700: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:02] INFO - 시작: 701: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:02] INFO - 끝: 800: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:02] INFO - 시작: 801: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:02] INFO - 끝: 900: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:03] INFO - 시작: 901: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
[2025-06-07, 19:44:03] INFO - 끝: 1000: source="airflow.task.operators.operators.nshopping.NaverSearchToCsvOperator"
success
로 처리된 후, 로컬 컴퓨터의 files/
경로에 두 개의 CSV 파일이 생성됨을 확인% tree files -F
files/
└── naverSearch/
└── 20250607/
├── blog.csv
└── shop.csv
rank | title | link | image | lprice | hprice | mallName | productId | productType | brand | maker | category1 | category2 | category3 | category4 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | 삼성 갤럭시북... | https://... | https://... | 428000 | 삼성공식파트너 코인비엠에스 | 87952389253 | 2 | 갤럭시북 | 삼성전자 | 디지털/가전 | 노트북 | |||
2 | 삼성전자 갤럭시북4... | https://... | https://... | 799000 | 네이버 | 52631236642 | 1 | 갤럭시북4 | 삼성전자 | 디지털/가전 | 노트북 | |||
3 | LG전자 울트라PC... | https://... | https://... | 599000 | 제이 씨앤에스 | 83718736488 | 2 | LG전자 | LG전자 | 디지털/가전 | 노트북 | |||
4 | 삼성전자 갤럭시북4... | https://... | https://... | 728000 | 네이버 | 52647794278 | 1 | 갤럭시북4 | 삼성전자 | 디지털/가전 | 노트북 | |||
5 | LG그램 노트북... | https://... | https://... | 1149000 | 온라인총판대리점 | 86636005031 | 2 | LG그램 | LG전자 | 디지털/가전 | 노트북 |
rank | title | link | description | bloggername | bloggerlink | postdate |
---|---|---|---|---|---|---|
1 | 노트북에서... | https://... | 버리고 노트북으로... | 란디의 하늘색 꿈 | blog.naver.com/fksel33 | 20250601 |
2 | 고비넘긴 삼성노트북... | https://... | 노원구 공릉동에 위치한... | 하드웨어수리닷컴(엘존) | blog.naver.com/yzenn | 20250603 |
3 | MSI 사무용노트북... | https://... | MSI노트북중에서... | Modulestudio | blog.naver.com/modulestudio | 20250602 |
4 | 아직 이걸 안 써?... | https://... | 아직 이걸 안 써?... | 짜루의 이것저것 리뷰 | blog.naver.com/skdaksdptn | 20250520 |
5 | 가성비 초경량 노트북... | https://... | 그런 노트북을... | 삼성Mall 한사랑씨앤씨 공식 블로그 | blog.naver.com/hansarangcnc | 20250527 |