[Airflow] Http Operator

minyeamer·2025년 6월 7일
0

Apache Airflow 배우기

목록 보기
10/13
post-thumbnail

HttpOperator

https://airflow.apache.org/docs/apache-airflow-providers-http/stable/index.html

  • HTTP 요청을 하고 응답 결과를 반환받는 Operator (반환값은 XCom에 저장)
  • HTTP를 이용하여 API를 처리하는 RestAPI 호출 시 사용 가능

패키지 설치

  • HttpOperator 는 Provider 패키지로 별도의 설치가 필요
  • 5.3.0 버전을 기준으로 apache-airflow>=2.10.0 버전을 요구
pip install apache-airflow-providers-http

파라미터

https://airflow.apache.org/docs/apache-airflow-providers-http/stable/_api/airflow/providers/http/operators/http/index.html

  • http_conn_id : http Connection을 생성해야 하는데 해당 Connection의 ID
  • endpoint : Connection에 등록한 Host 뒤에 붙는 경로
  • method : HTTP 메서드 (GET, POST, PUT, DELETE 등)
  • data : POST 요청 시 전달할 데이터 또는 GET 요청 시 전달할 파라미터
  • headers : HTTP 요청 헤더
  • response_check : HTTP 응답 결과가 정상인지 확인하는 함수 (True 반환)
  • response_filter : HTTP 응답 결과에 대한 전처리 함수

네이버 쇼핑 검색 API

https://developers.naver.com/docs/serviceapi/search/shopping/shopping.md

  • HttpOperator 활용을 위해 간단한 API를 예제로 사용
  • 네이버 쇼핑 검색 API는 네이버에서 제공하는 Open API 중 하나로 네이버 쇼핑 페이지에서 검색한 결과를 XML 형식 또는 JSON 형식으로 반환하는 RestAPI
  • 하나의 키워드를 전달하여 정확도순으로 1위에 노출되는 상품 정보를 출력하는 기능을 구현

API 키 생성

  • 네이버 개발자 센터에서 로그인 후 상단의 메뉴 중 Application->애플리케이션 등록 으로 이동
  • 아래 이미지와 같이 사용 API로 "검색"을 선택하고, 임의로 애플리케이션 이름과 서비스 환경을 입력하여 애플리케이션 등록

naver-app-1

  • 생성한 애플리케이션 정보에서 Client IDClient Secret 을 조회 가능

naver-app-2

Connection 추가

  • 문서에 명시된 요청 URL을 참고하여 Host 를 입력
  • API가 별도로 알려지지 않은 포트를 사용할 경우 Port 에 포트 번호를 입력하지만, 네이버 Open API는 표준 HTTPS 포트를 사용하기 때문에 미표기

connection

Variable 추가

  • API키는 Variable로 추가하는데, Client IDClient Secret 을 각각의 Variable로 등록
  • Client Secret 의 키 명칭에는 "secret" 이 포함되어 UI에서 자동으로 마스킹 처리되는 것을 확인

variable

HttpOperator 활용

  • 앞서 등록한 Connection의 ID를 http_conn_id 에 입력하고, endpoint 에 Host를 제외한 나머지 경로를 입력
  • data 에는 GET 요청에 대한 파라미터를 전달하는데, query 로 원하는 키워드를 입력
  • headers 에는 Jinja 템플릿을 활용하여 Client IDClient Secret 에 대한 Variable 값을 추가
  • HTTPS 요청 후 반환되는 Response 객체를 JSON 파싱하여 items 배열 내 첫 번째 항목, 즉 "노트북" 키워드로 검색한 결과에서 1위로 노출되는 상품을 딕셔너리 타입으로 반환
  • 다음 단계의 Task print_product_task 에서는 XCom에서 반환된 값을 꺼내 단순 출력
# dags/http_operator.py

from airflow.sdk import DAG, task
from airflow.providers.http.operators.http import HttpOperator
from airflow.models.taskinstance import TaskInstance
import pendulum

with DAG(
    dag_id="http_operator",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    schedule=None,
    catchup=False,
    tags=["example", "http"],
) as dag:
    def parse(response):
        return response.json()["items"][0]

    nshopping_search_task = HttpOperator(
        task_id="nshopping_search_task",
        http_conn_id="openapi.naver.com",
        endpoint="/v1/search/shop.json",
        method="GET",
        data={
            "query": "노트북",
            "display": 10,
            "start": 1,
            "sort": "sim"
        },
        headers={
            "Content-Type": "application/json",
            "X-Naver-Client-Id": "{{var.value.client_id_openapi_naver_com}}",
            "X-Naver-Client-Secret": "{{var.value.client_secret_openapi_naver_com}}"
        },
        response_filter=parse
    )

    @task(task_id="print_product_task")
    def print_product_task(ti: TaskInstance, **kwargs):
        result = ti.xcom_pull(task_ids="nshopping_search_task")
        from pprint import pprint
        pprint(result)

    nshopping_search_task >> print_product_task()

DAG 실행

  • DAG 실행 후 print_product_task 의 실행 로그를 확인했을 때, 아래와 같이 nshopping_search_task 에서 반환된 상품에 대한 정보를 조회 가능
# print_product_task

[2025-06-07, 12:33:35] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-07, 12:33:35] INFO - Filling up the DagBag from /opt/airflow/dags/http_operator.py: source="airflow.models.dagbag.DagBag"
[2025-06-07, 12:33:35] INFO - {'brand': '갤럭시북',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'category1': '디지털/가전',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'category2': '노트북',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'category3': '',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'category4': '',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'hprice': '',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'image': 'https://shopping-phinf.pstatic.net/main_8795238/87952389253.11.jpg',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'link': 'https://smartstore.naver.com/main/products/10407884292',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'lprice': '428000',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'maker': '삼성전자',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'mallName': '삼성공식파트너 코인비엠에스',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'productId': '87952389253',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'productType': '2',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO -  'title': '삼성 갤럭시북 인강용 사무용 업무용 가성비 윈도우11 저가 싼 태블릿 <b>노트북</b> 추천 기본팩'}: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
profile
데이터의 모든 것을 추구합니다.

0개의 댓글