이전 글에서는 이벤트 서버에서 Spring Batch, Spring QuartZ를 사용하여 매일 새벽 2시 전체 데이터를 CSV 변환 후 S3에 업로드 하여 Kafka를 사용해 Search 서버로 메세지를 발송하는 과정을 다루었으며, 이번 글에서는 메세지 수신, S3에서 CSV 다운로드, Bulk API 형식 변환, 엘라스틱 서치 데이터 삽입을 진행하겠습니다.
IndexingConsumer.java
Search 서버는 Kafka 메시지를 수신한 후 색인 작업을 수행합니다.
@KafkaListener(
topics = "partial-indexing",
groupId = "partial-indexing-group"
)
public void handlePartialIndexing(String message) {
ObjectMapper mapper = new ObjectMapper();
try {
PartialIndexingMessage partialIndexingMessage = mapper.readValue(message, PartialIndexingMessage.class);
log.info("부분 색인 Kafka 메시지 수신: {}", partialIndexingMessage);
if (isFullIndexingInProgress()) {
log.info("전체 색인 작업 진행 중. 부분 색인 메시지를 대기 큐로 추가: {}", partialIndexingMessage);
queue.add(partialIndexingMessage); // 대기 큐에 추가
return;
}
processPartialIndexing(partialIndexingMessage); // 즉시 처리
} catch (JsonProcessingException e) {
log.error("Kafka 메시지 처리 중 오류 발생: {}", message, e);
throw new RuntimeException("Kafka 메시지 처리 실패", e);
}
}
주요 단계
- Kafka메시지 수신:
Kafka 리스너를 통해partial-indexing
토픽의 메시지를 수신하고, 수신된 메시지를 JSON 형식에서PartialIndexingMessage
객체로 변환합니다.- 전체 색인 진행 여부 확인:
isFullIndexingInProgress()
메서드를 호출하여 현재 전체 색인 작업이 진행 중인지 확인합니다.
- 전체 색인 진행 중일 경우: 메시지를 즉시 처리하지 않고 대기 큐(
queue
)에 추가합니다.- 전체 색인 중이 아닐 경우: 메시지를 기반으로 부분 색인 작업을 즉시 처리합니다.
Elasticsearch 인덱스 생성은 전체 색인 작업에서 중요한 부분으로, 새로운 데이터를 색인하기 위한 기본 구조를 정의합니다.
특히, Edge Ngram 기반 분석기를 사용하여 검색어 자동완성과 정밀한 검색 기능을 지원합니다.
private void createIndexWithEdgeNgram() {
try {
IndexSettings indexSettings = new IndexSettings.Builder()
.numberOfShards("5") // 샤드 개수
.numberOfReplicas("1") // 복제본 개수
.analysis(createAnalysis()) // Edge Ngram 분석기 설정
.index(new IndexSettings.Builder().maxNgramDiff(29).build()) // 최대 Ngram 차이
.build();
CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
.index(INDEX_NAME) // 생성할 인덱스 이름
.aliases(ALIAS_NAME, new Alias.Builder().isWriteIndex(false).build()) // Alias 설정
.settings(indexSettings) // 설정 적용
.mappings(createMappings()) // 매핑 설정
.build();
CreateIndexResponse createIndexResponse = elasticsearchClient.indices().create(createIndexRequest);
log.info("Edge Ngram 기반 인덱스 생성 완료: {}", createIndexResponse.index());
} catch (Exception e) {
log.error("인덱스 생성 실패: {}", e.getMessage(), e);
}
}
핵심 기능
- 분석기 설정: Edge Ngram 분석기를 사용하여 검색어 자동완성과 정확성을 제공합니다.
- 매핑 설정: 각 필드(
Title
,Genres
,Schedules
)의 데이터 타입을 정의합니다.- Alias 설정: 인덱스를
current
로 설정하여 항상 최신 데이터로 연결합니다.
IndexingService.java
이벤트 서버에서 Kafka 메시지가 여러 번 올 수 있습니다.
이를 처리하기 위해 initializeIndexing 메서드를 추가하여 하루에 한 번만 초기화 작업이 실행되도록 Redis를 사용해 제어합니다.
또한, S3에서 다운로드한 CSV 데이터를 Elasticsearch에 삽입하는 과정에서 Bulk API를 사용하여 대량 데이터를 효율적으로 처리하며,
CSV 데이터를 Elasticsearch Bulk JSON 형식으로 변환해 데이터의 정합성을 유지합니다.
public void handleFullIndexing(String payload) {
initializeIndexing(); // 하루에 한 번만 실행되는 초기화 작업
// 요청마다 실행되는 로직
processPayload(payload);
}
private void initializeIndexing() {
String today = LocalDate.now().toString();
String lastDate = redisTemplate.opsForValue().get(INITIALIZATION_KEY);
if (lastDate == null || !lastDate.equals(today)) {
synchronized (this) { // 동시성 제어
lastDate = redisTemplate.opsForValue().get(INITIALIZATION_KEY);
if (lastDate == null || !lastDate.equals(today)) {
try {
registerS3Repository(); // S3 저장소 설정
deleteExistingSnapshot(); // 기존 스냅샷 삭제
backupCurrentData(); // 현재 상태를 스냅샷으로 S3에 저장
deleteExistingData(); // 기존 데이터 삭제
createIndexIfNotExist(); // 인덱스 생성
redisTemplate.opsForValue().set(INITIALIZATION_KEY, today);
log.info("Redis 기반 인덱싱 초기화 작업이 완료되었습니다.");
} catch (Exception e) {
log.error("초기화 작업 중 오류 발생: {}", e.getMessage(), e);
throw new IllegalStateException("인덱싱 초기화에 실패했습니다.", e);
}
} else {
log.info("오늘은 이미 초기화 작업이 Redis에 의해 처리되었습니다.");
}
}
} else {
log.info("오늘은 이미 초기화 작업이 Redis에 의해 처리되었습니다.");
}
}
/**
* 주어진 S3 URL에서 CSV 파일을 다운로드하고, 이를 Elasticsearch에 삽입합니다.
*
* @param payload - S3에서 파일을 다운로드할 URL List
*/
public void processPayload(String payload) {
String[] s3UrlList = payload.split(",");
for (String s3Url : s3UrlList) {
String downloadPath = s3Utils.downloadFileWithRetry(s3Url); // S3에서 파일 다운로드
try (Stream<String> bulkJsonStream = csvToBulkApiConverter.convertCsvToBulkJsonStream(downloadPath, INDEX_NAME)) {
insertDataToElasticsearch(bulkJsonStream); // Elasticsearch에 데이터 삽입
} catch (Exception e) {
log.error("전체 색인 처리 중 오류 발생: {}", e.getMessage(), e);
} finally {
cleanUpDownloads(downloadPath);
}
}
}
주요 단계
- 메시지 수신:
handleFullIndexing
메서드를 통해 전체 색인 작업 요청을 처리하며, 전달된 payload를 사용해 색인 데이터를 처리합니다.- 초기화 작업 실행:
initializeIndexing
메서드를 호출하여 하루에 한 번만 실행되는 초기화 작업을 수행합니다.
- Redis를 사용하여 초기화 여부를 판단하고, 오늘 작업이 실행되지 않았을 경우 다음 작업을 수행합니다:
- S3 저장소 설정: Elasticsearch 스냅샷 리포지토리를 등록합니다.
- 기존 스냅샷 삭제: S3에 저장된 이전 스냅샷을 삭제합니다.
- 현재 상태 백업: 현재 Elasticsearch 데이터를 스냅샷으로 S3에 저장합니다.
- 기존 데이터 삭제: 기존 Elasticsearch 데이터를 삭제합니다.
- 인덱스 생성: 새로운 Elasticsearch 인덱스를 생성합니다.
- Redis에 초기화 완료 상태를 저장합니다.
- S3에서 CSV 다운로드 및 데이터 처리:
processPayload
메서드를 호출하여 주어진 payload에서 S3 URL 리스트를 처리합니다.
- 각 URL에 대해 CSV 파일을 S3에서 다운로드합니다.
- CSV를 Elasticsearch Bulk JSON으로 변환:
csvToBulkApiConverter
를 사용해 CSV 데이터를 Elasticsearch Bulk JSON 포맷으로 변환합니다.- Elasticsearch 데이터 삽입:
Bulk API를 사용하여 변환된 데이터를 Elasticsearch에 삽입합니다.