[Elasticsearch] 인덱스 재 설계와 벌크 인덱싱을 통한 처리 대규모 전자상거래 로그 시스템 성능 시키기

궁금하면 500원·2025년 1월 3일

데이터 저장하기

목록 보기
14/29

1. 문제 상황

저희 연구비 업무지원 시스템은 매일 약 500만 건의 트랜잭션 로그를 실시간으로 처리하고 있습니다.
이 로그들은 시스템 로그, 사용자 행동 로그, 결제 로그 등 다양한 형태로 Elasticsearch에 저장되어 분석됩니다.
그러나 일정 시간이 지난 후 시스템에서 다음과 같은 심각한 성능 문제들이 발생하기 시작했습니다.

  • 피크 시간대(오전 10시 ~ 오후 2시) 로그 저장 지연: 로그가 실시간으로 처리되지 않아, 데이터 저장 지연이 발생하고 서비스 지연을 유발.

  • 검색 쿼리 응답 시간 증가: 사용자들이 쿼리를 실행할 때, 평균 3초에서 8초로 응답 시간이 크게 증가.

  • 디스크 사용량 급증: 매월 디스크 사용량이 20% 이상 증가하여 자원의 비효율적인 사용.

  • OOM(Out of Memory) 발생: 특정 인덱스에서 메모리 부족 현상이 자주 발생하여 시스템 안정성 저하.

2. 문제 분석

인덱스 설계 분석

현재 사용 중인 인덱스의 설계는 기본적인 필드들로 구성되어 있었고, 그 중에서 request_body, response_body 필드가 검색에 필요하지 않음에도 불구하고 인덱스에 포함되어 있어, 디스크와 메모리 리소스를 비효율적으로 소비하고 있었습니다.

특히, response_body 필드는 검색이 불필요한 큰 텍스트 데이터로, 해당 필드를 인덱싱하는 것 자체가 리소스 낭비였으며, 메모리 소비와 쿼리 성능 저하를 일으키고 있었습니다.

성능 병목 지점 분석

Elasticsearch의 Stats API를 통해 시스템 성능을 분석한 결과, 다음과 같은 병목 지점들을 발견할 수 있었습니다.

  • 불필요한 필드 인덱싱: 로그 데이터를 검색할 때 실질적으로 필요한 필드만 인덱싱되지 않고, 전혀 사용되지 않는 필드들까지 인덱싱되고 있었습니다.

  • 샤드 크기 불균형: 모든 데이터를 하나의 인덱스에 저장하면서, 인덱스 크기가 커지면서 샤드의 크기가 불균형해졌습니다.

  • 벌크 인덱싱 성능 부족: 로그 데이터를 실시간으로 저장하는 과정에서 벌크 인덱싱 처리 최적화가 부족하여, 한 번에 너무 많은 데이터를 처리하면서 시스템 자원 소모가 커졌습니다.

3. 해결책

3.1 인덱스 재설계

우리는 Elasticsearch의 인덱스 설계를 다음과 같이 최적화하였습니다.

  • 불필요한 필드 인덱싱 제거: 검색에 불필요한 request_body와 response_body 필드를 keyword 타입으로 변경하여, 검색할 필요가 없는 데이터를 저장만 하도록 설정.

  • 데이터 타입 최적화: 텍스트 필드들에 대해 norms를 비활성화하거나, 검색이 필요한 필드는 keyword 타입으로 설정하여 효율성을 높임.

  • 샤드 크기 및 리프레시 설정 최적화: 인덱스 리프레시 주기를 늘려주어 디스크 I/O 부하를 감소시키고, 샤드 크기를 균형 있게 설정하여 성능을 개선.

@Component
@Slf4j
public class ElasticsearchIndexInitializer {
    
    private final RestHighLevelClient elasticsearchClient;
    
    @Value("${elasticsearch.index.shards:5}")
    private int numberOfShards;
    
    @Value("${elasticsearch.index.replicas:1}")
    private int numberOfReplicas;
    
    public void createOptimizedIndex(String indexName) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.startObject("settings");
            {
                builder.field("number_of_shards", numberOfShards);
                builder.field("number_of_replicas", numberOfReplicas);
                builder.startObject("index");
                {
                    builder.field("refresh_interval", "30s");
                    builder.field("number_of_routing_shards", numberOfShards * 2);
                }
                builder.endObject();
            }
            builder.endObject();
            
