In the previous post, we configured Elasticsearch, Kibana, Redis, and MongoDB with Docker Compose. In this post, we set up Spring Batch and Quartz, generate CSV files and upload them to S3, and send Kafka messages. First, add the Spring Batch starter and its configuration.
// build.gradle
implementation 'org.springframework.boot:spring-boot-starter-batch'
# application.yml
spring:
  batch:
    jdbc:
      initialize-schema: always
    job:
      enabled: false
Setting spring.batch.job.enabled to false keeps batch jobs from launching automatically at application startup; Quartz will trigger them instead. The BatchConfig class wires up the Spring Batch components and defines the batch job: it generates CSV files and uploads them to S3, reprocesses failed items, and sends a Kafka message.
@Configuration
@Slf4j
@RequiredArgsConstructor
public class BatchConfig {
private static final String JOB_NAME = "exportEventsJob";
private static final String MANAGER_STEP_NAME = "managerStep";
private static final String WORKER_STEP_NAME = "workerStep";
private static final String RETRY_STEP_NAME = "retryFailedItemsStep";
private static final String KAFKA_STEP_NAME = "sendKafkaMessageStep";
private static final int CHUNK_SIZE = 2000;
private static final int PAGE_SIZE = 1000;
private static final String THREAD_NAME_PREFIX = "Batch-Thread-";
private final CSVGenerator csvGenerator;
private final EventRepository eventRepository;
private final AwsS3Service awsS3Service;
private final FullIndexingProducer fullIndexingProducer;
private final PrepareJobListener prepareJobListener;
private final FailedItemRepository failedItemRepository;
The job definition below wires the Steps together and runs them in sequence.
@Bean
public Job exportEventsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder(JOB_NAME, jobRepository)
.listener(prepareJobListener)
.start(managerStep(jobRepository, taskExecutor(), transactionManager))
.next(retryFailedItemsStep(jobRepository, transactionManager))
.next(sendKafkaMessageStep(jobRepository, transactionManager))
.build();
}
Manager Step
The manager step splits the export into partitions and runs the worker steps in parallel on a task executor.
@Bean
public Step managerStep(JobRepository jobRepository, TaskExecutor taskExecutor, PlatformTransactionManager transactionManager) {
return new StepBuilder(MANAGER_STEP_NAME, jobRepository)
.partitioner(WORKER_STEP_NAME, partitioner())
.step(workerStep(jobRepository, transactionManager))
.taskExecutor(taskExecutor)
.allowStartIfComplete(true)
.build();
}
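The partitioner() and taskExecutor() beans referenced above are not shown in this post. Below is a minimal sketch of what they could look like inside BatchConfig, assuming the events are split into equal ID ranges; the minId/maxId context keys, the findMinId/findMaxId repository methods, and the pool size are illustrative assumptions, not the actual implementation.

@Bean
public Partitioner partitioner() {
    return gridSize -> {
        long minId = eventRepository.findMinId(); // hypothetical helper query
        long maxId = eventRepository.findMaxId(); // hypothetical helper query
        long rangeSize = (maxId - minId) / gridSize + 1;

        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            // Each partition gets its own ID range via the ExecutionContext
            ExecutionContext context = new ExecutionContext();
            context.putLong("minId", minId + i * rangeSize);
            context.putLong("maxId", Math.min(minId + (i + 1) * rangeSize - 1, maxId));
            partitions.put("partition" + i, context);
        }
        return partitions;
    };
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);                      // illustrative pool size
    executor.setMaxPoolSize(4);
    executor.setThreadNamePrefix(THREAD_NAME_PREFIX); // reuses the constant declared above
    executor.initialize();
    return executor;
}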
Worker Step
Each worker step reads event IDs in chunks, processes them, and writes them out as CSV files, with retry and skip handling for transient failures.
@Bean
public Step workerStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder(WORKER_STEP_NAME, jobRepository)
.<Long, Long>chunk(CHUNK_SIZE, transactionManager)
.reader(eventReader(null, null))
.processor(batchProcessor())
.writer(csvBatchWriter())
.faultTolerant()
.skip(Exception.class).skipLimit(5)
.retry(Exception.class).retryLimit(3)
.listener(skipListener())
.allowStartIfComplete(true)
.build();
}
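eventReader(), batchProcessor(), and csvBatchWriter() are also omitted here. A minimal sketch of the step-scoped reader, assuming it picks up the minId/maxId keys from the partitioner sketch above and that EventRepository exposes a paging query for that range (the method name is hypothetical):

@Bean
@StepScope
public RepositoryItemReader<Long> eventReader(
        @Value("#{stepExecutionContext['minId']}") Long minId,
        @Value("#{stepExecutionContext['maxId']}") Long maxId) {
    // hypothetical repository method: Page<Long> findIdsByIdBetween(Long min, Long max, Pageable pageable)
    return new RepositoryItemReaderBuilder<Long>()
            .name("eventReader")
            .repository(eventRepository)
            .methodName("findIdsByIdBetween")
            .arguments(List.of(minId, maxId))
            .pageSize(PAGE_SIZE)
            .sorts(Map.of("id", Sort.Direction.ASC))
            .build();
}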
The next step sends the paths of the files uploaded to S3 as Kafka messages.
@Bean
public Step sendKafkaMessageStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder(KAFKA_STEP_NAME, jobRepository)
.tasklet(sendKafkaMessageTasklet(), transactionManager)
.build();
}
@Bean
public Tasklet sendKafkaMessageTasklet() {
return (contribution, chunkContext) -> {
List<String> filePaths = awsS3Service.getFiles();
int batchSize = 2000;
for (int i = 0; i < filePaths.size(); i += batchSize) {
List<String> batch = filePaths.subList(i, Math.min(i + batchSize, filePaths.size()));
String message = String.join(",", batch);
boolean isLastMessage = (i + batchSize >= filePaths.size());
fullIndexingProducer.sendIndexingMessage(message, isLastMessage);
}
return RepeatStatus.FINISHED;
};
}
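awsS3Service.getFiles() is expected to return the keys of the CSV files uploaded by the worker steps. Its implementation is not shown; a minimal sketch, assuming an AwsS3Service that wraps an AWS SDK v2 S3Client with a configured bucketName (and ignoring pagination beyond 1,000 objects):

public List<String> getFiles() {
    // List every object currently in the bucket and return its key
    ListObjectsV2Response response = s3Client.listObjectsV2(
            ListObjectsV2Request.builder().bucket(bucketName).build());
    return response.contents().stream()
            .map(S3Object::key)
            .toList();
}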
The retry step reads previously failed items from the database and processes them again.
@Bean
public Step retryFailedItemsStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder(RETRY_STEP_NAME, jobRepository)
.<FailedItem, Long>chunk(100, transactionManager)
.reader(failedItemReader())
.processor(failedItemProcessor())
.writer(itemWriter())
.faultTolerant()
.retry(Exception.class)
.retryLimit(3)
.listener(skipListener())
.build();
}
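failedItemReader(), failedItemProcessor(), and itemWriter() are likewise not shown. A minimal sketch of the reader, assuming FailedItem has an id field and FailedItemRepository is a paging Spring Data repository:

@Bean
public RepositoryItemReader<FailedItem> failedItemReader() {
    // Page through all failed items, 100 at a time, ordered by id
    return new RepositoryItemReaderBuilder<FailedItem>()
            .name("failedItemReader")
            .repository(failedItemRepository)
            .methodName("findAll")
            .pageSize(100)
            .sorts(Map.of("id", Sort.Direction.ASC))
            .build();
}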
PrepareJobListener cleans up the S3 files before the batch job runs and stores the job status in Redis.
@Component
@RequiredArgsConstructor
public class PrepareJobListener implements JobExecutionListener {
private final AwsS3Service awsS3Service;
private final RedisTemplate<String, String> redisTemplate;
private static final long FULL_INDEX_TTL = 60 * 60 * 1000L; // 1 hour
private static final String FULL_INDEXING_RESERVED = "FULL_INDEXING_RESERVED";
@Override
public void beforeJob(JobExecution jobExecution) {
// Mark the full-indexing run as reserved in Redis, then delete the S3 files before the Job starts
redisTemplate.opsForValue().set(FULL_INDEXING_RESERVED, "true", FULL_INDEX_TTL, TimeUnit.MILLISECONDS);
awsS3Service.deleteAllFiles();
}
@Override
public void afterJob(JobExecution jobExecution) {
// Handle any follow-up work after the Job completes, if needed
}
}
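awsS3Service.deleteAllFiles() is not shown either; a minimal sketch under the same SDK v2 assumptions as getFiles() above, removing every object in the bucket so each full export starts from a clean state:

public void deleteAllFiles() {
    ListObjectsV2Response listing = s3Client.listObjectsV2(
            ListObjectsV2Request.builder().bucket(bucketName).build());
    if (listing.contents().isEmpty()) {
        return; // nothing to clean up
    }
    // Build the list of object identifiers and delete them in a single request
    List<ObjectIdentifier> targets = listing.contents().stream()
            .map(obj -> ObjectIdentifier.builder().key(obj.key()).build())
            .toList();
    s3Client.deleteObjects(DeleteObjectsRequest.builder()
            .bucket(bucketName)
            .delete(Delete.builder().objects(targets).build())
            .build());
}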
Next, add the Quartz starter and configure a JDBC job store so that schedules survive restarts and can run in a clustered setup.

// build.gradle
implementation 'org.springframework.boot:spring-boot-starter-quartz'

# application.yml
spring:
  quartz:
    job-store-type: jdbc
    jdbc:
      initialize-schema: always
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.isClustered: true
      org.quartz.threadPool.threadCount: 10
QuartzBatchJob uses a Redisson distributed lock to prevent concurrent execution and launches the configured batch job.
@Slf4j
@RequiredArgsConstructor
public class QuartzBatchJob extends QuartzJobBean {
private final JobLauncher jobLauncher;
private final Job exportEventsJob;
private final RedissonClient redisson;
private static final long FULL_INDEX_TTL = 60 * 60 * 1000L; // 1 hour
private static final String INDEXING_LOCK = "FULL_INDEXING_LOCK";
@Override
protected void executeInternal(@NotNull JobExecutionContext context) throws JobExecutionException {
RLock lock = redisson.getLock(INDEXING_LOCK);
try {
// Try to acquire the lock without waiting; the lease expires after FULL_INDEX_TTL
if (lock.tryLock(0, FULL_INDEX_TTL, TimeUnit.MILLISECONDS)) {
String today = LocalDate.now().toString(); // yyyy-MM-dd format
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobName", "exportEventsJob")
.addString("executionDate", today) // add the execution date so each day gets a new JobInstance
.toJobParameters();
jobLauncher.run(exportEventsJob, jobParameters);
log.info("Batch Job 'exportEventsJob' successfully executed.");
} else {
log.warn("Job is already running. Skipping execution.");
}
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException |
JobParametersInvalidException e) {
log.error("Failed to execute Batch Job 'exportEventsJob': {}", e.getMessage(), e);
throw new JobExecutionException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Lock acquisition was interrupted: {}", e.getMessage(), e);
}
}
}
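The RedissonClient used for the lock has to be declared as a bean somewhere in the application. A minimal sketch, assuming a single Redis node at redis://localhost:6379 (the address is illustrative; point it at your own Redis instance):

@Bean
public RedissonClient redissonClient() {
    // Single-node Redis configuration for the distributed lock
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6379");
    return Redisson.create(config);
}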
QuartzSchedulerConfig registers the Quartz job and its trigger. The configuration below runs the job every day at 2 AM.
@Configuration
public class QuartzSchedulerConfig {
private static final String JOB_NAME = "eventToCSVJob";
@Bean
public JobDetail jobDetail() {
return JobBuilder.newJob(QuartzBatchJob.class)
.withIdentity(JOB_NAME)
.storeDurably()
.build();
}
@Bean
public Trigger trigger() {
return TriggerBuilder.newTrigger()
.forJob(jobDetail())
.withIdentity(JOB_NAME + "Trigger")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0 2 * * ?")) // run every day at 2 AM
.build();
}
}
FullIndexingProducer builds and sends the Kafka messages, and routes failed messages to a DLQ (Dead Letter Queue).
@Slf4j
@Service
@RequiredArgsConstructor
public class FullIndexingProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private static final String TOPIC_NAME = "full-indexing";
private static final String DLQ_TOPIC_NAME = "full-indexing-dlq";
private static final int PARTITION_NUMBER = 0; // send everything to a single partition
public void sendIndexingMessage(String s3UrlList, boolean isLast) {
try {
// Serialize the message
String message = objectMapper.writeValueAsString(
new FullIndexingMessage(s3UrlList, isLast)
);
// Send the message to a specific partition
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, PARTITION_NUMBER, null, message);
kafkaTemplate.send(record)
.thenApply(result -> {
log.info("Kafka message sent successfully: topic={}, partition={}, offset={}, message={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset(),
message);
return result;
})
.exceptionally(ex -> {
log.error("Failed to send Kafka message: {}, error: {}", message, ex.getMessage());
sendToDLQ(message);
return null;
});
} catch (JsonProcessingException e) {
log.error("Failed to serialize Kafka message. Data: {}, error: {}", s3UrlList, e.getMessage());
throw new RuntimeException("Failed to serialize Kafka message", e);
}
}
private void sendToDLQ(String failedMessage) {
try {
ProducerRecord<String, String> record = new ProducerRecord<>(DLQ_TOPIC_NAME, PARTITION_NUMBER, null, failedMessage);
kafkaTemplate.send(record)
.thenAccept(result -> log.info("DLQ message sent successfully: topic={}, partition={}, offset={}, message={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset(),
failedMessage))
.exceptionally(ex -> {
log.error("Failed to send DLQ message: {}, error: {}", failedMessage, ex.getMessage());
return null;
});
} catch (Exception e) {
log.error("Exception while sending DLQ message. Message: {}, error: {}", failedMessage, e.getMessage());
}
}
@Getter
private static class FullIndexingMessage {
private final String s3UrlList;
private final boolean isLastMessage;
public FullIndexingMessage(String message, boolean isLastMessage) {
this.s3UrlList = message;
this.isLastMessage = isLastMessage;
}
}
}
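One thing to keep in mind on the consumer side: because FullIndexingMessage relies on Lombok's @Getter, Jackson derives the JSON field names from the generated getters, so the payload comes out as something like {"s3UrlList":"...","lastMessage":true} (the is prefix of isLastMessage is dropped). If the consumer expects a field named isLastMessage, add a @JsonProperty annotation to the field.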