[NiFi] invokeHTTP로 REST API를 받아와서 DB에 INSERT하기

이혜지·2020년 9월 22일
0

NiFi

목록 보기
3/7
post-thumbnail

전체적인 흐름
REST API를 불러와서 -> json형식으로 변환 -> 데이터베이스에 삽입

환경설정
OS : CentOS 7
DBMS : MySQL

전체적인 흐름

불러올 open api 고르기
필자는 https://aviationstack.com 사이트에서 비행기 정보를 불러오는 api를 이용

무료 회원가입 하고


예시를 확인해줍니다. 회원가입 하면 본인의 access_key 확인 가능

&기호를 통해 옵션을 지정 가능. end_point를 바꿔서 다른걸 불러 올 수도있다.


사용가능한 end_point들

GenerateFlowFile설정


access_key 라는 property를 추가해 본인의 access_key 작성
더 추가할 옵션도 지정해줬기 때문에 flight_status와 limit도 추가했다.

scheduling은 3 min으로 설정

InvokeHTTP설정

setting엔 failure, no retry, original, retry설정

Remote URL에 aviationstack.com에서 예시로 보여줬던 url를 기입

기입할때 GenerateFlowFile에서 설정했던 access_key와 flight_status와 limit을 변수로 사용한다.
access_key=${access_key} 나머지도 이처럼

여기까지 Rest API를 불러오는데 성공했다.


여기까지 진행했을때의 큐를 확인해보자.

(List Queued를 눌러서 눈모양 누르면 확인가능)

json형식으로 들어온 파일 확인

SplitJson 과 ControlRate 설정


SplitJson은 말그대로 Json파일을 한개한개 나눠주는것이다.

사진을 보면 Json형식에서 반복되면서 나타나는 부분은 "data":[{}] ~ 부분이다 저 부분을 가져와서 database에 넣도록 하겠다.

SplitJson에서 속성 JsonPath Expression은 json expression을 구분해서 써넣으면 된다. $.data를 하면 불러올 수 있다.

그리고 ControlRate로 넘겨준다.
ControlRate의 기능은 데이터가 후속 프로세서로 전송되는 속도를 제어한다. 프로세서에서 더 많은 작업이 일어날 수 있지만 정확도를 높여준다.

ControlRate settings는 failure, properties 설정

rate control criteria와 maximunrate를 설정

flowfiles를 separate해줬기 때문에 flowfilecount로 설정.

JoltTransformJSON 설정

settings는 failure만 설정

advanced 눌러서 Jolt Specification안에 바꿔줄 json파일을 써넣는다.


  • api로 불러왔을때 json파일의 모습
{
  "pagination" : {
    "limit" : 100,
    "offset" : 0,
    "count" : 100,
    "total" : 8866
  },
  "data" : [ {
    "flight_date" : "2020-09-18",
    "flight_status" : "active",
    "departure" : {
      "airport" : "Fukuoka",
      "timezone" : "Asia/Tokyo",
      "iata" : "FUK",
      "icao" : "RJFF",
      "terminal" : null,
      "gate" : null,
      "delay" : 15,
      "scheduled" : "2020-09-18T14:29:00+00:00",
      "estimated" : "2020-09-18T14:29:00+00:00",
      "actual" : "2020-09-18T14:43:00+00:00",
      "estimated_runway" : "2020-09-18T14:43:00+00:00",
      "actual_runway" : "2020-09-18T14:43:00+00:00"
    },
    "arrival" : {
      "airport" : "Chitose",
      "timezone" : "Asia/Tokyo",
      "iata" : "CTS",
      "icao" : "RJCC",
      "terminal" : "D",
      "gate" : null,
      "baggage" : null,
      "delay" : 13,
      "scheduled" : "2020-09-18T16:28:00+00:00",
      "estimated" : "2020-09-18T16:28:00+00:00",
      "actual" : null,
      "estimated_runway" : null,
      "actual_runway" : null
    },
    "airline" : {
      "name" : "ANA",
      "iata" : "NH",
      "icao" : "ANA"
    },
    "flight" : {
      "number" : "8573",
      "iata" : "NH8573",
      "icao" : "ANA8573",
      "codeshared" : null
    },
    "aircraft" : null,
    "live" : null
  }]
}
  • jolt Specification 에 넣을 바꿔줄 json형식
[{
 "operation": "shift",
 "spec": {
 "flight_date": "flight_date",
 "flight_status": "flight_status",
 "departure": {
 "airport": "depAirport",
 "timezone": "arrTimezone"
    },
   "arrival": {
    "airport": "depAirport",
 "timezone": "arrTimezone",
     "terminal": "arrTerminal",
     "gate": "arrGate"
    },
   "airline": {
    "name": "airlineName"
   },
   "flight": {
     "number": "flightNumber"
    },
   "aircraft": {
     "registration": "aircraftRegistration"
    },
   "live": {
     "updated": "LiveUpdated",
     "latitude": "latitude",
     "longitude": "longitude",
     "altitude": "altitude",
     "direction": "direction",
      "speed_horizontal": "speedHorizontal",
     "speed_vertical": "speedVertical",
     "is_ground": "isGround"
    }
  }
}]

이처럼 Jolt Specipication에 json 형식을 넣어주고 save를 한뒤

Json input에 SplitJson으로 넘어온 json 파일을 복사 붙여넣기 해서 TRANSFORM을 해주면 Json output에 변환 된 json이 나온다.

ConvertJSONToSQL

settings 에는 failure와 original을 체크
properties 에서
JDBC Connection Pool에는 이전에 Mysql과 NiFi 연동하기 에서 해놨던 DBCPConnectionPool을 선택.
DB에 insert를 하는것이므로 INSERT 선택


* MySQL 설정 (DBeaver사용)

DB에 insert 하기전에 mysql에 테이블과 컬럼 생성

create table activeFlights(
     id int NOT NULL AUTO_INCREMENT,
     flight_date date NOT NULL,
     flight_status varchar(6) NOT NULL,
     depAirport varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     depTimezone varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     arrAirport varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     arrTimezone varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     arrGate varchar(256) DEFAULT NULL,
     airlineName varchar(256) DEFAULT NULL,
     flightNumber varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     aircraftRegistration varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     LiveUpdated varchar(256) DEFAULT NULL,
     altitude varchar(256) DEFAULT NULL,
     direction varchar(256) DEFAULT NULL,
     speedHorizontal varchar(256) DEFAULT NULL,
     speedVertical varchar(256) DEFAULT NULL,
     isGround varchar(256) DEFAULT NULL,
     Column1 varchar(100) DEFAULT NULL,
     insertTimestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
     PRIMARY KEY (id)
     ) ENGINE=InnoDB AUTO_INCREMENT=16901 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

sql syntax 오류가 난다면 utf8mb4_0900을 utf8_general로 바꿔준다 (version오류임)


다시 돌아와서
ConvertJsonToSQL의 property에서
Table Name에는 activeFlights 설정
Catalog Name에는 자신의 DB이름 설정

PutSQL

ConverJsonToSQL과 이어줄때 sql설정
setting는 failure와 success설정

properties에서
DBCPConnectionPool을 설정해준다. 그래야 sql이 DB로 들어가니까~ putsql할때는 필수


본인한테 retry하는것까지 하면 작성 끝.

profile
공유 문화를 지향하는 개발자입니다.

2개의 댓글

comment-user-thumbnail
2022년 2월 11일

nifi restApi 설정법 잘 보았습니다. 궁금한 부분있어 글 남깁니다.
SplitJson settings 설정은 어떻게 하면 될까요?

1개의 답글