- Http 요청을 하고 결과로 text를 리턴받는 오퍼레이터
- 리턴값은 Xcom에 저장
- HTTP를 이용하여 API 를 처리하는 RestAPI 호출시 사용가능
- HttpOperator 와 SimpleHttpOperator 두개가 존재함
Http operator 공식문서
실습
인증키 발급
- 서울열린데이터광장
- API 사용을 위해서는 회원가입 및 로그인 필수
- 인증키 신청 -> 일반인증키 신청
- 내용은 적당히 잘 적으면 됨
- 인증키 발급완료
데이터
connection 등록
- airflow -> admin-> connections
data:image/s3,"s3://crabby-images/4e4ae/4e4ae2ffd5b3970e92e7e34d688495e46809cd7f" alt=""
- 내용 채워주기 (id, type, host, port 작성)
data:image/s3,"s3://crabby-images/ee35e/ee35e279bfb4a611e0f651f03cd663f2d233a533" alt=""
키값을 airflow의 variable 등록하기
- airflow -> admin-> variable
- key : apikey_openapi_seoul_go_kr
- val : 내가 발급받은 키
- airlfow는 기본적을
‘access_token’, ‘api_key’, ‘apikey’,’authorization’, ‘passphrase’, ‘passwd’, ‘password’, ‘private_key’, ‘secret’, ‘token’
의 값들이 key속에 들어가 있으면 기본적으로 마스킹 처리
data:image/s3,"s3://crabby-images/de8aa/de8aa987102da3231c54693e28773de6d447d61d" alt=""
- variable에 등록하면 인증키를 템플릿을 통해 사용할 수 있음.
코드 작성
- dag id : dags_simple_http_operator
- api 호출 값을 가지고 오는 task id : tb_cycle_station_info
- 호출 값을 출력해주는 task_id : python_2
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.decorators import task
import pendulum
with DAG(
dag_id='dags_simple_http_operator',
start_date=pendulum.datetime(2024, 6, 16, tz='Asia/Seoul'),
catchup=False,
schedule=None
) as dag:
'''서울시 공공자전거 대여소 정보'''
tb_cycle_station_info = SimpleHttpOperator(
task_id='tb_cycle_station_info',
http_conn_id='openapi.seoul.go.kr',
endpoint='{{var.value.apikey_openapi_seoul_go_kr}}/json/bikeList/1/10/',
method='GET',
headers={'Content-Type': 'application/json',
'charset': 'utf-8',
'Accept': '*/*'
}
)
@task(task_id='python_2')
def python_2(**kwargs):
ti = kwargs['ti']
rslt = ti.xcom_pull(task_ids='tb_cycle_station_info')
import json
from pprint import pprint
pprint(json.loads(rslt))
tb_cycle_station_info >> python_2()