
저희 연구비 업무지원 시스템은 매일 약 500만 건의 트랜잭션 로그를 실시간으로 처리하고 있습니다.
이 로그들은 시스템 로그, 사용자 행동 로그, 결제 로그 등 다양한 형태로 Elasticsearch에 저장되어 분석됩니다.
그러나 일정 시간이 지난 후 시스템에서 다음과 같은 심각한 성능 문제들이 발생하기 시작했습니다.
피크 시간대(오전 10시 ~ 오후 2시) 로그 저장 지연: 로그가 실시간으로 처리되지 않아, 데이터 저장 지연이 발생하고 서비스 지연을 유발.
검색 쿼리 응답 시간 증가: 사용자들이 쿼리를 실행할 때, 평균 3초에서 8초로 응답 시간이 크게 증가.
디스크 사용량 급증: 매월 디스크 사용량이 20% 이상 증가하여 자원의 비효율적인 사용.
OOM(Out of Memory) 발생: 특정 인덱스에서 메모리 부족 현상이 자주 발생하여 시스템 안정성 저하.
현재 사용 중인 인덱스의 설계는 기본적인 필드들로 구성되어 있었고, 그 중에서 request_body, response_body 필드가 검색에 필요하지 않음에도 불구하고 인덱스에 포함되어 있어, 디스크와 메모리 리소스를 비효율적으로 소비하고 있었습니다.
특히, response_body 필드는 검색이 불필요한 큰 텍스트 데이터로, 해당 필드를 인덱싱하는 것 자체가 리소스 낭비였으며, 메모리 소비와 쿼리 성능 저하를 일으키고 있었습니다.
Elasticsearch의 Stats API를 통해 시스템 성능을 분석한 결과, 다음과 같은 병목 지점들을 발견할 수 있었습니다.
불필요한 필드 인덱싱: 로그 데이터를 검색할 때 실질적으로 필요한 필드만 인덱싱되지 않고, 전혀 사용되지 않는 필드들까지 인덱싱되고 있었습니다.
샤드 크기 불균형: 모든 데이터를 하나의 인덱스에 저장하면서, 인덱스 크기가 커지면서 샤드의 크기가 불균형해졌습니다.
벌크 인덱싱 성능 부족: 로그 데이터를 실시간으로 저장하는 과정에서 벌크 인덱싱 처리 최적화가 부족하여, 한 번에 너무 많은 데이터를 처리하면서 시스템 자원 소모가 커졌습니다.
우리는 Elasticsearch의 인덱스 설계를 다음과 같이 최적화하였습니다.
불필요한 필드 인덱싱 제거: 검색에 불필요한 request_body와 response_body 필드를 keyword 타입으로 변경하여, 검색할 필요가 없는 데이터를 저장만 하도록 설정.
데이터 타입 최적화: 텍스트 필드들에 대해 norms를 비활성화하거나, 검색이 필요한 필드는 keyword 타입으로 설정하여 효율성을 높임.
샤드 크기 및 리프레시 설정 최적화: 인덱스 리프레시 주기를 늘려주어 디스크 I/O 부하를 감소시키고, 샤드 크기를 균형 있게 설정하여 성능을 개선.
@Component
@Slf4j
public class ElasticsearchIndexInitializer {
private final RestHighLevelClient elasticsearchClient;
@Value("${elasticsearch.index.shards:5}")
private int numberOfShards;
@Value("${elasticsearch.index.replicas:1}")
private int numberOfReplicas;
public void createOptimizedIndex(String indexName) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.startObject("settings");
{
builder.field("number_of_shards", numberOfShards);
builder.field("number_of_replicas", numberOfReplicas);
builder.startObject("index");
{
builder.field("refresh_interval", "30s");
builder.field("number_of_routing_shards", numberOfShards * 2);
}
builder.endObject();
}
builder.endObject();
builder.startObject("mappings");
{
builder.startObject("properties");
{
builder.startObject("log_type")
.field("type", "keyword")
.endObject();
builder.startObject("timestamp")
.field("type", "date")
.field("format", "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
.endObject();
builder.startObject("message")
.field("type", "text")
.field("norms", false)
.endObject();
builder.startObject("response_body")
.field("type", "keyword")
.field("index", false)
.field("doc_values", false)
.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.source(builder);
CreateIndexResponse createIndexResponse =
elasticsearchClient.indices().create(request, RequestOptions.DEFAULT);
if (createIndexResponse.isAcknowledged()) {
log.info("Index {} created successfully", indexName);
}
}
}
벌크 인덱싱을 최적화하여 데이터 처리 성능을 크게 향상시켰습니다.
주요 개선 사항은 다음과 같습니다.
@Service
@Slf4j
public class OptimizedBulkIndexService {
private final RestHighLevelClient elasticsearchClient;
private final ObjectMapper objectMapper;
@Value("${elasticsearch.bulk.size:1000}")
private int bulkSize;
@Value("${elasticsearch.bulk.concurrent.requests:3}")
private int concurrentRequests;
public void bulkIndex(List<LogEvent> logEvents, String indexName) {
int totalDocuments = logEvents.size();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout(TimeValue.timeValueMinutes(2));
for (LogEvent logEvent : logEvents) {
try {
IndexRequest indexRequest = new IndexRequest(indexName)
.source(objectMapper.writeValueAsString(logEvent), XContentType.JSON);
bulkRequest.add(indexRequest);
if (bulkRequest.numberOfActions() >= bulkSize) {
executeBulkRequest(bulkRequest, successCount, failCount);
bulkRequest = new BulkRequest();
}
} catch (Exception e) {
log.error("Error processing document: {}", e.getMessage());
failCount.incrementAndGet();
}
}
if (bulkRequest.numberOfActions() > 0) {
executeBulkRequest(bulkRequest, successCount, failCount);
}
log.info("Bulk indexing completed - Total: {}, Success: {}, Failed: {}",
totalDocuments, successCount.get(), failCount.get());
}
private void executeBulkRequest(BulkRequest bulkRequest,
AtomicInteger successCount,
AtomicInteger failCount) {
try {
BulkResponse bulkResponse =
elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if (bulkResponse.hasFailures()) {
log.warn("Bulk request has failures: {}",
bulkResponse.buildFailureMessage());
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
failCount.incrementAndGet();
} else {
successCount.incrementAndGet();
}
}
} else {
successCount.addAndGet(bulkRequest.numberOfActions());
}
} catch (IOException e) {
log.error("Failed to execute bulk request: {}", e.getMessage());
failCount.addAndGet(bulkRequest.numberOfActions());
}
}
}

Elasticsearch의 인덱스 설계와 벌크 인덱싱 최적화를 통해 대규모 로그 시스템의 성능을 크게 개선할 수 있었습니다.
불필요한 필드의 인덱싱을 제거하고, 샤드 크기 및 리프레시 설정을 최적화함으로써 성능 저하 문제를 해결할 수 있었으며, 벌크 인덱싱 최적화로 데이터 처리 속도를 대폭 향상시켰습니다.