이전 글에서는 Elasticsearch, Kibana, Redis, MongoDB를 Docker Compose로 구성하는 과정을 다루었으며, 이번 글에서는 Spring Batch, Quartz 설정, CSV 파일 생성 및 S3 업로드, Kafka 메시지 전송을 진행하겠습니다.
// build.gradle
implementation 'org.springframework.boot:spring-boot-starter-batch'
# application.yml
spring:
  batch:
    jdbc:
      initialize-schema: always
    job:
      enabled: false
BatchConfig 클래스는 Spring Batch의 다양한 컴포넌트를 설정하고 배치 작업을 정의합니다. 이 설정은 CSV 파일을 생성하고 S3에 업로드하며, 실패한 항목을 재처리하고 Kafka 메시지를 전송하는 프로세스를 포함합니다.
@Configuration
@Slf4j
@RequiredArgsConstructor
public class BatchConfig {
    private static final String JOB_NAME = "exportEventsJob";
    private static final String MANAGER_STEP_NAME = "managerStep";
    private static final String WORKER_STEP_NAME = "workerStep";
    private static final String RETRY_STEP_NAME = "retryFailedItemsStep";
    private static final String KAFKA_STEP_NAME = "sendKafkaMessageStep";
    private static final int CHUNK_SIZE = 2000;
    private static final int PAGE_SIZE = 1000;
    private static final String THREAD_NAME_PREFIX = "Batch-Thread-";
    private final CSVGenerator csvGenerator;
    private final EventRepository eventRepository;
    private final AwsS3Service awsS3Service;
    private final FullIndexingProducer fullIndexingProducer;
    private final PrepareJobListener prepareJobListener;
    private final FailedItemRepository failedItemRepository;
배치 작업을 정의하며, 각 Step을 순차적으로 실행합니다.
/**
 * Assembles the export job from its three sequential steps.
 *
 * <p>Execution order: the partitioned manager step, then the step that re-processes
 * failed items, then the step that sends the Kafka message. {@code prepareJobListener}
 * is attached as the job-level listener.
 *
 * @param jobRepository      repository handed to the job builder and to every step
 * @param transactionManager transaction manager handed to every step
 * @return the configured job, registered under {@code JOB_NAME}
 */
@Bean
public Job exportEventsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    final Step partitionedExport = managerStep(jobRepository, taskExecutor(), transactionManager);
    final Step failedItemRetry = retryFailedItemsStep(jobRepository, transactionManager);
    final Step kafkaNotification = sendKafkaMessageStep(jobRepository, transactionManager);

    return new JobBuilder(JOB_NAME, jobRepository)
            .listener(prepareJobListener)
            .start(partitionedExport)
            .next(failedItemRetry)
            .next(kafkaNotification)
            .build();
}
Manager Step
/**
 * Builds the partitioning (manager) step.
 *
 * <p>The step splits the work with {@code partitioner()} and runs {@code workerStep}
 * for the partitions on the supplied task executor. {@code allowStartIfComplete(true)}
 * lets the step run again even when a previous execution already completed it.
 *
 * @param jobRepository      repository handed to the step builder and the worker step
 * @param taskExecutor       executor on which the partitions are run
 * @param transactionManager transaction manager handed to the worker step
 * @return the configured manager step
 */
@Bean
public Step managerStep(JobRepository jobRepository, TaskExecutor taskExecutor, PlatformTransactionManager transactionManager) {
    final Step perPartitionStep = workerStep(jobRepository, transactionManager);

    return new StepBuilder(MANAGER_STEP_NAME, jobRepository)
            .partitioner(WORKER_STEP_NAME, partitioner())
            .step(perPartitionStep)
            .taskExecutor(taskExecutor)
            .allowStartIfComplete(true)
            .build();
}
Worker Step
/**
 * Builds the chunk-oriented worker step that the manager step executes for its partitions.
 *
 * <p>Items flow reader -> processor -> writer in chunks of {@code CHUNK_SIZE}. The step is
 * fault tolerant: any {@code Exception} is both retryable (retry limit 3) and skippable
 * (skip limit 5), and the listener returned by {@code skipListener()} is registered.
 *
 * @param jobRepository      repository handed to the step builder
 * @param transactionManager transaction manager that wraps each chunk
 * @return the configured worker step
 */
