[NiFi] REST API TO DB 실습

CHAN LIM·2024년 1월 11일
0

NiFi

목록 보기
12/13
post-thumbnail

Intro.

서드 파티(Like Kafka, HDFS...)를 제외하고 할 수 있는 작업 중,
REST API에 대해서 상기시키고 적용할 겸, 진행한 실습.


0. 전체 프로세스

  • 1. 데이터를 받아올 Open API 선정 및 설정
  • 2. REST API를 통해 FlowFile을 생성
  • 3. FlowFile를 확인 후, 파일 형식에 따라 변환
    (이번 실습에서는 Json 형식)
  • 4. 변환된 Flowfile를 SQL로 DB에 삽입

1. API 선정 및 설정

Open API 중, aviationStack 선정

  • 일단, 무료
  • 상당히 거대한 데이터를 가져올 수 있다.
    다른 분들이 많이 활용하시길래
  • 오른쪽 상단에 있는 SIGN UP FREE 클릭 후
  • Free로 회원가입을 진행한다.

  • 해당 부분에 생성된 API Access Key를 잘 기억해둔다.

Docs 에서 API에 대한 용법을 확인할 수 있다.

  • 매우 방대하므로 직접 확인하는 것을 추천한다.

2. REST API TO Flowfile

Flowfile 자체를 생성하고
Flowfile이 API를 통해 데이터를 가져오는 프로세스


2.1 GenerateFlowfile

  • 생성 직후의 속성값

  • 작성자가 수정한 속성값

  • API를 통해 데이터를 가져오려면
    Flowfile이 access_key 라는 속성을 가져야합니다.
    • 오른쪽 상단의 +를 통해 속성을 추가합니다.
  • 그 외 flight_statuslimit는 Docs을 보고 가져올 데이터와 그 숫자를 지정한 속성입니다.

  • 스케쥴링은 3분으로 임의 지정했다.
    • API 요청 제한이 있으므로...

2.2 InvokeHTTP

위의 이미지와 이름이 다른 건 작성자가 임의로 수정한 사항

  • REST API로 데이터를 가져올 것이므로 GET method 지정
  • HTTP URL 설정
    • http://api.aviationstack.com/v1/flights?access_key=${access_key}&flight_status=${active}&limit=${limit}
    • 이전 프로세서에서 access_key라는 속성을 추가했으므로,
      본인의 access_key를 추가하며, 옵션도 추가한다.

중간점검

  • API를 통해 가져온 데이터이다.

3. Data 변환

3.1 Data 분할

위에서 확인한 데이터는 여러 비행 상태를 나타내는 Bulk 데이터이다.
따라서, 해당 데이터를 하나의 비행 상태씩 분할하는 작업이 요구된다.

  • SplitJson
    • 프로세스 이름 그대로,
      Json 데이터를 분할하는 프로세서이다.
      • 속성에서, Json 데이터를 분할하는 것이기에 $.data를 설정
  • ControlRate
    • 데이터가 후속 프로세서로 전송되는 속도를 제어한다.
    • 속도를 조절함으로서, 정확도를 높이기 위함이다.
      • Control 기준은 Flowfile의 수로 지정한다.
        flowfiles를 separate해줬기 때문에 flowfilecount로 설정.

분할 결과


3.2 JoltTransformJSON

JoltTransform

  • Jolt is an open-source, JSON-to-JSON transformation library
  • 데이터 변환을 위한 라이브러리
  • NiFi는 해당 라이브러리가 Default로 있으므로,
    그대로 직접 사용해도 된다.

속성

  • Jolt Specification만 설정하면 된다.

3.2.1 Jolt Specification

[{
 "operation": "shift",
 "spec": {
 "flight_date": "flight_date",
 "flight_status": "flight_status",
 "departure": {
 "airport": "depAirport",
 "timezone": "arrTimezone"
    },
   "arrival": {
    "airport": "depAirport",
 "timezone": "arrTimezone",
     "terminal": "arrTerminal",
     "gate": "arrGate"
    },
   "airline": {
    "name": "airlineName"
   },
   "flight": {
     "number": "flightNumber"
    },
   "aircraft": {
     "registration": "aircraftRegistration"
    },
   "live": {
     "updated": "LiveUpdated",
     "latitude": "latitude",
     "longitude": "longitude",
     "altitude": "altitude",
     "direction": "direction",
      "speed_horizontal": "speedHorizontal",
     "speed_vertical": "speedVertical",
     "is_ground": "isGround"
    }
  }
}]
  • 설정 후 결과


4. Json To DB

4.1 ConvertJsonToSQL

속성

  • Statement Type
    • DB로 데이터를 입력할 것이므로 INSERT
  • TableName
  • CatalogName : Database 이름
    • 위 두 속성은 사전에 DB(MySQL)에 생성한 다음에 입력해야한다.

4.1.1 DB table 생성

create table activeFlights(
     id int NOT NULL AUTO_INCREMENT,
     flight_date date NOT NULL,
     flight_status varchar(6) NOT NULL,
     depAirport varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     depTimezone varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     arrAirport varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     arrTimezone varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     arrGate varchar(256) DEFAULT NULL,
     airlineName varchar(256) DEFAULT NULL,
     flightNumber varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     aircraftRegistration varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
     LiveUpdated varchar(256) DEFAULT NULL,
     altitude varchar(256) DEFAULT NULL,
     direction varchar(256) DEFAULT NULL,
     speedHorizontal varchar(256) DEFAULT NULL,
     speedVertical varchar(256) DEFAULT NULL,
     isGround varchar(256) DEFAULT NULL,
     Column1 varchar(100) DEFAULT NULL,
     insertTimestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
     PRIMARY KEY (id)
     ) ENGINE=InnoDB AUTO_INCREMENT=16901 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

4.2 PutSQL

SQL로 DB에 데이터 삽입 프로세서


결과

mysql> select * from activeFlights;
+-------+-------------+---------------+------------+-------------+------------+-------------+---------+---------
| id    | flight_date | flight_status | depAirport | depTimezone | arrAirport | arrTimezone | arrGate | airlineN
+-------+-------------+---------------+------------+-------------+------------+-------------+---------+---------
| 16901 | 2024-01-11  | schedu        |            | NULL        | NULL       |             | NULL    | empty
| 16902 | 2024-01-10  | schedu        |            | NULL        | NULL       |             | NULL    | Wizz Air
+-------+-------------+---------------+------------+-------------+------------+-------------+---------+---------
2 rows in set (0.01 sec)

한번에 삽입하면 파악하기 어려워서 하나씩 넣어봤다.


출처

NiFi-invokeHTTP로-REST-API를-받아와서-DB에-INSERT하기

profile
클라우드, 데이터, DevOps 엔지니어 지향 || 글보단 사진 지향

0개의 댓글