서드 파티(Like Kafka, HDFS...)를 제외하고 할 수 있는 작업 중,
REST API에 대해서 상기시키고 적용할 겸, 진행한 실습.
- 1. 데이터를 받아올 Open API 선정 및 설정
- 2. REST API를 통해 FlowFile을 생성
- 3. FlowFile를 확인 후, 파일 형식에 따라 변환
(이번 실습에서는 Json 형식)
- 4. 변환된 Flowfile를 SQL로 DB에 삽입
Open API 중,
aviationStack
선정
- 일단, 무료
- 상당히 거대한 데이터를 가져올 수 있다.
다른 분들이 많이 활용하시길래
- 오른쪽 상단에 있는
SIGN UP FREE
클릭 후
- Free로 회원가입을 진행한다.
- 해당 부분에 생성된 API Access Key를 잘 기억해둔다.
Docs 에서 API에 대한 용법을 확인할 수 있다.
- 매우 방대하므로 직접 확인하는 것을 추천한다.
Flowfile 자체를 생성하고
Flowfile이 API를 통해 데이터를 가져오는 프로세스
- 생성 직후의 속성값
- 작성자가 수정한 속성값
- API를 통해 데이터를 가져오려면
Flowfile이access_key
라는 속성을 가져야합니다.
- 오른쪽 상단의
+
를 통해 속성을 추가합니다.
- 그 외
flight_status
와limit
는 Docs을 보고 가져올 데이터와 그 숫자를 지정한 속성입니다.
- 스케쥴링은 3분으로 임의 지정했다.
- API 요청 제한이 있으므로...
위의 이미지와 이름이 다른 건 작성자가 임의로 수정한 사항
- REST API로 데이터를 가져올 것이므로
GET
method 지정
- HTTP URL 설정
http://api.aviationstack.com/v1/flights?access_key=${access_key}&flight_status=${active}&limit=${limit}
- 이전 프로세서에서
access_key
라는 속성을 추가했으므로,
본인의access_key
를 추가하며, 옵션도 추가한다.
중간점검
- API를 통해 가져온 데이터이다.
위에서 확인한 데이터는 여러 비행 상태를 나타내는 Bulk 데이터이다.
따라서, 해당 데이터를 하나의 비행 상태씩 분할하는 작업이 요구된다.
- SplitJson
- 프로세스 이름 그대로,
Json 데이터를 분할하는 프로세서이다.
- 속성에서, Json 데이터를 분할하는 것이기에
$.data
를 설정
- ControlRate
- 데이터가 후속 프로세서로 전송되는 속도를 제어한다.
- 속도를 조절함으로서, 정확도를 높이기 위함이다.
- Control 기준은 Flowfile의 수로 지정한다.
flowfiles를 separate해줬기 때문에 flowfilecount로 설정.
분할 결과
- Jolt is an open-source, JSON-to-JSON transformation library
- 데이터 변환을 위한 라이브러리
- NiFi는 해당 라이브러리가 Default로 있으므로,
그대로 직접 사용해도 된다.
속성
- Jolt Specification만 설정하면 된다.
[{
"operation": "shift",
"spec": {
"flight_date": "flight_date",
"flight_status": "flight_status",
"departure": {
"airport": "depAirport",
"timezone": "arrTimezone"
},
"arrival": {
"airport": "depAirport",
"timezone": "arrTimezone",
"terminal": "arrTerminal",
"gate": "arrGate"
},
"airline": {
"name": "airlineName"
},
"flight": {
"number": "flightNumber"
},
"aircraft": {
"registration": "aircraftRegistration"
},
"live": {
"updated": "LiveUpdated",
"latitude": "latitude",
"longitude": "longitude",
"altitude": "altitude",
"direction": "direction",
"speed_horizontal": "speedHorizontal",
"speed_vertical": "speedVertical",
"is_ground": "isGround"
}
}
}]
속성
- JDBC Connection Pool
- 이전에 진행했던 JDBC Connection Pool을 설정한다.
- Statement Type
- DB로 데이터를 입력할 것이므로 INSERT
- TableName
- CatalogName : Database 이름
- 위 두 속성은 사전에 DB(MySQL)에 생성한 다음에 입력해야한다.
4.1.1 DB table 생성
create table activeFlights( id int NOT NULL AUTO_INCREMENT, flight_date date NOT NULL, flight_status varchar(6) NOT NULL, depAirport varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL, depTimezone varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL, arrAirport varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL, arrTimezone varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL, arrGate varchar(256) DEFAULT NULL, airlineName varchar(256) DEFAULT NULL, flightNumber varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL, aircraftRegistration varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL, LiveUpdated varchar(256) DEFAULT NULL, altitude varchar(256) DEFAULT NULL, direction varchar(256) DEFAULT NULL, speedHorizontal varchar(256) DEFAULT NULL, speedVertical varchar(256) DEFAULT NULL, isGround varchar(256) DEFAULT NULL, Column1 varchar(100) DEFAULT NULL, insertTimestamp timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id) ) ENGINE=InnoDB AUTO_INCREMENT=16901 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
SQL로 DB에 데이터 삽입 프로세서
mysql> select * from activeFlights;
+-------+-------------+---------------+------------+-------------+------------+-------------+---------+---------
| id | flight_date | flight_status | depAirport | depTimezone | arrAirport | arrTimezone | arrGate | airlineN
+-------+-------------+---------------+------------+-------------+------------+-------------+---------+---------
| 16901 | 2024-01-11 | schedu | | NULL | NULL | | NULL | empty
| 16902 | 2024-01-10 | schedu | | NULL | NULL | | NULL | Wizz Air
+-------+-------------+---------------+------------+-------------+------------+-------------+---------+---------
2 rows in set (0.01 sec)
한번에 삽입하면 파악하기 어려워서 하나씩 넣어봤다.