NiFi to Snowflake Internal Stage
- NiFi는 (1.24.0 버전 기준으로) Snowflake 내부 스테이지와의 연결을 지원하지 않습니다.
- 따라서,
이전 작업으로 Snowflake 관련 Custom Processor를 설치하여 적용했습니다.
- 이번에는,
해당 Processor가 잘 동작하는 지 확인하면서 내부 스테이지에 데이터를 넣어봅시다.
이전 페이지에서 적용한 프로세서 중,
내부 스테이지에 데이터를 넣는 프로세서인 PutSnowflake를 생성한다.
- JDBC Connection Pool
- Input Directory
- 내부 스테이지로 삽입할 데이터를 가져오는 디렉토리 설정
- File Name
- NiFi의 Expression Language를 사용하여 해당 파일이 플로우 파일과 동일한 이름을 가지고 있다는 사실을 기반으로 파일 이름을 동적으로 가져올 수 있습니다.
- {$filename}
- 이 의미는 filename으로 명명된 FlowFile attribute 사용한다.
- Internal Stage
- Snowflake의 Internal Stage 설정
문제 발생
PutSnowflake 프로세서는
분명히InputDirectory
를 속성으로 갖고 있음에도,
단독으로 동작시킬 수 없다.
해당 프로세서로
Upstream
동작을 연결해야지 동작하도록 설계되었다.아마, 내부 스테이지는 다른 플랫폼에서 가져온 데이터를
Local에 저장한 후 Local에서 Stage로 Loading할 때 사용하기 때문에 이렇게 설계한 것으로 예상한다.
따라서,
Local to Local로 파일을 보낸 후 Internal Stage로 보내는 방안으로 설계했다.
지금은 간단히 Local To Local이지만,
- 데이터를 크롤링 후, Local에 저장된 전처리된 데이터
- DB에서 가져온 csv 또는 Json 파일
등등 상황을 가정할 수 있다.
이전과 같은 작업이므로 간략히 정리한다.
# SYSADMIN 역할 설정
USE ROLE SYSADMIN;
# COMPUTE_WH 웨어하우스 설정
USE WAREHOUSE COMPUTE_WH;
# DataBase 생성
CREATE OR REPLACE DATABASE TEST4
COMMENT = 'Custom TEST';
# Database 사용
USE DATABASE TEST4;
# Named Stage 생성 (내부 스테이지)
CREATE OR REPLACE STAGE custom
file_format = (type = 'CSV' FIELD_DELIMITERTEST4.PUBLIC.CUSTOM = ',' SKIP_HEADER = 1);
# 스테이지 생성 확인
LIST @custom;
# DB 테이블 생성
CREATE OR REPLACE TABLE TEST4.PUBLIC.CUST
(ID int,
NAME varchar(32)
);
# 스테이지에서 DB 테이블로 전송
// COPY INTO 시, Stage에 데이터 남아있음.
COPY INTO CUST FROM @custom;
// COPY INTO 시, Stage 데이터 삭제.
COPY INTO CUST FROM @custom PURGE = TRUE;
# 결과 확인
SELECT * FROM CUST;
NiFI 동작 확인
Internal Stage Loading 확인
COPY INTO 후, 데이터 확인