
Spring Batch와 Kafka를 연동하여 로그 데이터를 처리하는 시스템을 구현하던 중, 처리 성능이 예상보다 현저히 낮은 문제가 발생하였습니다.
“LogGenerator”를 통해 대량의 로그 데이터를 Kafka Topic으로 보내고, Spring Batch Job으로 소비하여 DB에 저장하는 구조에서 다음과 같은 문제가 관찰되었습니다.
@Configuration
@EnableBatchProcessing
public class KafkaBatchConfig {
@Bean
public Step logProcessingStep(StepBuilderFactory stepBuilderFactory,
ItemReader<String> kafkaItemReader,
ItemWriter<String> databaseItemWriter) {
return stepBuilderFactory.get("logProcessingStep")
.<String, String>chunk(100) // Chunk Size 설정
.reader(kafkaItemReader)
.writer(databaseItemWriter)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setConcurrencyLimit(4); // Multi-thread 처리
return executor;
}
@Bean
public KafkaItemReader<String, String> kafkaItemReader() {
return new KafkaItemReader<>("logTopic", consumerFactory(), "groupId");
}
@Bean
public ItemWriter<String> databaseItemWriter(DataSource dataSource) {
JdbcBatchItemWriter<String> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("INSERT INTO logs (log_message) VALUES (:message)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
return writer;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Consumer Poll 설정
return new DefaultKafkaConsumerFactory<>(props);
}
}
Spring Batch와 Kafka를 통합한 로그 데이터 처리 시스템의 성능을 크게 개선하였습니다.
위의 최적화를 통해 데이터 처리 속도를 6배 이상 향상시켰으며,
병렬 처리 및 Batch Writer 최적화를 통해 시스템 안정성도 개선되었습니다.
이를 실제 서비스에 적용하여 로그 처리 시간을 단축하고,
시스템 병목 현상을 해소할 수 있었습니다.