@Bean
public Step workerStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder(WORKER_STEP_NAME, jobRepository)
            .<Long, Long>chunk(CHUNK_SIZE, transactionManager)
            // NOTE(review): the nulls are presumably placeholders for late-bound (step-scoped)
            // arguments — confirm that eventReader is declared with @StepScope.
            .reader(eventReader(null, null))
            .processor(batchProcessor())
            .writer(csvBatchWriter())
            .faultTolerant()
            // Skip policy: tolerate up to 5 skipped items for any Exception.
            .skip(Exception.class).skipLimit(5)
            // Retry policy: retry limit of 3 for any Exception.
            .retry(Exception.class).retryLimit(3)
            // NOTE(review): presumably records skipped items for the later retry step — confirm.
            .listener(skipListener())
            // Allow the step to run again even if a previous execution completed it.
            .allowStartIfComplete(true)
            .build();
}S3에 업로드된 파일 경로를 Kafka 메시지로 전송합니다.
/**
 * Builds the tasklet step that publishes the Kafka message(s) for the exported files.
 *
 * @param jobRepository      repository handed to the step builder
 * @param transactionManager transaction manager used around the tasklet
 * @return the configured step, registered under {@code KAFKA_STEP_NAME}
 */
@Bean
public Step sendKafkaMessageStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    final StepBuilder stepBuilder = new StepBuilder(KAFKA_STEP_NAME, jobRepository);
    final Tasklet notifier = sendKafkaMessageTasklet();
    return stepBuilder.tasklet(notifier, transactionManager).build();
}
/**
 * Creates the tasklet that sends the file paths returned by {@code awsS3Service.getFiles()}
 * to Kafka.
 *
 * <p>The paths are sent in groups of at most 2000; each group is joined with commas into a
 * single message, and the message containing the final group is flagged as the last one.
 *
 * <p>NOTE(review): when {@code getFiles()} returns an empty list nothing is sent, so no
 * message flagged as last is ever produced — confirm the consumer tolerates that.
 *
 * @return a tasklet that always finishes after a single invocation
 */
@Bean
public Tasklet sendKafkaMessageTasklet() {
    return (contribution, chunkContext) -> {
        final int maxPathsPerMessage = 2000;
        final List<String> uploadedPaths = awsS3Service.getFiles();
        final int total = uploadedPaths.size();

        int from = 0;
        while (from < total) {
            final int to = Math.min(from + maxPathsPerMessage, total);
            final String payload = String.join(",", uploadedPaths.subList(from, to));
            // The group that reaches the end of the list is the last message.
            fullIndexingProducer.sendIndexingMessage(payload, to == total);
            from += maxPathsPerMessage;
        }
        return RepeatStatus.FINISHED;
    };
}
이전에 실패한 항목을 데이터베이스에서 읽어와 다시 처리합니다.
/**
 * Builds the chunk-oriented step that re-processes previously failed items.
 *
 * <p>{@code FailedItem}s are read, converted to {@code Long} by the processor and written
 * in chunks of 100. The step is fault tolerant with a retry limit of 3 for any
 * {@code Exception}.
 *
 * @param jobRepository      repository handed to the step builder
 * @param transactionManager transaction manager that wraps each chunk
 * @return the configured retry step
 */
