ELK 공부 (2/3)

KIMAKUMA·2023년 4월 11일

ELK

목록 보기
2/7
post-thumbnail

Part 1

01. NIFI 특징

  • 시스템 간 데이터 전달을 효율적으로 처리, 관리, 모니터링하기 위한 시스템
  • DataFlow를 쉽게 개발할 수 있고, 시스템 간의 데이터 이동과 내용을 볼 수 있는 UI 제공
  • 실시간 데이터 전송에 필요한 유용한 기능을 제공

02. NIFI 개념

  • FlowFile(FBP Information Packet)
    • NIFI 데이터 단위
    • 파일명, UUID, 속성, 데이터로 구성
    • 속성에 대한 CRUD 가능
    • 해당 값들에 대해 통합, 분기, 필터 등 다양한 처리 가능
  • Processor(FBP Black Box)
    • 작업을 진행하는 모듈
    • FlowFile의 생성, 제거, 수정, 검사 등 기능 제공
    • 150개 이상의 Processor 지원하고 사용자가 직접 정의하여 사용 가능
  • Connection(FBP Bounded Buffer)
    • 각 Processor 들을 연결해주는 역할
    • Queue가 담길 공간 제공
  • Process Group
    • 특정 업무, 기능 단위로 여러 Processor 그룹화
    • Input과 Output 포트를 제공해 Process Group 간의 데이터 이동 가능

03. NIFI 구조

  • JVM 위에서 동작
  • Web Server
    • UI를 통해 쉽게 NIFI API를 사용할 수 있도록 지원
  • Flow Controller(FBP Scheduler)
    • Processer에 스레드를 할당하는 등의 스케줄링 담당
  • FlowFile Repository
    • 현재 활성화된 FlowFile 상태 및 속성을 저장하는 저장소
    • Write-Ahead-Log 방식 사용
      • 데이터 무결성을 보장하는 표준 방법
      • 변경 내용을 로그 저장소에 먼저 기록 후 파일 또는 데이터베이스에 변경 내용을 기록
      • 충돌 발생 시 로그를 통해 복구 가능
  • Content Repository
    • FlowFile에 Content(데이터)를 저장하는 저장소
    • 여러 디렉토리에 분산 저장 가능
    • 용량이 큰 데이터 저장 및 단일 디스크의 처리량 보다 많은 양 처리 가능
  • Provenance Repository
    • 데이터의 처리 단계의 FlowFile 변화 데이터를 저장하는 저장소
    • 여러 디스크 지원, 각 데이터는 인덱스가 되어 검색 가능

Part 2

01. 자주 쓰는 프로세서

GenerateFlowFile

  • 새로운 FlowFile 생성
  • 사이즈, 데이터 형식, 문자 형식 등 설정

InvokeHTTP

  • 속성에서 지정한 URL과 HTTP 통신
  • GET method 를 통해 가지고 온 데이터는 FlowFIle content 영역에 저장

ExtractText

  • FlowFile Content 영역 데이터 추출

ExecuteSQL

  • DB query 수행하여 Avro 형식으로 결과 반환
  • Max Rows Per Flow File: FlowFile에 포함될 최대 결과 행의 수
  • Fetch Size: 한 번에 가져올 결과 행의 수

ConvertAvroToJSON

  • Avro 형식을 JSON 형식으로 변환

SplitJson

  • 지정한 JSON path 형식으로 FlowFile을 분리

PutElasticsearchHttp

  • FlowFile를 지정한 ElasticSearch url로 전송
  • type: _doc
  • 증분의 경우) Index Opration: delete 등 설정

RouteOnAttribute

  • FlowFile 속성 값에 따라 라우팅

ReplaceText

  • FlowFile Content를 원하는 형태 또는 형식으로 변경

UpdateAttribute

  • FlowFile에 속성 추가 및 변경

MergeContent

  • FlowFile Content 병합

JoltTransformJSON

ExecuteScript

  • Processor Session과 FlowFile에 script 코드 실행
  • Properties에서 Script Engine과 수행할 Script 지정
  • 아래는 HTML 태그를 제거하는 script 코드 예시
var flowFile = session.get();

if (flowFile !== null) {

  var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
  var IOUtils = Java.type("org.apache.commons.io.IOUtils");
  var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

  function transformText(text) {
    return text
	  .replace(/\<script>.*\<\/script>/gi, " ")
      .replace(/&#39;/gi, "'")
      .replace(/&lt;/gi, "<")
      .replace(/&gt;/gi, ">")
      .replace(/(<([^>]+)>)/gi, " ")
      .replace(/&amp;/gi, "&")
      .replace(/&nbsp;/gi, " ")
      .replace(/&quot;/gi, "\"")
      .replace(/\s+/gi, " ")
      .trim()
  }

  flowFile = session.write(flowFile, new StreamCallback(function(inputStream, outputStream) {
    var inputText = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    var inputObj = JSON.parse(inputText);

    var outputObj = inputObj;

    outputObj.content = outputObj.content ? transformText(outputObj.content) : "";

    outputStream.write(JSON.stringify(outputObj, null, "\t").getBytes(StandardCharsets.UTF_8));
  }));

  session.transfer(flowFile, REL_SUCCESS);
}

EvaluateJsonPath

  • FlowFile의 Attribute or Content에 FlowFile의 데이터 전달

    ex) SQL문을 통해 id란 값을 얻었을 때, flowfile-attribute를 지정한 후 ID란 변수에 id 값 전달

0개의 댓글