Spring Batch와 Kafka를 활용한 로그 데이터 처리 성능 최적화 사례

문제 정의

Spring Batch와 Kafka를 연동하여 로그 데이터를 처리하는 시스템을 구현하던 중, 처리 성능이 예상보다 현저히 낮은 문제가 발생하였습니다.

“LogGenerator”를 통해 대량의 로그 데이터를 Kafka Topic으로 보내고, Spring Batch Job으로 소비하여 DB에 저장하는 구조에서 다음과 같은 문제가 관찰되었습니다.

  1. Kafka Consumer의 처리 속도가 Kafka Producer의 생산 속도를 따라가지 못함.
  2. Spring Batch의 Chunk 기반 처리에서 병목이 발생하여 Job 실행 시간이 지나치게 길어짐.
  3. 전체 시스템에서 병목 지점이 불명확하여 디버깅 및 최적화가 어려움.

직면한 문제 분석

1. Kafka Consumer와 Batch Integration 병목

  • Kafka Listener가 데이터를 Spring Batch로 전달하는 과정에서 데이터 전달 큐의 크기가 초과되어 Consumer의 처리 대기 시간이 증가.

2. Spring Batch의 Chunk Size 비효율

  • 기본 Chunk Size가 10으로 설정되어 있어, 데이터 처리 단위가 지나치게 작음.

3. I/O 병목

  • Spring Batch가 DB에 데이터를 저장할 때, 삽입 쿼리 실행 시간이 처리 속도를 제한.

4. Kafka Partition 설정 미흡

  • Kafka Topic이 단일 Partition으로 설정되어 있어 병렬 처리가 불가능한 상태.

해결책

1. Kafka Partition 수 증가

  • Topic Partition을 4개로 늘려 병렬 처리가 가능하도록 구성.

2. Spring Batch의 Chunk Size 조정

  • Chunk Size를 100으로 늘려 데이터 처리 단위를 확대.

3. Spring Batch의 Multi-thread Step 사용

  • TaskExecutor를 활용하여 Spring Batch Step에서 다중 스레드로 작업을 병렬화.

4. Batch Writer 최적화

  • Hibernate의 JdbcBatchItemWriter를 적용하여 DB에 데이터를 Batch Insert로 처리.

5. Kafka Consumer Poll 설정 조정

  • max.poll.records 값을 500으로 증가시켜, Kafka Consumer가 더 많은 데이터를 한 번에 가져올 수 있도록 설정.

성능 개선 전후 비교

1. 성능 측정 데이터

  • 개선 전: 10,000건 처리에 약 30분 소요
  • 개선 후: 10,000건 처리에 약 5분 소요

2. 병목 지점 분석 결과

  • Consumer 처리 속도와 DB 저장 속도 모두 큰 폭으로 개선.
  • Kafka Topic Partition 증가로 Consumer 병렬 처리량 증가.

3. 코드구현

@Configuration
@EnableBatchProcessing
public class KafkaBatchConfig {

    @Bean
    public Step logProcessingStep(StepBuilderFactory stepBuilderFactory,
                                  ItemReader<String> kafkaItemReader,
                                  ItemWriter<String> databaseItemWriter) {
        return stepBuilderFactory.get("logProcessingStep")
            .<String, String>chunk(100)  // Chunk Size 설정
            .reader(kafkaItemReader)
            .writer(databaseItemWriter)
            .taskExecutor(taskExecutor())
            .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        executor.setConcurrencyLimit(4); // Multi-thread 처리
        return executor;
    }

    @Bean
    public KafkaItemReader<String, String> kafkaItemReader() {
        return new KafkaItemReader<>("logTopic", consumerFactory(), "groupId");
    }

    @Bean
    public ItemWriter<String> databaseItemWriter(DataSource dataSource) {
        JdbcBatchItemWriter<String> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("INSERT INTO logs (log_message) VALUES (:message)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Consumer Poll 설정
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

결론 및 결과

Spring Batch와 Kafka를 통합한 로그 데이터 처리 시스템의 성능을 크게 개선하였습니다.
위의 최적화를 통해 데이터 처리 속도를 6배 이상 향상시켰으며,
병렬 처리 및 Batch Writer 최적화를 통해 시스템 안정성도 개선되었습니다.
이를 실제 서비스에 적용하여 로그 처리 시간을 단축하고,
시스템 병목 현상을 해소할 수 있었습니다.

profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글