이번 글에서는 Elasticsearch 부분 색인 작업에 대해 다룹니다. 부분 색인은 데이터 변경 이벤트(생성, 수정, 삭제)가 발생했을 때, 해당 데이터를 Elasticsearch에 실시간으로 반영하는 작업입니다.
Kafka, Redis, MongoDB를 활용하여 변경된 데이터만 색인함으로써 데이터 정합성을 보장합니다.
부분 색인은 데이터가 변경될 때마다 Kafka 메시지를 통해 작업 요청이 전달되고, Search 서버에서 작업을 처리합니다. 아래는 주요 흐름입니다:
Event 서버
Search 서버
IndexingProducer.java
Event 서버에서는 데이터 변경 이벤트가 발생하면 Kafka 메시지를 생성하여 Search 서버로 전달합니다.
전달되는 메시지는 작업 데이터(payload
), 동작 타입(OperationType
)을 포함합니다.
public void sendIndexingMessage(Object payload, OperationType operationType) {
try {
String message = objectMapper.writeValueAsString(
new PartialIndexingMessage(payload, operationType)
);
kafkaTemplate.send(TOPIC_NAME, message);
log.info("Kafka로 메시지를 전송했습니다: {}", message);
} catch (JsonProcessingException e) {
log.error("색인 메시지 직렬화 실패. 데이터: {}, 오류: {}",
payload, e.getMessage());
throw new RuntimeException("Kafka로 메시지 전송 실패", e);
}
}
주요 단계
- 작업 데이터: 변경된 데이터(예: Event 정보, 삭제 ID 등)를 포함합니다.
- 동작 타입: 데이터 생성, 수정, 삭제 작업을 구분합니다.
IndexingConsumer.java
Kafka 메시지를 수신한 후, 작업을 분류하여 처리합니다. Redis 락을 활용해 작업 중복을 방지합니다.
@KafkaListener(
topics = "partial-indexing",
groupId = "partial-indexing-group"
)
public void handlePartialIndexing(String message) {
ObjectMapper mapper = new ObjectMapper();
try {
PartialIndexingMessage partialIndexingMessage = mapper.readValue(message, PartialIndexingMessage.class);
log.info("부분 색인 Kafka 메시지 수신: {}", partialIndexingMessage);
if (isFullIndexingInProgress()) {
log.info("전체 색인 작업 진행 중. 부분 색인 메시지를 대기 큐로 추가: {}", partialIndexingMessage);
queue.add(partialIndexingMessage); // 대기 큐에 추가
return;
}
processPartialIndexing(partialIndexingMessage); // 즉시 처리
} catch (JsonProcessingException e) {
log.error("Kafka 메시지 처리 중 오류 발생: {}", message, e);
throw new RuntimeException("Kafka 메시지 처리 실패", e);
}
}
private void processPartialIndexing(PartialIndexingMessage message) {
log.info("부분 색인 작업을 시작합니다. 작업 유형: {}", message.getOperationType());
String operationType = message.getOperationType();
switch (operationType) {
case "CREATE":
log.info("CREATE 작업을 처리 중입니다.");
indexingService.handlePartialIndexingCreate((Map<String, Object>) message.getPayload(), operationType);
break;
case "UPDATE":
log.info("UPDATE 작업을 처리 중입니다.");
indexingService.handlePartialIndexingUpdate((Map<String, Object>) message.getPayload(), operationType);
break;
case "DELETE":
log.info("DELETE 작업을 처리 중입니다.");
indexingService.handlePartialIndexingDelete((String) message.getPayload(), operationType);
break;
default:
log.warn("알 수 없는 작업 유형입니다: {}", operationType);
break;
}
log.info("부분 색인 작업이 완료되었습니다. 작업 유형: {}", message.getOperationType());
}
// 전체 색인 실행 상태 확인
private boolean isFullIndexingInProgress() {
return (lockingService.isLockAcquired(INDEXING_LOCK));
}
주요 단계
- Kafka 메시지 수신: 메시지를 파싱하여 작업 데이터를 추출합니다.
- Redis 락 사용: Redis를 활용해 작업 중복을 방지합니다.
- 작업 분류: 메시지의
OperationType
을 기반으로 작업을 분류(생성, 수정, 삭제).
MongoDB는 Elasticsearch 색인 작업 중 데이터 변경 사항을 안전하게 관리하기 위한 캐싱 및 추적 시스템으로 활용됩니다.
MongoDB의 주요 역할은 다음과 같습니다
작업 상태 저장:
indexed
)를 추적합니다.장애 복구 지원:
변경 데이터 기록:
색인 작업 데이터를 MongoDB에 저장하기 위한 엔티티는 @Document
어노테이션을 사용해 정의합니다.
이 클래스는 이벤트 데이터(EventId
, Title
등)와 작업 상태(operationType
, indexed
)를 포함합니다.
@Data
@Document(collection = "partial-indexing")
public class PartialIndexing {
@Id
private String id; // MongoDB 기본 _id 필드
@JsonProperty("EventId")
private String eventId;
@JsonProperty("Title")
private String title;
@JsonProperty("Poster_Url")
private String posterUrl;
@JsonProperty("Stage")
private String stage;
@JsonProperty("Location")
private String location;
@JsonProperty("Genres")
private List<Map<String, String>> genres;
@JsonProperty("Schedules")
private List<Map<String, String>> schedules;
@JsonProperty("Ticketing")
private LocalDateTime ticketing;
private OperationType operationType; // 작업 타입 (CREATE, UPDATE, DELETE)
private boolean indexed; // 색인 성공 여부
}
주요 필드 설명
operationType
: 작업의 유형(CREATE, UPDATE, DELETE).indexed
: 색인 작업 성공 여부를 나타내는 플래그.genres
,schedules
: 데이터 구조가 JSON 배열로 저장되므로 유연한 데이터 표현이 가능합니다.
MongoDB에 데이터를 저장하거나 조회하기 위해 ReactiveMongoRepository
를 사용합니다.
기본 CRUD 메서드 외에도 특정 필드 기반으로 데이터를 조회할 수 있는 커스텀 메서드를 정의할 수 있습니다.
import com.example.ficketsearch.domain.search.entity.PartialIndexing;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
public interface IndexingRepository extends ReactiveMongoRepository<PartialIndexing, String> {
Mono<PartialIndexing> findByEventId(String eventId); // EventId로 데이터 조회
}
IndexingService.java
Search 서버는 MongoDB에 저장된 데이터를 기반으로 Elasticsearch 작업을 실행합니다.
public void handlePartialIndexingCreate(Map<String, Object> map, String operationType) {
try {
String eventId = (String) map.get("EventId");
PartialIndexing partialIndexing = objectMapper.convertValue(map, PartialIndexing.class);
partialIndexing.setOperationType(operationType);
partialIndexing.setIndexed(true);
indexingRepository.save(partialIndexing).doOnSuccess(saved -> {
try {
IndexRequest<Map<String, Object>> request = IndexRequest.of(builder ->
builder.index(INDEX_NAME).document(map));
IndexResponse response = elasticsearchClient.index(request);
log.info("Elasticsearch 문서 생성 성공: {}", response.result());
} catch (Exception e) {
log.error("Elasticsearch 색인 실패: {}", e.getMessage());
saved.setIndexed(false);
indexingRepository.save(saved).subscribe();
}
}).subscribe();
} catch (Exception e) {
log.error("문서 생성 실패: {}", e.getMessage(), e);
throw new RuntimeException("문서 생성 실패", e);
}
}
public void handlePartialIndexingUpdate(Map<String, Object> map, String operationType) {
try {
String eventId = (String) map.get("EventId");
PartialIndexing partialIndexing = objectMapper.convertValue(map, PartialIndexing.class);
partialIndexing.setOperationType(operationType);
indexingRepository.save(partialIndexing).doOnSuccess(saved -> {
try {
SearchRequest searchRequest = SearchRequest.of(builder ->
builder.index(INDEX_NAME).query(q -> q.term(t -> t.field("EventId").value(eventId))));
SearchResponse<Map> searchResponse = elasticsearchClient.search(searchRequest, Map.class);
if (!searchResponse.hits().hits().isEmpty()) {
String documentId = searchResponse.hits().hits().get(0).id();
IndexRequest<Map<String, Object>> indexRequest = IndexRequest.of(builder ->
builder.index(INDEX_NAME).id(documentId).document(map));
elasticsearchClient.index(indexRequest);
}
} catch (Exception e) {
log.error("문서 업데이트 실패: {}", e.getMessage(), e);
partialIndexing.setIndexed(false);
indexingRepository.save(partialIndexing).subscribe();
}
}).subscribe();
} catch (Exception e) {
log.error("문서 업데이트 실패: {}", e.getMessage(), e);
throw new RuntimeException("문서 업데이트 실패", e);
}
}
public void handlePartialIndexingDelete(String eventId, String operationType) {
try {
PartialIndexing partialIndexing = new PartialIndexing();
partialIndexing.setEventId(eventId);
partialIndexing.setOperationType(operationType);
indexingRepository.save(partialIndexing).doOnSuccess(saved -> {
try {
DeleteByQueryRequest request = DeleteByQueryRequest.of(builder ->
builder.index(INDEX_NAME).query(q -> q.term(t -> t.field("EventId").value(eventId))));
elasticsearchClient.deleteByQuery(request);
} catch (Exception e) {
log.error("문서 삭제 실패: {}", e.getMessage(), e);
saved.setIndexed(false);
indexingRepository.save(saved).subscribe();
}
}).subscribe();
} catch (Exception e) {
log.error("문서 삭제 실패: {}", e.getMessage(), e);
throw new RuntimeException("문서 삭제 실패", e);
}
}
ID | Event ID | Operation Type | Indexed | Location | Title | Genres | Ticketing Date | Poster URL | Schedules |
---|---|---|---|---|---|---|---|---|---|
6785fee1d711556d228b6672 | 10 | CREATE | True | 서울특별시 | 뮤지컬 〈틱틱붐〉 | 뮤지컬 | 2025-01-18T00:00:00Z | 링크 | 2025-01-30 ~ 2025-02-09 |
6785ff52d711556d228b6673 | 10 | UPDATE | True | 서울특별시 | 뮤지컬 〈틱틱붐〉 | 뮤지컬 | 2025-01-18T00:00:00Z | 링크 | 2025-01-30 ~ 2025-02-09 |
6785ff7ad711556d228b6674 | 10 | DELETE | True |
부분 색인 작업은 데이터 변경 이벤트를 실시간으로 Elasticsearch에 반영하여 시스템의 데이터 정합성을 유지합니다.
다음 글에서는 검색 기능 구현에 대해 다룰 예정입니다.
부분 색인 작업은 데이터 변경 이벤트를 실시간으로 Elasticsearch에 반영하여 시스템의 데이터 정합성을 유지합니다.
다음 글에서는 검색 기능 구현에 대해 다룰 예정입니다.