@Bean
public Step retryFailedItemsStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder(RETRY_STEP_NAME, jobRepository)
            .<FailedItem, Long>chunk(100, transactionManager)
            .reader(failedItemReader())
            .processor(failedItemProcessor())
            .writer(itemWriter())
            .faultTolerant()
            .retry(Exception.class)
            .retryLimit(3)
            // NOTE(review): no skip(...)/skipLimit(...) is configured on this step, unlike
            // workerStep — confirm whether this skip listener is expected to fire here.
            .listener(skipListener())
            .build();
}PrepareJobListener는 배치 작업 전에 S3 파일을 정리하고, Redis에 작업 상태를 저장합니다.
@Component
@RequiredArgsConstructor
public class PrepareJobListener implements JobExecutionListener {
    private final AwsS3Service awsS3Service;
    private final RedisTemplate<String, String> redisTemplate;
    private static final long FULL_INDEX_TTL = 60 * 60 * 1000L; // 1시간
    private static final String FULL_INDEXING_RESERVED = "FULL_INDEXING_RESERVED";
    @Override
    public void beforeJob(JobExecution jobExecution) {
        // Job이 처음 시작할 때만 S3 파일 삭제
        redisTemplate.opsForValue().set(FULL_INDEXING_RESERVED, "true", FULL_INDEX_TTL, TimeUnit.MILLISECONDS);
        awsS3Service.deleteAllFiles();
    }
    @Override
    public void afterJob(JobExecution jobExecution) {
        // 필요 시 Job 완료 후 추가 작업 처리
    }
}implementation 'org.springframework.boot:spring-boot-starter-quartz'spring:
  quartz:
    job-store-type: jdbc
    jdbc:
      initialize-schema: always
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.isClustered: true
      org.quartz.threadPool.threadCount: 10
QuartzBatchJob은 Redisson 분산 락을 사용해 동시 실행을 방지하며, 지정된 Job을 실행합니다.
/**
 * Quartz job that launches the Spring Batch {@code exportEventsJob} while holding a
 * Redisson lock, so that only one launch is in progress at a time.
 *
 * <p>The lock is taken without waiting and with a one-hour lease. It is released as soon as
 * the launch call returns (successfully or not); the lease only acts as a safety net if the
 * process dies while holding the lock.
 */
@Slf4j
@RequiredArgsConstructor
public class QuartzBatchJob extends QuartzJobBean {
    private final JobLauncher jobLauncher;
    private final Job exportEventsJob;
    private final RedissonClient redisson;
    private static final long FULL_INDEX_TTL = 60 * 60 * 1000L; // lock lease time: 1 hour
    private static final String INDEXING_LOCK = "FULL_INDEXING_LOCK";

