
์ฐ๋ฆฌ ์๋น์ค์๋ ๋์ฉ๋ ์ด๋ฏธ์ง๋ฅผ ์ฒ๋ฆฌํ๋ Spring Batch ํ์ดํ๋ผ์ธ์ด ์๋ค. ์ฌ์ฉ์๊ฐ ZIP ํ์ผ์ ์ฌ๋ฆฌ๋ฉด, API ์๋ฒ(app-api)๊ฐ ํ์ผ์ ์์ ์ ์ฅํ๊ณ ๋ฐฐ์น ์๋ฒ(app-batch)์ "์ฒ๋ฆฌ ์์ํด" ๋ผ๋ ์ ํธ๋ฅผ ๋ณด๋ธ๋ค.
์ด ์ ํธ๋ฅผ ์ฒ์์๋ Redis Pub/Sub์ผ๋ก ๊ตฌํํ๋ค.
์ฌ์ฉ์ ์
๋ก๋
โ
โผ
app-api โ Redis PUBLISH "batch:pending" {batchJobId}
โ
โผ (๊ตฌ๋
์ค์ธ ๊ฒฝ์ฐ)
app-batch โ RedisBatchEventListener.onMessage() โ Spring Batch Job ์คํ
์ง๊ด์ ์ด๊ณ ๊ตฌํ๋ ๋น ๋ฅด๋ค. ๊ทธ๋ฆฌ๊ณ ๋น๋ถ๊ฐ์ ์ ๋์ํ๋ค.
์ด๋ ๋ QA์์ ๋ฒ๊ทธ ๋ฆฌํฌํธ๊ฐ ๋ค์ด์๋ค.
"ํ์ผ ์ ๋ก๋๋ ๋๋๋ฐ ์ฒ๋ฆฌ๊ฐ ์์์ด ์ ๋ฉ๋๋ค."
๋ก๊ทธ๋ฅผ ๋ณด๋ API ์๋ฒ๋ ์ ์์ ์ผ๋ก ์ด๋ฒคํธ๋ฅผ ๋ฐํํ๋ค. ๊ทธ๋ฐ๋ฐ ๋ฐฐ์น ์๋ฒ๊ฐ ์ฒ๋ฆฌํ ํ์ ์ด ์์๋ค. ๋ฐฐ์น ์๋ฒ๋ฅผ ์ฌ์์ํ ์งํ์ ์ฌ๋ผ์จ ์ ๋ก๋์๋ค.
์์ธ์ Redis Pub/Sub์ ํต์ฌ ํน์ฑ ๋๋ฌธ์ด์๋ค.
API ์๋ฒ: PUBLISH batch:pending โ ์ฑ๊ณต (Redis ์๋ต: 0 โ ๊ตฌ๋
์ 0๋ช
)
๋ฐฐ์น ์๋ฒ: [์ฌ์์ ์ค... ์์ง Subscribe ์ ํจ]
โ ๋ฉ์์ง๋ Redis์ ์ ์ฅ๋์ง ์๊ณ ์ฆ์ ์๋ฉธ
Redis Pub/Sub์ ์นด์นด์คํก ๋ผ์ด๋ธ ์ฑํ
๊ณผ ๊ฐ๋ค.
์ฑํ
๋ฐฉ์ ์๋ฌด๋ ์์ ๋ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ฉด ๐ฅ ๋ฉ์์ง๊ฐ ์ฌ๋ผ์ง๋ค!
Redis๋ "๊ตฌ๋
์๊ฐ ์์" ์ด๋ผ๋ ์๋ต(0)์ ๋๋ ค์ค ๋ฟ, ๋ฉ์์ง๋ฅผ ์ด๋์๋ ๋ณด๊ดํ์ง ์๋๋ค.
์ฐํธํจ์ ๋น์ ํ๋ฉด, Redis Pub/Sub์ ์ฐํธํจ์ด ์๋ค. ์ฌ๋์ด ์์ผ๋ฉด ํธ์ง๊ฐ ๊ณต์ค์์ ์ฌ๋ผ์ง๋ค.
| ์ํฉ | Redis Pub/Sub | Kafka |
|---|---|---|
| ๊ตฌ๋ ์๊ฐ ์์ ๋ ๋ฐํ | ๋ฉ์์ง ์๋ฉธ | ํ ํฝ์ ์ ์ฅ๋จ |
| ๊ตฌ๋ ์ ์ฌ์์ ํ | ์ด์ ๋ฉ์์ง ์์ ๋ถ๊ฐ | offset ์ดํ ๋ฉ์์ง ์ฌ์์ |
| ๋ธ๋ก์ปค ์ฅ์ ํ | ๋ฉ์์ง ์ ์ค | ์ฌ๊ธฐ๋ ํ ๋ณต๊ตฌ |
๋ฌธ์ ๋ฅผ ์ธ์งํ ํ 30๋ถ๋ง๋ค DB๋ฅผ ํด๋งํ๋ ์ค์ผ์ค๋ฌ๋ฅผ ๋ง๋ค์๋ค.
// โ ์์๋ฐฉํธ โ ๋ณต์ก์ฑ๋ง ์ฌ๋ผ๊ฐ๋ค
@Scheduled(fixedDelay = 1800000) // 30๋ถ
public void processDelayedJobs() {
// PENDING ์ํ์ธ ์์
์กฐํ โ ์ด๋ฒคํธ ์ฌ๋ฐํ
List<BatchFileJob> pendingJobs = batchFileJobRepository
.findByStatusAndCreatedAtBefore(BatchProcessingStatus.PENDING,
LocalDateTime.now().minusMinutes(30));
for (BatchFileJob job : pendingJobs) {
batchEventPort.publishPendingEvent(job.getId());
}
}
์ด๊ฒ์ ์ง์ง ํด๊ฒฐ์ด ์๋์๋ค. ๋ฉ์์ง๊ฐ ์ ์ค๋ ๋ฐฐ์น ์์ ์ ์ต๋ 30๋ถ ๋ค์์ผ ์ฌ์ฒ๋ฆฌ๊ฐ ์์๋๋ค. ์ฌ์ฉ์ ์ ์ฅ์์๋ ์ ๋ก๋ ํ 30๋ถ ๋์ ์๋ฌด ์ผ๋ ์ผ์ด๋์ง ์๋ ๊ฒ์ฒ๋ผ ๋ณด์ธ๋ค.
๊ทธ๋ฆฌ๊ณ ์ฝ๋๋ ๋ณต์กํด์ก๋ค. "์ ์ค์ผ์ค๋ฌ๊ฐ ์์ด์?" ๋ผ๋ ์ง๋ฌธ์ "๋ฉ์์ง ์ ์ค ๋ณด์์ฉ์ด์์"๋ผ๊ณ ๋ตํ๋ ์ํฉ์ด ๋๋ค.
๋ฉ์์ง ๋ธ๋ก์ปค ๊ต์ฒด๋ฅผ ๊ฒฐ์ ํ ๋ ์ธ ๊ฐ์ง ์ต์ ์ ๊ฒํ ํ๋ค.
Redis 5.0์์ ์ถ๊ฐ๋ Redis Stream์ Kafka์ฒ๋ผ ๋ฉ์์ง๋ฅผ ์์์ ์ผ๋ก ์ ์ฅํ๋ค.
์ฅ์ : ์ด๋ฏธ Redis๋ฅผ ์ฐ๊ณ ์์ผ๋ฏ๋ก ์ธํ๋ผ ์ถ๊ฐ ์์
๋จ์ : Redis Stream์ Redis๊ฐ ๋จ์ผ ์ฅ์ ์ ์ด ๋จ (์ฐ๋ฆฌ๋ ์ด๋ฏธ Redis๋ฅผ JWT/์๋ฆผ์ ์ฌ์ฉ)
Redis ์ฅ์ ์ ๋ฐฐ์น ํธ๋ฆฌ๊ฑฐ + JWT ์ธ์ฆ + ์๋ฆผ์ด ๋์์ ๋ถ๋ฅ
Redis๋ฅผ ์์ ์ ์ฐ๊ณ , ๋ฐฐ์น ์๋ฒ๊ฐ ์ฃผ๊ธฐ์ ์ผ๋ก DB์์ PENDING ์์ ์ ๊ฐ์ ธ๊ฐ๋ค.
์ฅ์ : ์ธํ๋ผ ์ถ๊ฐ ์์, ๋ฉ์์ง ์ ์ค ์์ (DB์ ์ํ ์ ์ฅ)
๋จ์ : ํด๋ง ์ฃผ๊ธฐ(์: 5์ด)๋ง๋ค ๋ถํ์ํ DB ์ฟผ๋ฆฌ
์ฒ๋ฆฌ ์์ ์ง์ฐ์ด ํด๋ง ์ฃผ๊ธฐ๋งํผ ๋ฐ์
์ฅ์ : At-Least-Once ๋ณด์ฅ (offset ๊ธฐ๋ฐ ์ฌ์ฒ๋ฆฌ)
๋ฐฐ์น ์๋ฒ ์ฌ์์ ํ ์ ์ฝ์ ๋ฉ์์ง ์๋ ์ฌ์ฒ๋ฆฌ
30๋ถ ํด๋ง ์ค์ผ์ค๋ฌ ์ ๊ฑฐ ๊ฐ๋ฅ
๋จ์ : ์ธํ๋ผ ์ถ๊ฐ (Kafka ์ปจํ
์ด๋)
ํ์ต ๋น์ฉ
์ ํ ์ด์ : ๋ฐฐ์น ์๋ฒ๋ ๊ฐ๋ ์ฌ์์๋๋ค. ์ฌ์์๋ง๋ค ๋ฉ์์ง๋ฅผ ์์ผ๋ฉด ์ ๋๋ค. Redis Stream์ ๊ธฐ์กด Redis ์์กด์ฑ์ ๋ ๋ฌด๊ฒ๊ฒ ๋ง๋ค๊ณ , DB ํด๋ง์ ๊ทผ๋ณธ ํด๊ฒฐ์ด ์๋๋ค. Kafka๋ ์ธํ๋ผ๋ฅผ ํ๋ ๋ ๋์ฐ๋ ๋น์ฉ์ด ์์ง๋ง, ๋ฉ์์ง ๋ณด์ฅ์ ๊ทผ๋ณธ ํด๊ฒฐ์ด๋ค.
๊ต์ฒด ์์
์ ์์ํ ๋ ์์ํ๋ ๊ฒ๋ณด๋ค ํจ์ฌ ์ฌ์ ๋ค. ์ด์ ๋ BatchEventPort ์ธํฐํ์ด์ค ๋๋ถ์ด์๋ค.
// core-domain โ ๋ณ๊ฒฝ ์์
public interface BatchEventPort {
void publishPendingEvent(Long batchJobId);
boolean isAvailable();
}
API ์๋ฒ์ BatchFileUploadService๋ BatchEventPort๋ง ์๊ณ ์๋ค. Redis์ธ์ง Kafka์ธ์ง ๋ชจ๋ฅธ๋ค.
// BatchFileUploadService โ ๋ณ๊ฒฝ ์์
@Service
public class BatchFileUploadService {
private final BatchEventPort batchEventPort; // โ ์ธํฐํ์ด์ค๋ง ์์กด
private void publishBatchEvent(Long batchJobId) {
batchEventPort.publishPendingEvent(batchJobId); // ๊ตฌํ์ฒด๊ฐ ๋ญ๋ ์๊ด์์
}
}
Redis ๊ตฌํ์ฒด๋ฅผ Kafka ๊ตฌํ์ฒด๋ก ๊ต์ฒดํ ๊ฒ์ด ์ ๋ถ์๋ค.
// โ ์ญ์ : RedisBatchEventAdapter.java
@Component
@Profile("redis")
public class RedisBatchEventAdapter implements BatchEventPort {
private final RedisTemplate<String, Long> redisTemplate;
@Override
public void publishPendingEvent(Long batchJobId) {
redisTemplate.convertAndSend("batch:pending", batchJobId);
}
}
// โ
์ถ๊ฐ: KafkaBatchEventAdapter.java
@Component
@Profile({"local", "dev"})
public class KafkaBatchEventAdapter implements BatchEventPort {
private final KafkaTemplate<String, Long> kafkaTemplate;
@Override
public void publishPendingEvent(Long batchJobId) {
kafkaTemplate.send(KafkaTopics.BATCH_JOBS, batchJobId.toString(), batchJobId)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("โ Kafka ์ด๋ฒคํธ ๋ฐํ ์คํจ - batchJobId: {}", batchJobId, ex);
// PENDING ์ํ ์ ์ง โ ๊ธฐ๋ ์ ๋ณต๊ตฌ ์ค์ผ์ค๋ฌ๊ฐ ์์ ๋ง
} else {
log.info("๐ข Kafka ์ด๋ฒคํธ ๋ฐํ ์๋ฃ - partition: {}, offset: {}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
}
๋น์ฆ๋์ค ๋ก์ง(BatchFileUploadService) ๋ณ๊ฒฝ ์ค ์: 0์ค
Port & Adapter ํจํด์ ์ด๋ก ์ ์ผ๋ก "์ธํ๋ผ๋ฅผ ๊ฐ์๋ผ์ธ ์ ์๋ค"๊ณ ํ๋๋ฐ, ์ค์ ๋ก ์ฒดํํ๊ธฐ ์ ๊น์ง ๋ฐ์ ๋ฐ์ํ๋ค. ์ด๋ฒ ๊ต์ฒด์์ ์ฒ์์ผ๋ก ๊ทธ ๊ฐ์น๋ฅผ ํผ๋ถ๋ก ๋๊ผ๋ค.
Kafka ๊ด๋ จ ์ฝ๋๋ ๋ณ๋ ๋ชจ๋๋ก ๋ถ๋ฆฌํ๋ค.
core/core-infrastructure/
โโโ infrastructure-redis/ โ ๊ธฐ์กด ์ ์ง (JWT, ์๋ฆผ ์ฉ๋)
โโโ infrastructure-kafka/ โ ์ ๊ท ์ถ๊ฐ (๋ฐฐ์น ์ด๋ฒคํธ ์ ์ฉ)
โโโ src/main/java/
โโโ KafkaTopics.java โ ํ ํฝ ์ด๋ฆ ์์
โโโ config/
โ โโโ KafkaProducerConfig.java
โโโ adapter/
โโโ KafkaBatchEventAdapter.java
์ค์ํ ์ : infrastructure-kafka๋ infrastructure-redis๋ฅผ ์์กดํ์ง ์๋๋ค. ๋ ๋ชจ๋์ ์์ ํ ๋
๋ฆฝ์ ์ด๋ค. Redis๊ฐ ์ฃฝ์ด๋ Kafka๋ ์ด์์๊ณ , Kafka๊ฐ ์ฃฝ์ด๋ Redis(JWT, ์๋ฆผ)๋ ์ด์์๋ค.
๋ฐฐ์น ์๋ฒ์ Kafka Consumer๋ ๋จ์ํ ๋ฉ์์ง๋ฅผ ๋ฐ์์ Job์ ์คํํ๋ ๊ฒ ์๋๋ค. Race Condition์ด ์๋ค.
API ์๋ฒ: DB์ BatchFileJob ์ ์ฅ โ ํธ๋์ญ์
์ปค๋ฐ
โ (์ ms ์ด๋ด)
Kafka์ ๋ฉ์์ง ๋ฐํ
๋ฐฐ์น ์๋ฒ: Kafka ๋ฉ์์ง ์์
โ (์ฆ์)
DB์์ BatchFileJob ์กฐํ โ ???
์์ง API ์๋ฒ ํธ๋์ญ์
์ด ์ปค๋ฐ ์ ๋์ ์ ์์!
ํธ๋์ญ์ ์ปค๋ฐ๊ณผ Kafka ๋ฉ์์ง ๋ฐํ ์ฌ์ด์ ํ์ด ์๋ค. ๋ฐฐ์น ์๋ฒ๊ฐ ๋ฉ์์ง๋ฅผ ๋ฐ๋ ์๋๊ฐ ๋ ๋น ๋ฅผ ์ ์๋ค.
// โ
3๋จ๊ณ ์ฒ๋ฆฌ๋ก Race Condition ํด๊ฒฐ
@KafkaListener(
topics = KafkaTopics.BATCH_JOBS,
groupId = "batch-worker",
containerFactory = "batchKafkaListenerContainerFactory"
)
public void onBatchJobReceived(Long batchJobId, Acknowledgment ack) {
try {
// 1๋จ๊ณ: DB ์กฐํ โ ์ต๋ 3ํ ์ฌ์๋ (API ์ปค๋ฐ ๋๊ธฐ)
BatchFileJob batchJob = findBatchJobWithRetry(batchJobId);
// 2๋จ๊ณ: ์ํ ๋ณ๊ฒฝ โ REQUIRES_NEW ํธ๋์ญ์
์ผ๋ก ์ฆ์ ์ปค๋ฐ
updateStatusToProcessing(batchJobId);
// 3๋จ๊ณ: Job ์คํ โ ํธ๋์ญ์
๋ฐ์์ ์คํ
launchBatchJob(batchJob);
ack.acknowledge(); // โ
์ฑ๊ณต ์๋ง offset ์ปค๋ฐ
} catch (Exception e) {
log.error("โ ๋ฐฐ์น ์ด๋ฒคํธ ์ฒ๋ฆฌ ์คํจ - batchJobId: {} | ์ฌ์ ๋ฌ ์์ ", batchJobId, e);
// ack ๋ฏธํธ์ถ โ Kafka๊ฐ ๋์ผ ๋ฉ์์ง๋ฅผ ์ฌ์ ๋ฌํจ (At-Least-Once ๋ณด์ฅ)
}
}
// DB ์กฐํ ์ฌ์๋ (Race Condition ๋์)
private BatchFileJob findBatchJobWithRetry(Long batchJobId) {
for (int i = 0; i < 3; i++) {
Optional<BatchFileJob> job = batchFileJobRepository.findById(batchJobId);
if (job.isPresent()) return job.get();
log.warn("โ ๏ธ BatchFileJob ์กฐํ ์คํจ (์ฌ์๋ {}/3) - batchJobId: {}", i + 1, batchJobId);
Thread.sleep(500); // 500ms ๋๊ธฐ ํ ์ฌ์๋
}
throw new IllegalStateException("BatchFileJob์ ์ฐพ์ ์ ์์ต๋๋ค: " + batchJobId);
}
ack.acknowledge()๋ฅผ ์๋์ผ๋ก ํธ์ถํ๋ ๊ฒ์ด ํต์ฌ์ด๋ค. ์ฑ๊ณตํ์ ๋๋ง offset์ ์ปค๋ฐํ๋ค. ์คํจํ๋ฉด Kafka๊ฐ ๊ฐ์ ๋ฉ์์ง๋ฅผ ์ฌ์ ๋ฌํ๋ค. ์ด๊ฒ์ด At-Least-Once ๋ณด์ฅ์ ์ค์ ๋์์ด๋ค.
๋คํธ์ํฌ ์ค๋ฅ๋ DB ์ผ์ ์ฅ์ ๋ก ๋ฐฐ์น ์ฒ๋ฆฌ๊ฐ ์คํจํ๋ฉด Kafka๊ฐ ์ฌ์ ๋ฌํ๋ค. ๊ทธ๋ฐ๋ฐ ํ์ผ์ด ์์๋๊ฑฐ๋ ์ฒ๋ฆฌ ์์ฒด๊ฐ ๋ถ๊ฐ๋ฅํ ๊ฒฝ์ฐ๋ผ๋ฉด? ๋ฌดํ ์ฌ์ ๋ฌ์ด ๋ฐ๋ณต๋๋ค.
์ด๋ฅผ ๋ง๊ธฐ ์ํด DLT(Dead Letter Topic) ๋ฅผ ๋์ ํ๋ค.
๋ฐฐ์น ์ฒ๋ฆฌ ์คํจ (์์ธ ๋ฐ์ โ ack ๋ฏธํธ์ถ)
โ
โผ
DefaultErrorHandler ์ฌ์๋: 3์ด ๊ฐ๊ฒฉ, ์ต๋ 3ํ
โ
โโโ 3ํ ์ฌ์๋ ์ค ์ฑ๊ณต โ ์ ์ ์ฒ๋ฆฌ
โ
โโโ 3ํ ๋ชจ๋ ์คํจ
โ
DeadLetterPublishingRecoverer โ batch-jobs.DLT ํ ํฝ์ ๋ฐํ
โ
DltBatchEventListener.onDltMessage() ์์
โ
BatchFileJob.status = FAILED + errorMessage ๊ธฐ๋ก
ack.acknowledge() โ ๋ฌดํ ๋ฃจํ ๋ฐฉ์ง
// Consumer ์ค์ โ ์ฌ์๋ + DLT
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<?, ?> kafkaTemplate) {
// ์ฌ์ฒ๋ฆฌ ๋ถ๊ฐ ์์ธ๋ ์ฆ์ DLT๋ก
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(kafkaTemplate);
// 3์ด ๊ฐ๊ฒฉ, ์ต๋ 3ํ ์ฌ์๋
FixedBackOff backOff = new FixedBackOff(3000L, 3L);
return new DefaultErrorHandler(recoverer, backOff);
}
// DLT ๋ฆฌ์ค๋ โ ์ต์ข
์คํจ ์ฒ๋ฆฌ
@KafkaListener(topics = "batch-jobs.DLT", groupId = "batch-worker-dlt")
public void onDltMessage(Long batchJobId, Acknowledgment ack) {
try {
BatchFileJob job = batchFileJobRepository.findById(batchJobId)
.orElseThrow();
job.updateStatus(BatchProcessingStatus.FAILED);
job.updateErrorMessage("DLT ๊ฒฉ๋ฆฌ: ์ฌ์๋ 3ํ ์์ง");
batchFileJobRepository.save(job);
log.error("๐ DLT ์ฒ๋ฆฌ ์๋ฃ - batchJobId: {} FAILED ํ์ ", batchJobId);
} finally {
ack.acknowledge(); // ํญ์ ํธ์ถ โ DLT ๋ฌดํ ๋ฃจํ ๋ฐฉ์ง
}
}
์ฌ์๋ ๊ณ์ธต์ 3๋จ๊ณ๋ก ๋ถ๋ฆฌํ ๊ฒ๋ ์๋์ ์ด๋ค:
| ๊ณ์ธต | ์์น | ๋์ | ํ์ |
|---|---|---|---|
| ๋ด๋ถ ์ฌ์๋ | findBatchJobWithRetry() | API DB ์ปค๋ฐ ๋๊ธฐ (Race Condition) | 3ํ / 500ms |
| ์ธ๋ถ ์ฌ์๋ | DefaultErrorHandler | ๋คํธ์ํฌ/DB ์ผ์ ์ฅ์ | 3ํ / 3000ms |
| DLT ๊ฒฉ๋ฆฌ | DltBatchEventListener | ์ฌ์๋ ์์ง, ๋ณต๊ตฌ ๋ถ๊ฐ | ์ต์ข ํ์ |
โ๏ธ Kafka 3.x๋ถํฐ๋ Zookeeper ์์ด ๋์ํ๋ KRaft(Kafka Raft) ๋ชจ๋๋ฅผ ์ง์ํ๋ค. ๊ฐ๋ฐ/์คํ ์ด์ง ํ๊ฒฝ์์๋ ์ปจํ ์ด๋ ํ๋๋ก ์ถฉ๋ถํ๋ค.
# docker-compose-local.yml
kafka:
image: apache/kafka:3.8.1
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller # ํ๋์ ๋
ธ๋๊ฐ ๋ ์ญํ ๋ด๋น
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
healthcheck:
test: ["/opt/kafka/bin/kafka-topics.sh", "--bootstrap-server", "localhost:9092", "--list"]
interval: 10s
retries: 5
Zookeeper + Kafka ๋ ์ปจํ ์ด๋๋ฅผ ๋์ ๋ ์ด์ ๊ณผ ๋ฌ๋ฆฌ, ์ด์ Kafka ํ๋๋ง ์์ผ๋ฉด ๋๋ค.
| ํญ๋ชฉ | Redis Pub/Sub (์ด์ ) | Kafka (์ดํ) |
|---|---|---|
| ๋ฐฐ์น ์๋ฒ ์ฌ์์ ์ค ๋ฉ์์ง | ์ ์ค | offset์ ๋ณด๊ด โ ์ฌ์์ ํ ์๋ ์ฒ๋ฆฌ |
| ๋ฐฐ์น ์์ ์ง์ฐ | ์ต๋ 30๋ถ (์ค์ผ์ค๋ฌ) | ์ ์ด ์ด๋ด |
| ์ฝ๋ ๋ณต์ก๋ | BatchRecoveryScheduler + Redis ๋ฆฌ์ค๋ | Kafka ๋ฆฌ์ค๋๋ง |
| ์ฒ๋ฆฌ ์คํจ ์ | ๋ก๊ทธ๋ง ๋จ์ | DLT โ DB FAILED ํ์ |
| ๋ฉ์์ง ๋ณด์ฅ | Fire-and-Forget | At-Least-Once |
BatchRecoveryScheduler์ 30๋ถ ํด๋ง ๋ก์ง์ ์ ๊ฑฐํ๋ค. ๊ธฐ๋ ์ ๋๋ฝ๋ QUEUED/IN_PROGRESS ์์
์ ๋ณต๊ตฌํ๋ ๋ก์ง์ ๋จ๊ฒผ๋ค. Kafka๋ ๋ฉ์์ง๋ฅผ ๋ณด์ฅํ์ง๋ง, ์ฑ ํฌ๋์๋ก ์ธํด Kafka ๋ฉ์์ง๋ฅผ ์์ ์์ ๋ชป ํ ๊ฒฝ์ฐ๋ Kafka๋ก ํด๊ฒฐ์ด ์ ๋๊ธฐ ๋๋ฌธ์ด๋ค.
1. ๋ฉ์์ง ๋ธ๋ก์ปค๋ฅผ ์ ํํ ๋๋ "๊ตฌ๋ ์๊ฐ ์์ ๋ ์ด๋ป๊ฒ ๋๋"๋ฅผ ๋จผ์ ๋ฌผ์ด๋ผ
Redis Pub/Sub์ Fire-and-Forget ํน์ฑ์ ๋ฌธ์์ ๋ช ์๋์ด ์๋ค. ์ฒ์ ๋์ ํ ๋ ์ด๊ฒ์ ๊ฐ๊ณผํ๋ค. "๋จ์ํ ์ด๋ฒคํธ ์ ๋ฌ"์ Redis๋ฅผ ์ฐ๋ ๊ฑด ์ ํฉํ์ง๋ง, "๋ฐ๋์ ์ฒ๋ฆฌ๋์ด์ผ ํ๋ ์ด๋ฒคํธ"์๋ ๋ง์ง ์๋๋ค.
2. Port & Adapter ํจํด์ "์ธํ๋ผ ๊ต์ฒด"์ ์๊ฐ์ ๋น๋๋ค
์ด ๊ต์ฒด๋ฅผ ํตํด BatchEventPort ์ธํฐํ์ด์ค๊ฐ ์ ์์ด์ผ ํ๋์ง๋ฅผ ์ค๊ฐํ๋ค. ์ธํฐํ์ด์ค ์์ด RedisTemplate์ ์๋น์ค์์ ์ง์ ์ฐ๊ณ ์์๋ค๋ฉด, ๊ต์ฒด ๋ฒ์๊ฐ ๋น์ฆ๋์ค ๋ก์ง๊น์ง ํผ์ก์ ๊ฒ์ด๋ค.
3. ์์๋ฐฉํธ์ด ์ฝ๋๋ฅผ ์ด๋ป๊ฒ ์ฉํ๋์ง ๋ณด์๋ค
BatchRecoveryScheduler๋ ํฉ๋ฆฌ์ ์ธ ์์๋ฐฉํธ์ด์๋ค. ๊ทธ๋ฐ๋ฐ ๊ทธ๊ฒ์ด ์๋ ํ "๋ฉ์์ง ์ ์ค ๋ฌธ์ ๊ฐ ํด๊ฒฐ๋๋ค"๋ ์ฐฉ๊ฐ์ด ์๊ธด๋ค. ๊ทผ๋ณธ ์์ธ์ ํด๊ฒฐํ์ง ์๊ณ ์์ ์ฝ๋๋ฅผ ์์ผ๋ฉด, ๋์ค์๋ ์ด๋ ์ฝ๋๊ฐ ์ ์๋์ง ์ค๋ช
ํ๊ธฐ ์ด๋ ค์์ง๋ค.