https://airflow.apache.org/docs/apache-airflow-providers-http/stable/index.html
HttpOperator
는 Provider 패키지로 별도의 설치가 필요apache-airflow>=2.10.0
버전을 요구pip install apache-airflow-providers-http
http_conn_id
: http Connection을 생성해야 하는데 해당 Connection의 IDendpoint
: Connection에 등록한 Host 뒤에 붙는 경로method
: HTTP 메서드 (GET, POST, PUT, DELETE 등)data
: POST 요청 시 전달할 데이터 또는 GET 요청 시 전달할 파라미터headers
: HTTP 요청 헤더response_check
: HTTP 응답 결과가 정상인지 확인하는 함수 (True
반환)response_filter
: HTTP 응답 결과에 대한 전처리 함수https://developers.naver.com/docs/serviceapi/search/shopping/shopping.md
HttpOperator
활용을 위해 간단한 API를 예제로 사용Application->애플리케이션 등록
으로 이동
Client ID
와 Client Secret
을 조회 가능
Host
를 입력Port
에 포트 번호를 입력하지만, 네이버 Open API는 표준 HTTPS 포트를 사용하기 때문에 미표기
Client ID
와 Client Secret
을 각각의 Variable로 등록Client Secret
의 키 명칭에는 "secret" 이 포함되어 UI에서 자동으로 마스킹 처리되는 것을 확인http_conn_id
에 입력하고, endpoint
에 Host를 제외한 나머지 경로를 입력data
에는 GET 요청에 대한 파라미터를 전달하는데, query
로 원하는 키워드를 입력headers
에는 Jinja 템플릿을 활용하여 Client ID
와 Client Secret
에 대한 Variable 값을 추가Response
객체를 JSON 파싱하여 items
배열 내 첫 번째 항목, 즉 "노트북" 키워드로 검색한 결과에서 1위로 노출되는 상품을 딕셔너리 타입으로 반환print_product_task
에서는 XCom에서 반환된 값을 꺼내 단순 출력# dags/http_operator.py
from airflow.sdk import DAG, task
from airflow.providers.http.operators.http import HttpOperator
from airflow.models.taskinstance import TaskInstance
import pendulum
with DAG(
dag_id="http_operator",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
schedule=None,
catchup=False,
tags=["example", "http"],
) as dag:
def parse(response):
return response.json()["items"][0]
nshopping_search_task = HttpOperator(
task_id="nshopping_search_task",
http_conn_id="openapi.naver.com",
endpoint="/v1/search/shop.json",
method="GET",
data={
"query": "노트북",
"display": 10,
"start": 1,
"sort": "sim"
},
headers={
"Content-Type": "application/json",
"X-Naver-Client-Id": "{{var.value.client_id_openapi_naver_com}}",
"X-Naver-Client-Secret": "{{var.value.client_secret_openapi_naver_com}}"
},
response_filter=parse
)
@task(task_id="print_product_task")
def print_product_task(ti: TaskInstance, **kwargs):
result = ti.xcom_pull(task_ids="nshopping_search_task")
from pprint import pprint
pprint(result)
nshopping_search_task >> print_product_task()
print_product_task
의 실행 로그를 확인했을 때, 아래와 같이 nshopping_search_task
에서 반환된 상품에 대한 정보를 조회 가능# print_product_task
[2025-06-07, 12:33:35] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-07, 12:33:35] INFO - Filling up the DagBag from /opt/airflow/dags/http_operator.py: source="airflow.models.dagbag.DagBag"
[2025-06-07, 12:33:35] INFO - {'brand': '갤럭시북',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'category1': '디지털/가전',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'category2': '노트북',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'category3': '',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'category4': '',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'hprice': '',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'image': 'https://shopping-phinf.pstatic.net/main_8795238/87952389253.11.jpg',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'link': 'https://smartstore.naver.com/main/products/10407884292',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'lprice': '428000',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'maker': '삼성전자',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'mallName': '삼성공식파트너 코인비엠에스',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'productId': '87952389253',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'productType': '2',: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - 'title': '삼성 갤럭시북 인강용 사무용 업무용 가성비 윈도우11 저가 싼 태블릿 <b>노트북</b> 추천 기본팩'}: chan="stdout": source="task"
[2025-06-07, 12:33:35] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"