Part 1
01. NIFI 특징
- 시스템 간 데이터 전달을 효율적으로 처리, 관리, 모니터링하기 위한 시스템
- DataFlow를 쉽게 개발할 수 있고, 시스템 간의 데이터 이동과 내용을 볼 수 있는 UI 제공
- 실시간 데이터 전송에 필요한 유용한 기능을 제공
02. NIFI 개념
- FlowFile(FBP Information Packet)
- NIFI 데이터 단위
- 파일명, UUID, 속성, 데이터로 구성
- 속성에 대한 CRUD 가능
- 해당 값들에 대해 통합, 분기, 필터 등 다양한 처리 가능
- Processor(FBP Black Box)
- 작업을 진행하는 모듈
- FlowFile의 생성, 제거, 수정, 검사 등 기능 제공
- 150개 이상의 Processor 지원하고 사용자가 직접 정의하여 사용 가능
- Connection(FBP Bounded Buffer)
- 각 Processor 들을 연결해주는 역할
- Queue가 담길 공간 제공
- Process Group
- 특정 업무, 기능 단위로 여러 Processor 그룹화
- Input과 Output 포트를 제공해 Process Group 간의 데이터 이동 가능
03. NIFI 구조

- JVM 위에서 동작
- Web Server
- UI를 통해 쉽게 NIFI API를 사용할 수 있도록 지원
- Flow Controller(FBP Scheduler)
- Processer에 스레드를 할당하는 등의 스케줄링 담당
- FlowFile Repository
- 현재 활성화된 FlowFile 상태 및 속성을 저장하는 저장소
- Write-Ahead-Log 방식 사용
- 데이터 무결성을 보장하는 표준 방법
- 변경 내용을 로그 저장소에 먼저 기록 후 파일 또는 데이터베이스에 변경 내용을 기록
- 충돌 발생 시 로그를 통해 복구 가능
- Content Repository
- FlowFile에 Content(데이터)를 저장하는 저장소
- 여러 디렉토리에 분산 저장 가능
- 용량이 큰 데이터 저장 및 단일 디스크의 처리량 보다 많은 양 처리 가능
- Provenance Repository
- 데이터의 처리 단계의 FlowFile 변화 데이터를 저장하는 저장소
- 여러 디스크 지원, 각 데이터는 인덱스가 되어 검색 가능
Part 2
01. 자주 쓰는 프로세서
GenerateFlowFile
- 새로운 FlowFile 생성
- 사이즈, 데이터 형식, 문자 형식 등 설정
InvokeHTTP
- 속성에서 지정한 URL과 HTTP 통신
- GET method 를 통해 가지고 온 데이터는 FlowFIle content 영역에 저장
- FlowFile Content 영역 데이터 추출
ExecuteSQL
- DB query 수행하여 Avro 형식으로 결과 반환
- Max Rows Per Flow File: FlowFile에 포함될 최대 결과 행의 수
- Fetch Size: 한 번에 가져올 결과 행의 수
ConvertAvroToJSON
SplitJson
- 지정한 JSON path 형식으로 FlowFile을 분리
PutElasticsearchHttp
- FlowFile를 지정한 ElasticSearch url로 전송
- type: _doc
- 증분의 경우) Index Opration: delete 등 설정
RouteOnAttribute
ReplaceText
- FlowFile Content를 원하는 형태 또는 형식으로 변경
UpdateAttribute
MergeContent
ExecuteScript
- Processor Session과 FlowFile에 script 코드 실행
- Properties에서 Script Engine과 수행할 Script 지정
- 아래는 HTML 태그를 제거하는 script 코드 예시
var flowFile = session.get();
if (flowFile !== null) {
var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
function transformText(text) {
return text
.replace(/\<script>.*\<\/script>/gi, " ")
.replace(/'/gi, "'")
.replace(/</gi, "<")
.replace(/>/gi, ">")
.replace(/(<([^>]+)>)/gi, " ")
.replace(/&/gi, "&")
.replace(/ /gi, " ")
.replace(/"/gi, "\"")
.replace(/\s+/gi, " ")
.trim()
}
flowFile = session.write(flowFile, new StreamCallback(function(inputStream, outputStream) {
var inputText = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
var inputObj = JSON.parse(inputText);
var outputObj = inputObj;
outputObj.content = outputObj.content ? transformText(outputObj.content) : "";
outputStream.write(JSON.stringify(outputObj, null, "\t").getBytes(StandardCharsets.UTF_8));
}));
session.transfer(flowFile, REL_SUCCESS);
}
EvaluateJsonPath
- FlowFile의 Attribute or Content에 FlowFile의 데이터 전달
ex) SQL문을 통해 id란 값을 얻었을 때, flowfile-attribute를 지정한 후 ID란 변수에 id 값 전달