    /**
     * Tries to acquire the lock and, if successful, runs the batch job with today's date as
     * a job parameter.
     *
     * @param context Quartz execution context (not inspected)
     * @throws JobExecutionException if the batch job could not be launched
     */
    @Override
    protected void executeInternal(@NotNull JobExecutionContext context) throws JobExecutionException {
        RLock lock = redisson.getLock(INDEXING_LOCK);
        boolean acquired = false;
        try {
            // Wait time 0: give up immediately when another holder has the lock.
            acquired = lock.tryLock(0, FULL_INDEX_TTL, TimeUnit.MILLISECONDS);
            if (!acquired) {
                log.warn("Job is already running. Skipping execution.");
                return;
            }

            String today = LocalDate.now().toString(); // yyyy-MM-dd
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("jobName", "exportEventsJob")
                    .addString("executionDate", today)
                    .toJobParameters();

            // Report the real outcome: run() also returns normally when the job has FAILED.
            // NOTE(review): assumes a synchronous JobLauncher, so the job is finished when
            // run() returns — confirm if an asynchronous launcher is configured.
            log.info("Batch Job 'exportEventsJob' finished with status {}.",
                    jobLauncher.run(exportEventsJob, jobParameters).getStatus());
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException |
                 JobParametersInvalidException e) {
            log.error("Failed to execute Batch Job 'exportEventsJob': {}", e.getMessage(), e);
            throw new JobExecutionException(e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the Quartz worker thread can observe it.
            Thread.currentThread().interrupt();
            log.error("Lock acquisition was interrupted: {}", e.getMessage(), e);
        } finally {
            // Release the lock instead of leaving it held until the lease expires. The
            // ownership check avoids an error when the lease already ran out mid-job.
            if (acquired && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
QuartzSchedulerConfig는 Quartz 작업을 스케줄링합니다. 아래 설정은 매일 새벽 2시에 작업을 실행하도록 구성합니다.
/**
 * Registers the Quartz job detail and the cron trigger that fires {@code QuartzBatchJob}.
 */
@Configuration
public class QuartzSchedulerConfig {
    private static final String JOB_NAME = "eventToCSVJob";
    private static final String TRIGGER_NAME = JOB_NAME + "Trigger";
    /** Quartz cron expression: second 0, minute 0, hour 2, every day. */
    private static final String DAILY_AT_2AM = "0 0 2 * * ?";

    /**
     * Describes the Quartz job; it is stored durably, i.e. kept even without a trigger.
     *
     * @return the job detail for {@code QuartzBatchJob}
     */
    @Bean
    public JobDetail jobDetail() {
        return JobBuilder.newJob(QuartzBatchJob.class).withIdentity(JOB_NAME).storeDurably().build();
    }

    /**
     * Creates the cron trigger bound to {@link #jobDetail()}.
     *
     * @return a trigger that fires once a day at 02:00
     */
    @Bean
    public Trigger trigger() {
        final CronScheduleBuilder dailySchedule = CronScheduleBuilder.cronSchedule(DAILY_AT_2AM);
        return TriggerBuilder.newTrigger()
                .withIdentity(TRIGGER_NAME)
                .withSchedule(dailySchedule)
                .forJob(jobDetail())
                .build();
    }
}
FullIndexingProducer는 Kafka 메시지를 생성하고 전송하며, 실패한 메시지를 DLQ(Dead Letter Queue)로 처리하는 역할을 합니다.
@Slf4j
@Service
@RequiredArgsConstructor
public class FullIndexingProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;
    private static final String TOPIC_NAME = "full-indexing";
    private static final String DLQ_TOPIC_NAME = "full-indexing-dlq";
    private static final int PARTITION_NUMBER = 0; // 단일 파티션 설정
    public void sendIndexingMessage(String s3UrlList, boolean isLast) {
        try {
            // 메시지 직렬화
            String message = objectMapper.writeValueAsString(
                    new FullIndexingMessage(s3UrlList, isLast)
            );
            // 특정 파티션으로 메시지 전송
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, PARTITION_NUMBER, null, message);
            kafkaTemplate.send(record)
                    .thenApply(result -> {
                        log.info("Kafka 메시지 전송 성공: topic={}, partition={}, offset={}, message={}",
                                result.getRecordMetadata().topic(),
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset(),
                                message);
                        return result;
                    })
                    .exceptionally(ex -> {
                        log.error("Kafka 메시지 전송 실패: {}, 오류: {}", message, ex.getMessage());
                        sendToDLQ(message);
                        return null;
                    });
        } catch (JsonProcessingException e) {
            log.error("Kafka 메시지 직렬화 실패. 데이터: {}, 오류: {}", s3UrlList, e.getMessage());
            throw new RuntimeException("Kafka 메시지 직렬화 실패", e);
        }
    }
    private void sendToDLQ(String failedMessage) {
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>(DLQ_TOPIC_NAME, PARTITION_NUMBER, null, failedMessage);
            kafkaTemplate.send(record)
                    .thenAccept(result -> log.info("DLQ 메시지 전송 성공: topic={}, partition={}, offset={}, message={}",
                            result.getRecordMetadata().topic(),
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset(),
                            failedMessage))
                    .exceptionally(ex -> {
                        log.error("DLQ 메시지 전송 실패: {}, 오류: {}", failedMessage, ex.getMessage());
                        return null;
                    });
        } catch (Exception e) {
            log.error("DLQ 메시지 전송 중 예외 발생. 메시지: {}, 오류: {}", failedMessage, e.getMessage());
        }
    }
    @Getter
    private static class FullIndexingMessage {
        private final String s3UrlList;
        private final boolean isLastMessage;
        public FullIndexingMessage(String message, boolean isLastMessage) {
            this.s3UrlList = message;
            this.isLastMessage = isLastMessage;
        }
    }
}