Airflow Study21 - SimpleHttp operator

박성현·2024년 6월 18일

Airflow

목록 보기
28/28

서울시 공공데이터 https://data.seoul.go.kr/
API 키 발급 후 ,

airflow web -> admin -> connections 등록

아무거나 공공데이터나 들어가서 ,
Host 및 Port 확인 ; Open API -> 샘플 url 확인

샘플 코드

주요 고려사항 :

  1. http_conn_id :
    connection 등록에서 connection id 사용( 기입한 host 사용되는듯 ? )

  2. endpoint :
    host/port/뒤에 부분 , 각 api url샘플참고 , 여기선
    인증키 / json / 데이터명 / start# / end#

  3. 수집한 api 값은 xcom에 저장 :
    ti.xcom_pull(task_ids = '~') 의 형태로 불러옴

  4. 전역 변수 사용하여 api key 저장 :
    key 변경시 유지보수 편리, 보안 (아래 내용 참고 )

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.decorators import task
import pendulum

with DAG (
    dag_id='dags_simple_http_operator',
    start_date=pendulum.datetime(2023, 4, 1, tz='Asia/Seoul'),
    catchup=False,
    schedule=None
) as dag :

tb_cycle_station_info = SimpleHttpOperator (
        task_id = 'tb_cyle_station_info',
        http_conn_id = 'openapi.seoul.go.kr',
        endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/tbCycleStationInfo/1/10/',
        method = 'GET',
        headers={'Content-Type': 'application/json',
                        'charset': 'utf-8',
                        'Accept': '*/*'
                        }
        
    )
    
    @task(task_id = 'python_2')
    def python_2(**kwargs):
        ti = kwargs['ti']
        rst = ti.xcom_pull(task_ids = 'tb_cyle_station_info')
        import json
        from pprint import pprint
        
        pprint(json.loads(rst))
        
    tb_cycle_station_info >> python_2()

유의?고려 ? 사항 ;

( 전역변수 variable 전역변수 관련 )

  1. 많은 dag에 api key 사용했는데, key 바뀌면 ...
  2. 노출 시키고 싶지 않아.
    업로드중..

access_token, api_key, pass, secret, token 이런거 들어가면 자동으로 마스킹

profile
다소Good한 데이터 엔지니어

0개의 댓글