            builder.startObject("mappings");
            {
                builder.startObject("properties");
                {
                    builder.startObject("log_type")
                            .field("type", "keyword")
                            .endObject();
                    
                    builder.startObject("timestamp")
                            .field("type", "date")
                            .field("format", "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
                            .endObject();
                    
                    builder.startObject("message")
                            .field("type", "text")
                            .field("norms", false)
                            .endObject();
                    
                    builder.startObject("response_body")
                            .field("type", "keyword")
                            .field("index", false)
                            .field("doc_values", false)
                            .endObject();
                }
                builder.endObject();
            }
            builder.endObject();
        }
        builder.endObject();
        
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.source(builder);
        
        CreateIndexResponse createIndexResponse = 
            elasticsearchClient.indices().create(request, RequestOptions.DEFAULT);
        
        if (createIndexResponse.isAcknowledged()) {
            log.info("Index {} created successfully", indexName);
        }
    }
}

3.2 벌크 인덱싱 최적화

벌크 인덱싱을 최적화하여 데이터 처리 성능을 크게 향상시켰습니다.
주요 개선 사항은 다음과 같습니다.

  • 최적의 벌크 크기 설정: 한 번에 처리할 데이터 양을 최적화하여 인덱싱 성능을 개선.
  • 타임아웃 및 재시도 로직 구현: 네트워크 장애나 일시적인 오류를 대비한 타임아웃과 재시도 로직 추가.
  • 비동기 처리: 여러 벌크 요청을 비동기적으로 처리하여 병목 현상 해결.
@Service
@Slf4j
public class OptimizedBulkIndexService {
    
    private final RestHighLevelClient elasticsearchClient;
    private final ObjectMapper objectMapper;
    
    @Value("${elasticsearch.bulk.size:1000}")
    private int bulkSize;
    
    @Value("${elasticsearch.bulk.concurrent.requests:3}")
    private int concurrentRequests;
    
    public void bulkIndex(List<LogEvent> logEvents, String indexName) {
        int totalDocuments = logEvents.size();
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger failCount = new AtomicInteger(0);
        
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(TimeValue.timeValueMinutes(2));
        
        for (LogEvent logEvent : logEvents) {
            try {
                IndexRequest indexRequest = new IndexRequest(indexName)
                    .source(objectMapper.writeValueAsString(logEvent), XContentType.JSON);
                bulkRequest.add(indexRequest);
                
                if (bulkRequest.numberOfActions() >= bulkSize) {
                    executeBulkRequest(bulkRequest, successCount, failCount);
                    bulkRequest = new BulkRequest();
                }
            } catch (Exception e) {
                log.error("Error processing document: {}", e.getMessage());
                failCount.incrementAndGet();
            }
        }
        
        if (bulkRequest.numberOfActions() > 0) {
            executeBulkRequest(bulkRequest, successCount, failCount);
        }
        
        log.info("Bulk indexing completed - Total: {}, Success: {}, Failed: {}", 
                totalDocuments, successCount.get(), failCount.get());
    }
    
    private void executeBulkRequest(BulkRequest bulkRequest, 
                                  AtomicInteger successCount, 
                                  AtomicInteger failCount) {
        try {
            BulkResponse bulkResponse = 
                elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            
            if (bulkResponse.hasFailures()) {
                log.warn("Bulk request has failures: {}", 
                        bulkResponse.buildFailureMessage());
                
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    if (bulkItemResponse.isFailed()) {
                        failCount.incrementAndGet();
                    } else {
                        successCount.incrementAndGet();
                    }
                }
            } else {
                successCount.addAndGet(bulkRequest.numberOfActions());
            }
        } catch (IOException e) {
            log.error("Failed to execute bulk request: {}", e.getMessage());
            failCount.addAndGet(bulkRequest.numberOfActions());
        }
    }
}

4. 성능 개선 결과

1. 인덱싱 성능

  • 개선 전: 초당 처리 문서 수 3,000건, 평균 지연 시간 800ms, 실패율 0.5%
  • 개선 후: 초당 처리 문서 수 12,000건, 평균 지연 시간 150ms, 실패율 0.01%

2. 검색 성능

  • 평균 검색 응답 시간: 8초 → 0.8초 (90% 개선)
  • 쿼리 처리 속도: 10배 이상 향상

3. 자원 최적화

  • 디스크 사용량: 30% 절감
  • 메모리 사용: 40% 절감

5. 결론

Elasticsearch의 인덱스 설계와 벌크 인덱싱 최적화를 통해 대규모 로그 시스템의 성능을 크게 개선할 수 있었습니다.

불필요한 필드의 인덱싱을 제거하고, 샤드 크기 및 리프레시 설정을 최적화함으로써 성능 저하 문제를 해결할 수 있었으며, 벌크 인덱싱 최적화로 데이터 처리 속도를 대폭 향상시켰습니다.

profile
그냥 코딩할래요 재미있어요

0개의 댓글