Fetching Weather Data with Spring Batch: Multithreading (Taster Series 2-7)

MJ · August 29, 2025

08. Batch Learning Stage 4: Performance Optimization and Multithreaded Processing

🎯 Learning Goals

  • Learn performance optimization techniques for large-volume data processing
  • Implement multithreaded and parallel processing
  • Distribute work across partitions (Partitioning)
  • Optimize memory usage and monitor performance

📊 Performance Optimization Strategy Overview

Identifying Bottlenecks

[Read data] → [Process data] → [Write data]
     ↓              ↓               ↓
  I/O bound      CPU bound     I/O + transaction bound
     ↓              ↓               ↓
  - Paging       - Complex      - Batch inserts
  - Indexes        business     - Transaction size
  - Caching        logic        - Connection pool
                 - Transformations

Optimization Techniques

  1. Chunk size tuning: balance memory use against transaction overhead (see the sketch after this list)
  2. Multithreaded processing: maximize CPU utilization
  3. Partitioning: split large datasets for distributed processing
  4. Asynchronous processing: optimize external API calls in the ItemProcessor
  5. Database optimization: indexes, batch inserts, connection pooling
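As a quick illustration of technique 1, here is a minimal sketch that externalizes the chunk size so it can be tuned per environment without recompiling. The property name batch.chunk-size, its default of 100, and the surrounding jobRepository/transactionManager fields are assumptions for this example, not part of the tutorial code below.

@Bean
public Step tunableChunkStep(ItemReader<Object> reader,
                             ItemWriter<Object> writer,
                             // batch.chunk-size is a hypothetical property; 100 is an arbitrary default
                             @Value("${batch.chunk-size:100}") int chunkSize) {
    return new StepBuilder("tunableChunkStep", jobRepository)
            // A larger chunk means fewer commits (less transaction overhead),
            // but more items held in memory per transaction
            .<Object, Object>chunk(chunkSize, transactionManager)
            .reader(reader)
            .writer(writer)
            .build();
}

A common approach is to measure throughput at a few sizes (for example 10, 100, 1000) and pick the point where the gains flatten out.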

🚀 Implementing a Multithreaded Step

1. Basic Multithreaded Configuration

package com.example.batchtutorial.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Multithreaded batch processing configuration
 */
@Slf4j
@Configuration
public class MultiThreadBatchConfig {
    
    @Autowired
    private JobRepository jobRepository;
    
    @Autowired
    private PlatformTransactionManager transactionManager;
    
    /**
     * TaskExecutor for multithreaded batch steps
     */
    @Bean
    public TaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);        // core pool size
        executor.setMaxPoolSize(8);         // maximum pool size
        executor.setQueueCapacity(100);     // task queue capacity
        executor.setThreadNamePrefix("batch-thread-");
        executor.setKeepAliveSeconds(60);   // idle thread keep-alive time
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
    
    /**
     * TaskExecutor for CPU-bound work
     */
    @Bean
    public TaskExecutor cpuIntensiveTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Match the thread count to the number of CPU cores (CPU-bound work)
        int cpuCores = Runtime.getRuntime().availableProcessors();
        executor.setCorePoolSize(cpuCores);
        executor.setMaxPoolSize(cpuCores);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("cpu-batch-");
        executor.initialize();
        return executor;
    }
    
    /**
     * TaskExecutor for I/O-bound work
     */
    @Bean
    public TaskExecutor ioIntensiveTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Allocate more threads to compensate for I/O wait time
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("io-batch-");
        executor.initialize();
        return executor;
    }
    
    /**
     * Multithreaded Step configuration
     */
    @Bean
    public Step multiThreadStep(ItemReader<Object> reader,
                               ItemProcessor<Object, Object> processor,
                               ItemWriter<Object> writer) {
        return new StepBuilder("multiThreadStep", jobRepository)
                .<Object, Object>chunk(100, transactionManager)  // larger chunk size
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .taskExecutor(batchTaskExecutor())       // run chunks on the thread pool
                .throttleLimit(4)                        // cap concurrent chunk threads
                .build();
    }
}
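One caveat on the step above: throttleLimit is deprecated as of Spring Batch 5.0, and the recommended way to bound concurrency is to size the TaskExecutor's pool itself (as batchTaskExecutor already does with its core/max pool sizes). Also note that a multithreaded step gives up restartability for most stateful readers, which is why the reader below disables state saving.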

2. Implementing a Thread-Safe ItemReader

package com.example.batchtutorial.batch.reader;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

import javax.sql.DataSource;
import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * ItemReader configuration that is safe for multithreaded steps
 */
@Slf4j
@Configuration
public class ThreadSafeReaderConfig {
    
    @Autowired
    private DataSource dataSource;
    
    /**
     * Thread-safe JDBC paging ItemReader
     * (JdbcPagingItemReader is thread-safe by default)
     */
    @Bean
    public JdbcPagingItemReader<LargeDataDto> threadSafeReader() throws Exception {
        
        return new JdbcPagingItemReaderBuilder<LargeDataDto>()
                .name("threadSafeReader")
                .dataSource(dataSource)
                .queryProvider(createQueryProvider())
                .pageSize(1000)                    // larger page size
                .rowMapper(new BeanPropertyRowMapper<>(LargeDataDto.class))
                .saveState(false)                  // disable state saving for multithreaded use
                .build();
    }
    
    /**
     * Creates the paging query provider
     */
    private PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean factory = new SqlPagingQueryProviderFactoryBean();
        factory.setDataSource(dataSource);
        factory.setSelectClause("SELECT id, name, amount, created_at");
        factory.setFromClause("FROM large_data_table");
        factory.setWhereClause("WHERE status = 'ACTIVE'");
        factory.setSortKey("id");                  // a sort key is required for paging
        return factory.getObject();
    }
}

/**
 * DTO for large-volume data processing
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
class LargeDataDto {
    private Long id;
    private String name;
    private BigDecimal amount;
    private LocalDateTime createdAt;
}
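One caution: this works because JdbcPagingItemReader issues an independent query per page. Cursor-based readers such as JdbcCursorItemReader hold a single ResultSet and are not thread-safe. If a cursor reader is unavoidable, Spring Batch ships a synchronizing wrapper; a minimal sketch follows (the cursorReader delegate bean is assumed to be defined elsewhere):

import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilder;

    /**
     * Serializes read() calls so a non-thread-safe cursor reader
     * can be shared by a multithreaded step.
     */
    @Bean
    public SynchronizedItemStreamReader<LargeDataDto> synchronizedCursorReader(
            JdbcCursorItemReader<LargeDataDto> cursorReader) {  // hypothetical delegate bean
        return new SynchronizedItemStreamReaderBuilder<LargeDataDto>()
                .delegate(cursorReader)
                .build();
    }

The trade-off is that reads become fully serialized, so this only helps when processing and writing dominate the runtime.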

3. Implementing an Asynchronous ItemProcessor

package com.example.batchtutorial.batch.processor;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;

/**
 * ItemProcessor that includes asynchronous work
 */
@Slf4j
@Configuration
public class AsyncProcessorConfig {
    
    @Autowired
    private WebClient webClient;
    
    /**
     * Processor that enriches items via an external API call
     */
    @Bean
    public ItemProcessor<LargeDataDto, ProcessedDataDto> asyncProcessor() {
        return new ItemProcessor<LargeDataDto, ProcessedDataDto>() {
            @Override
            public ProcessedDataDto process(LargeDataDto item) throws Exception {
                
                long startTime = System.currentTimeMillis();
                String threadName = Thread.currentThread().getName();
                
                log.debug("Processing item {} on thread {}", item.getId(), threadName);
                
                try {
                    // 1. Local data transformation (CPU-bound)
                    ProcessedDataDto result = transformData(item);
                    
                    // 2. External API call (I/O-bound; note that .get() blocks this thread until the response arrives)
                    String enrichedData = callExternalApiAsync(item.getId()).get();
                    result.setEnrichedData(enrichedData);
                    
                    // 3. Complex calculation (CPU-bound)
                    BigDecimal calculatedValue = performComplexCalculation(item.getAmount());
                    result.setCalculatedValue(calculatedValue);
                    
                    long processingTime = System.currentTimeMillis() - startTime;
                    result.setProcessingTime(processingTime);
                    
                    log.debug("Completed processing item {} in {}ms on thread {}", 
                            item.getId(), processingTime, threadName);
                    
                    return result;
                    
                } catch (Exception e) {
                    log.error("Failed to process item {} on thread {}: {}", 
                            item.getId(), threadName, e.getMessage());
                    throw e;
                }
            }
        };
    }
    
    /**
     * Data transformation logic
     */
    private ProcessedDataDto transformData(LargeDataDto input) {
        ProcessedDataDto result = new ProcessedDataDto();
        result.setId(input.getId());
        result.setProcessedName(input.getName().toUpperCase());
        result.setOriginalAmount(input.getAmount());
        result.setProcessedAt(LocalDateTime.now());
        return result;
    }
    
    /**
     * Asynchronous external API call
     */
    private CompletableFuture<String> callExternalApiAsync(Long id) {
        return webClient.get()
                .uri("/api/enrich/{id}", id)
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofSeconds(5))
                .toFuture();
    }
    
    /**
     * Complex calculation (CPU-intensive)
     */
    private BigDecimal performComplexCalculation(BigDecimal input) {
        // simulate a complex mathematical calculation
        BigDecimal result = input;
        for (int i = 0; i < 1000; i++) {
            result = result.multiply(new BigDecimal("1.001"));
        }
        return result.setScale(2, RoundingMode.HALF_UP);
    }
}

/**
 * DTO for processed data
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
class ProcessedDataDto {
    private Long id;
    private String processedName;
    private BigDecimal originalAmount;
    private BigDecimal calculatedValue;
    private String enrichedData;
    private LocalDateTime processedAt;
    private Long processingTime;
}
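Note that the processor above still blocks its worker thread on .get(), so each thread has at most one API call in flight. Spring Batch's optional spring-batch-integration module provides AsyncItemProcessor and AsyncItemWriter, which run the delegate processor on a TaskExecutor and let the writer unwrap the resulting Futures. A minimal sketch, assuming the processor/writer beans from this post and the spring-batch-integration dependency:

import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;

    /**
     * Runs the delegate processor on the I/O thread pool;
     * the step's chunk types become <LargeDataDto, Future<ProcessedDataDto>>.
     */
    @Bean
    public AsyncItemProcessor<LargeDataDto, ProcessedDataDto> wrappedProcessor(
            ItemProcessor<LargeDataDto, ProcessedDataDto> asyncProcessor,
            TaskExecutor ioIntensiveTaskExecutor) {
        AsyncItemProcessor<LargeDataDto, ProcessedDataDto> wrapper = new AsyncItemProcessor<>();
        wrapper.setDelegate(asyncProcessor);
        wrapper.setTaskExecutor(ioIntensiveTaskExecutor);
        return wrapper;
    }

    /**
     * Unwraps each Future before delegating the actual write.
     */
    @Bean
    public AsyncItemWriter<ProcessedDataDto> wrappedWriter(ItemWriter<ProcessedDataDto> writer) {
        AsyncItemWriter<ProcessedDataDto> wrapper = new AsyncItemWriter<>();
        wrapper.setDelegate(writer);
        return wrapper;
    }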

🔀 Implementing Partitioning

1. Basic Partitioning Configuration

package com.example.batchtutorial.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Distributed processing via partitioning
 */
@Slf4j
@Configuration
public class PartitioningBatchConfig {
    
    @Autowired
    private JobRepository jobRepository;
    
    @Autowired
    private PlatformTransactionManager transactionManager;
    
    /**
     * Partitioning Job configuration
     */
    @Bean
    public Job partitioningJob(Step managerStep) {
        return new JobBuilder("partitioningJob", jobRepository)
                .start(managerStep)
                .build();
    }
    
    /**
     * Manager step (coordinates the partitions)
     */
    @Bean
    public Step managerStep(Step workerStep, Partitioner customPartitioner, TaskExecutor taskExecutor) {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", customPartitioner)
                .step(workerStep)
                .partitionHandler(partitionHandler(workerStep, taskExecutor))
                .build();
    }
    
    /**
     * Partition handler configuration
     */
    @Bean
    public PartitionHandler partitionHandler(Step workerStep, TaskExecutor taskExecutor) {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setTaskExecutor(taskExecutor);
        handler.setStep(workerStep);
        handler.setGridSize(8);                    // number of partitions
        return handler;
    }
    
    /**
     * Worker step (performs the actual work)
     */
    @Bean
    public Step workerStep(ItemReader<LargeDataDto> partitionReader,
                          ItemProcessor<LargeDataDto, ProcessedDataDto> processor,
                          ItemWriter<ProcessedDataDto> writer) {
        return new StepBuilder("workerStep", jobRepository)
                .<LargeDataDto, ProcessedDataDto>chunk(500, transactionManager)
                .reader(partitionReader)
                .processor(processor)
                .writer(writer)
                .build();
    }
    
    /**
     * Custom partitioner - splits the dataset by ID range
     */
    @Bean
    public Partitioner customPartitioner() {
        return new CustomRangePartitioner();
    }
}

2. Implementing the Custom Partitioner

package com.example.batchtutorial.partition;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * ID-range-based partitioner
 */
@Slf4j
@Component
public class CustomRangePartitioner implements Partitioner {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        
        log.info("Creating {} partitions for data processing", gridSize);
        
        // 1. Look up the overall data range
        Long minId = jdbcTemplate.queryForObject("SELECT MIN(id) FROM large_data_table WHERE status = 'ACTIVE'", Long.class);
        Long maxId = jdbcTemplate.queryForObject("SELECT MAX(id) FROM large_data_table WHERE status = 'ACTIVE'", Long.class);
        Long totalRecords = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM large_data_table WHERE status = 'ACTIVE'", Long.class);
        
        if (minId == null || maxId == null || totalRecords == 0) {
            log.warn("No data found for partitioning");
            return new HashMap<>();
        }
        
        log.info("Total records: {}, ID range: {} - {}", totalRecords, minId, maxId);
        
        // 2. Compute each partition's ID range
        long rangeSize = (maxId - minId + 1) / gridSize;
        Map<String, ExecutionContext> partitions = new HashMap<>();
        
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            
            long startId = minId + (i * rangeSize);
            long endId = (i == gridSize - 1) ? maxId : startId + rangeSize - 1;
            
            context.putLong("startId", startId);
            context.putLong("endId", endId);
            
            // estimate the record count for this partition
            Long partitionRecordCount = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM large_data_table WHERE id BETWEEN ? AND ? AND status = 'ACTIVE'",
                Long.class, startId, endId
            );
            
            context.putLong("expectedRecords", partitionRecordCount);
            
            partitions.put("partition" + i, context);
            
            log.info("Partition {}: ID range {} - {}, expected records: {}", 
                    i, startId, endId, partitionRecordCount);
        }
        
        return partitions;
    }
}
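Two design notes on this partitioner. First, it issues one COUNT query per partition, so with large grid sizes the partitioning phase itself adds measurable overhead; the expectedRecords value is purely informational and could be dropped. Second, ID-range partitioning assumes IDs are roughly evenly distributed — if the ID space is sparse or skewed, some partitions will carry far more rows than others and finish much later.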

3. A StepScope ItemReader for Partitions

package com.example.batchtutorial.batch.reader;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * StepScope ItemReader that reads one partition's slice of the data
 */
@Slf4j
@Configuration
public class PartitionReaderConfig {
    
    @Autowired
    private DataSource dataSource;
    
    /**
     * Reader bound to the partition's ID range
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<LargeDataDto> partitionReader(
            @Value("#{stepExecutionContext['startId']}") Long startId,
            @Value("#{stepExecutionContext['endId']}") Long endId,
            @Value("#{stepExecutionContext['expectedRecords']}") Long expectedRecords) {
        
        String threadName = Thread.currentThread().getName();
        log.info("Creating partition reader for range {} - {} (expected: {} records) on thread {}", 
                startId, endId, expectedRecords, threadName);
        
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("startId", startId);
        parameters.put("endId", endId);
        
        return new JdbcPagingItemReaderBuilder<LargeDataDto>()
                .name("partitionReader")
                .dataSource(dataSource)
                // the builder takes clause fragments rather than a single query string
                .selectClause("SELECT id, name, amount, created_at")
                .fromClause("FROM large_data_table")
                .whereClause("WHERE id BETWEEN :startId AND :endId AND status = 'ACTIVE'")
                .sortKeys(Map.of("id", Order.ASCENDING))
                .parameterValues(parameters)
                .pageSize(100)
                .rowMapper(new BeanPropertyRowMapper<>(LargeDataDto.class))
                .saveState(false)  // disable state saving for partitioned execution
                .build();
    }
}
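The @StepScope annotation is what makes this work: the reader bean is created lazily, once per step execution, and the SpEL expressions pull startId/endId out of the ExecutionContext that the partitioner populated for that particular partition. Each worker thread therefore gets its own reader instance bound to its own ID range, which is why a single bean definition can safely serve all eight partitions.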

📈 Performance Monitoring and Measurement

1. Batch Performance Monitoring Listener

package com.example.batchtutorial.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.stereotype.Component;

import java.time.Duration;

/**
 * Batch performance monitoring listener
 */
@Slf4j
@Component
public class PerformanceMonitoringListener implements JobExecutionListener {
    
    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("🚀 Job started: {} at {}", 
                jobExecution.getJobInstance().getJobName(),
                LocalDateTime.ofInstant(jobExecution.getStartTime().toInstant(), ZoneId.systemDefault()));
        
        // snapshot system resources
        Runtime runtime = Runtime.getRuntime();
        long maxMemory = runtime.maxMemory();
        long freeMemory = runtime.freeMemory();
        long totalMemory = runtime.totalMemory();
        
        log.info("💾 Memory status - Max: {}MB, Free: {}MB, Total: {}MB", 
                maxMemory / (1024 * 1024), 
                freeMemory / (1024 * 1024), 
                totalMemory / (1024 * 1024));
        
        log.info("🖥️ Available processors: {}", runtime.availableProcessors());
    }
    
    @Override
    public void afterJob(JobExecution jobExecution) {
        Duration duration = Duration.between(
                jobExecution.getStartTime(),
                jobExecution.getEndTime()
        );
        
        log.info("✅ Job completed: {} in {} seconds", 
                jobExecution.getJobInstance().getJobName(),
                duration.getSeconds());
        
        // per-step performance statistics
        logStepStatistics(jobExecution);
        
        // overall performance statistics
        logOverallPerformance(jobExecution, duration);
        
        // final memory usage check
        logFinalMemoryUsage();
    }
    
    private void logStepStatistics(JobExecution jobExecution) {
        log.info("📊 Step Performance Statistics:");
        
        for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
            Duration stepDuration = Duration.between(
                    stepExecution.getStartTime(),
                    stepExecution.getEndTime()
            );
            
            long readCount = stepExecution.getReadCount();
            long writeCount = stepExecution.getWriteCount();
            long skipCount = stepExecution.getSkipCount();
            
            // guard against sub-second steps to avoid dividing by zero
            long stepSeconds = Math.max(1, stepDuration.getSeconds());
            double readThroughput = readCount / (double) stepSeconds;
            double writeThroughput = writeCount / (double) stepSeconds;
            
            log.info("  📋 Step: {}", stepExecution.getStepName());
            log.info("    ⏱️ Duration: {} seconds", stepDuration.getSeconds());
            log.info("    📖 Read: {} items ({:.2f} items/sec)", readCount, readThroughput);
            log.info("    ✏️ Write: {} items ({:.2f} items/sec)", writeCount, writeThroughput);
            log.info("    ⚠️ Skip: {} items", skipCount);
            
            if (stepExecution.getCommitCount() > 0) {
                double avgChunkSize = (double) writeCount / stepExecution.getCommitCount();
                log.info("    🔄 Commits: {} (avg chunk size: {:.1f})", 
                        stepExecution.getCommitCount(), avgChunkSize);
            }
        }
    }
    
    private void logOverallPerformance(JobExecution jobExecution, Duration duration) {
        long totalReadCount = jobExecution.getStepExecutions().stream()
                .mapToLong(StepExecution::getReadCount)
                .sum();
        
        long totalWriteCount = jobExecution.getStepExecutions().stream()
                .mapToLong(StepExecution::getWriteCount)
                .sum();
        
        double overallThroughput = totalWriteCount / (double) Math.max(1, duration.getSeconds());
        
        log.info("🎯 Overall Performance:");
        log.info("  📖 Total Read: {} items", totalReadCount);
        log.info("  ✏️ Total Write: {} items", totalWriteCount);
        log.info("  ⚡ Overall Throughput: {:.2f} items/sec", overallThroughput);
        log.info("  📊 Success Rate: {:.2f}%", 
                (totalWriteCount / (double) totalReadCount) * 100);
    }
    
    private void logFinalMemoryUsage() {
        Runtime runtime = Runtime.getRuntime();
        runtime.gc(); // request a garbage collection (only a hint to the JVM)
        
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();
        long maxMemory = runtime.maxMemory();
        
        log.info("💾 Final Memory Usage: {}MB / {}MB ({:.1f}%)", 
                usedMemory / (1024 * 1024),
                maxMemory / (1024 * 1024),
                (usedMemory / (double) maxMemory) * 100);
    }
}

2. A Performance-Optimized ItemWriter

package com.example.batchtutorial.batch.writer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * ItemWriter tuned for high-throughput batch inserts
 */
@Slf4j
@Configuration  
public class HighPerformanceWriterConfig {
    
    @Autowired
    private DataSource dataSource;
    
    /**
     * JDBC batch-insert writer
     */
    @Bean
    public JdbcBatchItemWriter<ProcessedDataDto> highPerformanceWriter() {
        return new JdbcBatchItemWriterBuilder<ProcessedDataDto>()
                .dataSource(dataSource)
                .sql("INSERT INTO processed_data (id, processed_name, original_amount, calculated_value, enriched_data, processed_at, processing_time) " +
                     "VALUES (:id, :processedName, :originalAmount, :calculatedValue, :enrichedData, :processedAt, :processingTime)")
                .beanMapped()
                .assertUpdates(false)  // skip update-count assertions (faster, but silent no-ops become possible)
                .build();
    }
    
    /**
     * Custom batch writer (finer-grained control)
     */
    @Bean
    public CustomBatchWriter customBatchWriter() {
        return new CustomBatchWriter(dataSource);
    }
}

/**
 * Custom batch writer implementation
 */
@Slf4j
class CustomBatchWriter implements ItemWriter<ProcessedDataDto> {
    
    private final DataSource dataSource;
    private final JdbcTemplate jdbcTemplate;
    
    public CustomBatchWriter(DataSource dataSource) {
        this.dataSource = dataSource;
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }
    
    @Override
    public void write(Chunk<? extends ProcessedDataDto> chunk) throws Exception {
        List<? extends ProcessedDataDto> items = chunk.getItems();
        
        if (items.isEmpty()) {
            return;
        }
        
        long startTime = System.currentTimeMillis();
        String threadName = Thread.currentThread().getName();
        
        log.debug("Writing {} items on thread {}", items.size(), threadName);
        
        try {
            // prepare the batch insert
            String sql = "INSERT INTO processed_data " +
                        "(id, processed_name, original_amount, calculated_value, enriched_data, processed_at, processing_time) " +
                        "VALUES (?, ?, ?, ?, ?, ?, ?)";
            
            List<Object[]> batchArgs = items.stream()
                    .map(this::createBatchArgs)
                    .collect(Collectors.toList());
            
            // execute the batch
            int[] results = jdbcTemplate.batchUpdate(sql, batchArgs);
            
            long executionTime = System.currentTimeMillis() - startTime;
            // guard against sub-millisecond writes when computing throughput
            double throughput = items.size() / (Math.max(1, executionTime) / 1000.0);
            
            log.debug("Batch write completed - {} items in {}ms ({:.2f} items/sec) on thread {}", 
                    items.size(), executionTime, throughput, threadName);
            
            // check for failed inserts
            long failedInserts = Arrays.stream(results).filter(result -> result == 0).count();
            if (failedInserts > 0) {
                log.warn("⚠️ {} insertions failed out of {} on thread {}", 
                        failedInserts, items.size(), threadName);
            }
            
        } catch (Exception e) {
            log.error("❌ Batch write failed for {} items on thread {}: {}", 
                    items.size(), threadName, e.getMessage(), e);
            throw e;
        }
    }
    
    private Object[] createBatchArgs(ProcessedDataDto item) {
        return new Object[]{
            item.getId(),
            item.getProcessedName(),
            item.getOriginalAmount(),
            item.getCalculatedValue(),
            item.getEnrichedData(),
            item.getProcessedAt(),
            item.getProcessingTime()
        };
    }
}
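One database-specific note: whether jdbcTemplate.batchUpdate (and likewise JdbcBatchItemWriter) actually produces a single multi-row insert depends on the driver. MySQL's Connector/J, for example, only rewrites JDBC batches into multi-values statements when rewriteBatchedStatements=true is set on the connection URL. An example URL — the host and schema are placeholders, assuming MySQL is the target:

spring.datasource.url=jdbc:mysql://localhost:3306/batch_db?rewriteBatchedStatements=true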

🔧 Performance Tuning Configuration

1. Database Connection Pool Optimization

# application-performance.properties

# HikariCP settings (high-performance connection pool)
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.idle-timeout=300000
spring.datasource.hikari.max-lifetime=600000
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.leak-detection-threshold=60000

# JPA/Hibernate performance tuning
# (Hibernate-native keys must be passed through spring.jpa.properties.*)
spring.jpa.properties.hibernate.jdbc.batch_size=25
spring.jpa.properties.hibernate.order_inserts=true
spring.jpa.properties.hibernate.order_updates=true
spring.jpa.properties.hibernate.jdbc.batch_versioned_data=true

# Spring Batch settings
spring.batch.jdbc.isolation-level-for-create=READ_COMMITTED
spring.batch.jdbc.table-prefix=BATCH_

# Logging (performance mode)
logging.level.org.springframework.batch=INFO
logging.level.org.hibernate.SQL=WARN
# parameter-binding logger (Hibernate 6 name; on Hibernate 5 use org.hibernate.type.descriptor.sql.BasicBinder)
logging.level.org.hibernate.orm.jdbc.bind=WARN
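Size the pool against your actual concurrency: each concurrently executing chunk holds a connection for its transaction, and the job repository needs connections of its own. With the 8-partition handler and the multithreaded step above, maximum-pool-size=20 leaves headroom, but if you raise thread counts or grid size, raise the pool accordingly or workers will block waiting for connections.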

2. JVM Performance Tuning Options

# Recommended JVM options for the batch application (Java 17+ syntax;
# the legacy -XX:+PrintGCDetails/-Xloggc flags were removed in JDK 9, and
# -XX:+UseStringDeduplication no longer requires the experimental-options flag)
java -Xms2g -Xmx4g \
     -XX:+UseG1GC \
     -XX:MaxGCPauseMillis=200 \
     -XX:+UseStringDeduplication \
     -Xlog:gc*:file=gc.log:time \
     -jar batch-application.jar

📊 Performance Testing and Benchmarks

Performance Test Scenario

package com.example.batchtutorial.test;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

import java.time.Duration;
import java.time.Instant;

/**
 * Batch performance test
 */
@Slf4j
@SpringBootTest
@ActiveProfiles("performance")
class BatchPerformanceTest {
    
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private Job singleThreadJob;
    
    @Autowired
    private Job multiThreadJob;
    
    @Autowired
    private Job partitioningJob;
    
    @Test
    void compareBatchPerformance() throws Exception {
        
        // 1. Single-thread baseline
        Instant start = Instant.now();
        jobLauncher.run(singleThreadJob, createJobParameters("single"));
        Duration singleThreadTime = Duration.between(start, Instant.now());
        log.info("Single thread execution time: {} seconds", singleThreadTime.getSeconds());
        
        // 2. Multithreaded run
        start = Instant.now();
        jobLauncher.run(multiThreadJob, createJobParameters("multi"));
        Duration multiThreadTime = Duration.between(start, Instant.now());
        log.info("Multi thread execution time: {} seconds", multiThreadTime.getSeconds());
        
        // 3. Partitioned run
        start = Instant.now();
        jobLauncher.run(partitioningJob, createJobParameters("partition"));
        Duration partitioningTime = Duration.between(start, Instant.now());
        log.info("Partitioning execution time: {} seconds", partitioningTime.getSeconds());
        
        // 4. Compare the results
        log.info("Performance Comparison:");
        log.info("  Single Thread: {} seconds (baseline)", singleThreadTime.getSeconds());
        log.info("  Multi Thread: {} seconds ({}x faster)", 
                multiThreadTime.getSeconds(), 
                (double) singleThreadTime.getSeconds() / multiThreadTime.getSeconds());
        log.info("  Partitioning: {} seconds ({}x faster)", 
                partitioningTime.getSeconds(),
                (double) singleThreadTime.getSeconds() / partitioningTime.getSeconds());
    }
    
    private JobParameters createJobParameters(String suffix) {
        return new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .addString("testType", suffix)
                .toJobParameters();
    }
}
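One caveat on interpreting these numbers: a single run per variant measures JIT warm-up, cache state, and connection pool ramp-up along with the batch itself. For a fairer comparison, run each job several times against the same dataset and compare medians, and reset the target table between runs so every variant processes identical data.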

We now have a complete high-performance Spring Batch setup for large-volume data processing! In the next installment we will cover the error handling and monitoring features needed in production.
