생각해보니 Local To Snowflake는 해봤는데,
Local에서 DB로 Loading을 해보지 않아서 생각난 김에 진행.
- 또한 바로 CSV TO Json을 포함한 실습이다.
- 대부분 이전에 사용했던 프로세서들이다.
- 앞으로는 각 프로세서의 동작을 파악하기 쉽게 이름을 수정하려고 한다.
Local에서 File을 가져오는 프로세서
- 이전에 사용했던 프로세서이므로 익숙할 것
Input Directory
속성만 설정하면 된다.
- 파일을 가져올 디렉토리 경로 설정.
텍스트를 여러 개의 작은 텍스트 파일로 분할하는 프로세서
- Line Split Count
- 헤더 행을 제외하고 각 분할 파일에 추가될 행 수.
- 해당 실습은 한 줄 씩 분할할 것이므로
1
로 설정
- Maximum Fragment Size
- 헤더 행을 포함한 각 분할 파일의 최대 크기.
- Header Line Count
- 헤더의 일부로 간주되어야 하는 행 수.
- Header Line Marker Characters
- 헤더 라인을 나타내는 데이터 파일 라인의 첫 번째 문자.
- Remove Trailing Newlines
- 각 분할 파일의 끝에서 줄 바꿈을 제거할지 여부.
Flowfile의 속성을 변경하는 프로세서
- 이름이 다른 건, 사용자가 용도에 맞게 수정했기 때문이다.
- 따라서 당황하지 않아도 된다.
- UpdateAttribute 관하여 이전 정리
- 해당 프로세서를 활용한다면, 매우 복잡하고 다양하기 때문에 추후에 따로 정리할 필요성을 느낌.
일단, 목적은 Flowfile에 Schema Name을 추가하는 것이므로
위와 같이 설정해준다.
- 속성 추가 방법은 마찬가지로
+
를 클릭하여 추가한다.
Record를 변환하는 새로운 Processor
- Record Reader
- 들어오는 데이터를 읽는 데 사용할
컨트롤러 서비스
를 지정한다.- CSV 파일을 읽을 것이기 때문에
CSVReader
로 지정- Record Writer
- 레코드 작성에 사용할
컨트롤러 서비스
를 지정한다.- CSV를 Json으로 변환할 것이기 때문에
JsonRecordSetWriter
로 지정- Include Zero Record FlowFile
- 들어오는 FlowFile을 변환할 때 변환 결과 데이터가 없는 경우,
이 속성은 해당 관계에 FlowFile을 보낼지 여부를 지정한다.
Controller Service
먼저!
CSV TO Json
실습과 설정이 다르므로 주의한다.
- AvroSchemaRegistry
- 스키마 등록 및 접근을 위한 서비스를 제공한다.
name
:
- 스키마 이름
value
:
- 실제 스키마의 텍스트 표현을 나타내는 동적 속성으로 스키마를 등록
- CSVReader
- Schema Access Strategy
- 데이터 해석에 사용할 스키마를 얻는 방법을 지정
- 먼저, Schema Access Strategy를 `Use 'Schema Name' Property로 변경한다.
- Schema Registry
- 스키마 레지스트리에 사용할 컨트롤러 서비스를 지정
- 바로 직전에 생성한 AvroSchemaRegistry를 Schema Registry에 등록한다.
- JsonRecordSetWriter
- 레코드의 스키마를 데이터에 추가하는 방법을 지정
- Schema Access Strategy
- 데이터 해석에 사용할 스키마를 얻는 방법을 지정
- 먼저, Schema Access Strategy를 `Use 'Schema Name' Property로 변경한다.
- Schema Registry
- 스키마 레지스트리에 사용할 컨트롤러 서비스를 지정
- 바로 직전에 생성한 AvroSchemaRegistry를 Schema Registry에 등록한다.
- 따라서, Controller Service를 생성하고 활성화해주면 된다.
Log를 띄우는 프로세서
- 프로세서 이름에서 확인하듯이, CSV Header를 걸러내기 위함이다.
- 데이터베이스 스키마가
ID (int)
,Name (string)
이므로
해당 데이터 형태가 아니라면 여기서 Filtering 된다.- (Header는 보통 string, string
---
형식이기 때문에 걸러진다.)
- Record Reader
- JsonTreeReader를 생성해서 그대로 적용해준다.
- JsonTree 형식의 데이터를 읽기 위함이다.
- CSV To JSON으로 변환되어 적용되기 때문이다.
- D.C.P.S
- MySQL과 연결하므로, 이전에 만들어둔 ConnectionPool를 설정한다.
성공 Log를 띄우는 프로세서
MySQL 데이터베이스 TEST2 테이블 비어있는 것을 확인
실습할 간단한 csv 파일 확인
LogMessage
2024-01-05 13:37:34,641 INFO [Timer-Driven Process Thread-3] o.a.nifi.processors.standard.LogMessage LogMessage[id=d735a963-018c-1000-37cf-ac0e0a8aa1cc] ^^^^^^^^^^^^^^HEADER^^^^^^^^^^^^^^LO2DB 2024-01-05 13:37:34,651 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,678 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,678 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,678 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,678 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,678 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,697 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,697 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,710 INFO [Timer-Driven Process Thread-8] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,710 INFO [Timer-Driven Process Thread-8] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,710 INFO [Timer-Driven Process Thread-8] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,734 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,734 INFO [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,748 INFO [Timer-Driven Process Thread-9] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,748 INFO [Timer-Driven Process Thread-9] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB 2024-01-05 13:37:34,760 INFO [Timer-Driven Process Thread-1] o.a.nifi.processors.standard.LogMessage LogMessage[id=d6fc98a1-018c-1000-d0ec-ea77aa0de1d3] ****************************SUCCESS****************************LO2DB
How to read data from local and store it into MySQL table in NiFi