๐Ÿค” ๋ฉ”์‹œ์ง€๊ฐ€ ์‚ฌ๋ผ์กŒ๋‹ค! โ€” Redis Pub/Sub -> Kafka๋กœ ๊ต์ฒดํ•œ ์ด์œ  ๊ทธ๋ฆฌ๊ณ  DLT๋ฅผ ํ•˜๋Š” ์ด์œ 

devdo · May 12, 2026

SpringBoot


Background: How the batch trigger worked

Our service has a Spring Batch pipeline that processes large volumes of images. When a user uploads a ZIP file, the API server (app-api) stores the file temporarily and sends the batch server (app-batch) a "start processing" signal.

At first, we implemented this signal with Redis Pub/Sub.

User upload
    │
    ▼
app-api → Redis PUBLISH "batch:pending" {batchJobId}
    │
    ▼ (only while subscribed)
app-batch → RedisBatchEventListener.onMessage() → run Spring Batch Job

It was intuitive and quick to implement, and for a while it worked well.


Problem discovered: "Why isn't the batch starting?"

One day a bug report came in from QA.

"The file uploaded fine, but processing never starts."

The logs showed that the API server had published the event normally, yet there was no trace of the batch server handling it. The upload had arrived right after the batch server was restarted.

The cause was a core characteristic of Redis Pub/Sub.

The fatal limitation of Redis Pub/Sub: fire-and-forget

API server:    PUBLISH batch:pending → success (Redis reply: 0, meaning 0 subscribers)
Batch server:  [restarting... not subscribed yet]

→ The message is not stored in Redis; it is gone immediately

Redis Pub/Sub is like a KakaoTalk live chat.
If you send a message while nobody is in the chat room, 💥 the message vanishes!

Redis only returns a reply (0) that says "no subscribers"; it does not keep the message anywhere.

To use a mailbox analogy, Redis Pub/Sub has no mailbox. If nobody is home, the letter disappears into thin air.

| Situation | Redis Pub/Sub | Kafka |
|---|---|---|
| Publishing with no subscribers | Message is discarded | Stored in the topic |
| After a subscriber restart | Earlier messages cannot be received | Consumption resumes from the committed offset |
| After a broker failure | Messages are lost | Recovered after the broker restarts |
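
The difference in the first two rows can be reproduced with a toy model. The sketch below is not Redis or Kafka client code; `FireAndForgetChannel` and `RetainedLog` are hypothetical in-memory stand-ins, written only to show why a late subscriber sees nothing while a log reader that resumes from an offset sees everything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Fire-and-forget channel: a message goes to whoever is subscribed right now, then it is dropped. */
class FireAndForgetChannel {
    private final List<Consumer<Long>> subscribers = new ArrayList<>();

    void subscribe(Consumer<Long> subscriber) {
        subscribers.add(subscriber);
    }

    /** Returns how many subscribers received the message, similar to the reply of Redis PUBLISH. */
    int publish(Long message) {
        subscribers.forEach(s -> s.accept(message));
        return subscribers.size();
    }
}

/** Append-only log: records are stored, and a reader resumes from its own committed offset. */
class RetainedLog {
    private final List<Long> records = new ArrayList<>();

    void append(Long message) {
        records.add(message);
    }

    /** Returns every record at or after the given offset. */
    List<Long> readFrom(int committedOffset) {
        return new ArrayList<>(records.subList(committedOffset, records.size()));
    }
}

class PubSubVsLogDemo {
    public static void main(String[] args) {
        FireAndForgetChannel channel = new FireAndForgetChannel();
        int receivers = channel.publish(42L);           // batch server is still restarting
        List<Long> seenByLateSubscriber = new ArrayList<>();
        channel.subscribe(seenByLateSubscriber::add);   // subscribes too late
        System.out.println("pub/sub receivers=" + receivers
                + ", late subscriber saw " + seenByLateSubscriber);

        RetainedLog log = new RetainedLog();
        log.append(42L);                                // published while the consumer is down
        System.out.println("log consumer after restart reads " + log.readFrom(0));
    }
}
```

The publish call on the channel "succeeds" with a reply of 0, which is exactly what our API server logs looked like.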

Stopgap: BatchRecoveryScheduler

Once we understood the problem, we built a scheduler that polls the DB every 30 minutes.

// ❌ Stopgap: it only added complexity
@Scheduled(fixedDelay = 1800000) // 30 minutes
public void processDelayedJobs() {
    // Find jobs that are still PENDING → republish their events
    List<BatchFileJob> pendingJobs = batchFileJobRepository
        .findByStatusAndCreatedAtBefore(BatchProcessingStatus.PENDING,
                                        LocalDateTime.now().minusMinutes(30));

    for (BatchFileJob job : pendingJobs) {
        batchEventPort.publishPendingEvent(job.getId());
    }
}

This was not a real fix. A batch job whose message was lost only starts reprocessing at least 30 minutes later, and because the query above also skips jobs younger than 30 minutes, the wait can approach 60 minutes. From the user's point of view, nothing seems to happen for that whole time after the upload.
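
To make the delay concrete, here is a small arithmetic sketch. It assumes the scheduler runs at evenly spaced times and ignores how long each run takes; `PollingDelay` and its parameter names are made up for illustration.

```java
class PollingDelay {
    /**
     * Minutes from job creation until the first scheduler run whose query matches the job.
     * The query only matches jobs with createdAt strictly before (runTime - minAge).
     */
    static long pickupDelayMinutes(long createdAt, long firstRun, long period, long minAge) {
        long run = firstRun;
        while (run - createdAt <= minAge) {
            run += period;   // this run's query does not match yet, wait for the next one
        }
        return run - createdAt;
    }

    public static void main(String[] args) {
        // Scheduler runs at minute 0, 30, 60, ... and only picks jobs older than 30 minutes.
        System.out.println(pickupDelayMinutes(1, 0, 30, 30));   // job created just after a run
        System.out.println(pickupDelayMinutes(29, 0, 30, 30));  // job created just before a run
    }
}
```

Under these assumptions the two jobs wait 59 and 31 minutes respectively, so the stopgap never reacts in less than half an hour.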

The code also got more complicated. We ended up answering "Why is there a scheduler?" with "It's there to make up for lost messages."



☑️ Why Kafka: the decision

When we decided to replace the message broker, we weighed three options.

Option A: Use Redis Streams

Redis Streams, added in Redis 5.0, keep messages in a log instead of dropping them, much like Kafka (how durable that log is depends on the Redis persistence settings).

Pros: No new infrastructure, since we already run Redis
Cons: Redis becomes a single point of failure (we already use Redis for JWT and notifications)
      If Redis goes down, batch triggering, JWT authentication, and notifications all fail at once

Option B: Switch to DB polling

Drop Redis from this path entirely and have the batch server periodically fetch PENDING jobs from the DB.

Pros: No new infrastructure, no message loss (the state lives in the DB)
Cons: An unnecessary DB query every polling interval (e.g., every 5 seconds)
      Processing start is delayed by up to one polling interval

Option C: Kafka (chosen)

Pros: At-least-once delivery (offset-based reprocessing)
      Unread messages are processed automatically after a batch server restart
      The 30-minute polling scheduler can be removed
Cons: Additional infrastructure (a Kafka container)
      Learning cost

Why we chose it: the batch server restarts from time to time, and we cannot afford to lose messages on every restart. Redis Streams would make our existing Redis dependency even heavier, and DB polling does not address the root cause. Kafka costs us one more piece of infrastructure to run, but it solves the delivery guarantee problem at its root.


ํ—ฅ์‚ฌ๊ณ ๋‚  ์•„ํ‚คํ…์ฒ˜๊ฐ€ ์ง„๊ฐ€๋ฅผ ๋ฐœํœ˜ํ•œ ์ˆœ๊ฐ„

๊ต์ฒด ์ž‘์—…์„ ์‹œ์ž‘ํ•  ๋•Œ ์˜ˆ์ƒํ–ˆ๋˜ ๊ฒƒ๋ณด๋‹ค ํ›จ์”ฌ ์‰ฌ์› ๋‹ค. ์ด์œ ๋Š” BatchEventPort ์ธํ„ฐํŽ˜์ด์Šค ๋•๋ถ„์ด์—ˆ๋‹ค.

// core-domain: unchanged
public interface BatchEventPort {
    void publishPendingEvent(Long batchJobId);
    boolean isAvailable();
}

The API server's BatchFileUploadService knows only BatchEventPort. It has no idea whether Redis or Kafka sits behind it.

// BatchFileUploadService: unchanged (constructor injection omitted for brevity)
@Service
public class BatchFileUploadService {
    private final BatchEventPort batchEventPort; // ← depends only on the interface

    private void publishBatchEvent(Long batchJobId) {
        batchEventPort.publishPendingEvent(batchJobId); // works with any implementation
    }
}

Swapping the Redis implementation for a Kafka implementation was all it took.

// ❌ Deleted: RedisBatchEventAdapter.java (constructor and isAvailable() omitted for brevity)
@Component
@Profile("redis")
public class RedisBatchEventAdapter implements BatchEventPort {
    private final RedisTemplate<String, Long> redisTemplate;

    @Override
    public void publishPendingEvent(Long batchJobId) {
        redisTemplate.convertAndSend("batch:pending", batchJobId);
    }
}

// ✅ Added: KafkaBatchEventAdapter.java
// (`log` is an SLF4J logger, e.g. via Lombok @Slf4j; constructor and isAvailable() omitted for brevity)
@Component
@Profile({"local", "dev"})
public class KafkaBatchEventAdapter implements BatchEventPort {
    private final KafkaTemplate<String, Long> kafkaTemplate;

    @Override
    public void publishPendingEvent(Long batchJobId) {
        // The job id doubles as the record key, so events for one job stay in one partition
        kafkaTemplate.send(KafkaTopics.BATCH_JOBS, batchJobId.toString(), batchJobId)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("❌ Kafka event publish failed - batchJobId: {}", batchJobId, ex);
                    // The job stays PENDING → the startup recovery scheduler is the safety net
                } else {
                    log.info("📢 Kafka event published - partition: {}, offset: {}",
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                }
            });
    }
}

Lines changed in the business logic (BatchFileUploadService): 0

In theory, the Ports & Adapters pattern says you "can swap out the infrastructure", but I was half skeptical until I experienced it myself. This replacement was the first time I felt its value firsthand.
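
The same property makes the service easy to exercise without any broker. As a rough sketch, assuming a port shaped like the one above: `InMemoryBatchEventAdapter` and `UploadService` are hypothetical names (the latter a simplified stand-in for BatchFileUploadService), not classes from our codebase.

```java
import java.util.ArrayList;
import java.util.List;

/** Same shape as the BatchEventPort shown above. */
interface BatchEventPort {
    void publishPendingEvent(Long batchJobId);
    boolean isAvailable();
}

/** Hypothetical adapter: records events in memory instead of talking to Redis or Kafka. */
class InMemoryBatchEventAdapter implements BatchEventPort {
    final List<Long> published = new ArrayList<>();

    @Override
    public void publishPendingEvent(Long batchJobId) {
        published.add(batchJobId);
    }

    @Override
    public boolean isAvailable() {
        return true;
    }
}

/** Simplified stand-in for the upload service: it only ever sees the port. */
class UploadService {
    private final BatchEventPort batchEventPort;

    UploadService(BatchEventPort batchEventPort) {
        this.batchEventPort = batchEventPort;
    }

    void onUploadSaved(Long batchJobId) {
        if (batchEventPort.isAvailable()) {
            batchEventPort.publishPendingEvent(batchJobId);
        }
    }
}

class PortSwapDemo {
    public static void main(String[] args) {
        InMemoryBatchEventAdapter adapter = new InMemoryBatchEventAdapter();
        new UploadService(adapter).onUploadSaved(7L);
        System.out.println(adapter.published);
    }
}
```

Swapping in a third adapter here costs the service nothing, which is the same effect we saw when moving from Redis to Kafka.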


New module structure: infrastructure-kafka

We split the Kafka-related code into its own module.

core/core-infrastructure/
├── infrastructure-redis/      ← kept as-is (JWT, notifications)
└── infrastructure-kafka/      ← newly added (batch events only)
    └── src/main/java/
        ├── KafkaTopics.java           ← topic name constants
        ├── config/
        │   └── KafkaProducerConfig.java
        └── adapter/
            └── KafkaBatchEventAdapter.java

Key point: infrastructure-kafka does not depend on infrastructure-redis. The two modules are completely independent. If Redis dies, Kafka stays up, and if Kafka dies, Redis (JWT, notifications) stays up.


Consumer implementation: a three-step processing flow

The batch server's Kafka consumer does not simply receive a message and launch a job. There is a race condition to deal with.

API server:   save BatchFileJob to the DB → commit the transaction
                 ↓ (within a few ms)
              publish the message to Kafka

Batch server: receive the Kafka message
                 ↓ (immediately)
              look up BatchFileJob in the DB → ???
              The API server's transaction may not have committed yet!

ํŠธ๋žœ์žญ์…˜ ์ปค๋ฐ‹๊ณผ Kafka ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰ ์‚ฌ์ด์— ํ‹ˆ์ด ์žˆ๋‹ค. ๋ฐฐ์น˜ ์„œ๋ฒ„๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›๋Š” ์†๋„๊ฐ€ ๋” ๋น ๋ฅผ ์ˆ˜ ์žˆ๋‹ค.

// ✅ Three-step processing to handle the race condition
@KafkaListener(
    topics = KafkaTopics.BATCH_JOBS,
    groupId = "batch-worker",
    containerFactory = "batchKafkaListenerContainerFactory"
)
public void onBatchJobReceived(Long batchJobId, Acknowledgment ack) {
    try {
        // Step 1: DB lookup, up to 3 attempts (waits for the API commit)
        BatchFileJob batchJob = findBatchJobWithRetry(batchJobId);

        // Step 2: status change, committed immediately in a REQUIRES_NEW transaction
        updateStatusToProcessing(batchJobId);

        // Step 3: launch the job outside any transaction
        launchBatchJob(batchJob);

        ack.acknowledge(); // ✅ commit the offset only on success
    } catch (Exception e) {
        log.error("❌ Batch event processing failed - batchJobId: {} | will be redelivered", batchJobId, e);
        // No ack here. Rethrow so the container's error handler sees the failure and
        // redelivers the record; swallowing the exception would let the consumer move on.
        throw new IllegalStateException("Batch event processing failed: " + batchJobId, e);
    }
}

// DB lookup with retries (race condition handling)
private BatchFileJob findBatchJobWithRetry(Long batchJobId) {
    for (int i = 0; i < 3; i++) {
        Optional<BatchFileJob> job = batchFileJobRepository.findById(batchJobId);
        if (job.isPresent()) return job.get();

        log.warn("⚠️ BatchFileJob lookup failed (attempt {}/3) - batchJobId: {}", i + 1, batchJobId);
        try {
            Thread.sleep(500); // wait 500 ms before the next attempt
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Thread.sleep throws a checked exception
            break;
        }
    }
    throw new IllegalStateException("BatchFileJob not found: " + batchJobId);
}

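The key is calling ack.acknowledge() manually. The offset is committed only when processing succeeds. On failure the offset stays uncommitted and the same record is delivered again; in Spring Kafka, redelivery within the same run is driven by the container's error handler, which only sees the failure if the exception propagates out of the listener. This is what the at-least-once guarantee looks like in practice.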
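
At-least-once also means the same batchJobId can arrive more than once, so the status change in step 2 is a natural place for an idempotency guard. The sketch below assumes the guard is an atomic PENDING → PROCESSING transition; `JobStatusStore` is a hypothetical in-memory stand-in for the job table, where a real version would more likely use a conditional UPDATE.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

enum JobStatus { PENDING, PROCESSING }

/** In-memory stand-in for the job table. */
class JobStatusStore {
    private final Map<Long, JobStatus> statuses = new ConcurrentHashMap<>();

    void insertPending(Long jobId) {
        statuses.put(jobId, JobStatus.PENDING);
    }

    /** Atomically moves PENDING -> PROCESSING; returns false if the job was already claimed. */
    boolean tryClaim(Long jobId) {
        return statuses.replace(jobId, JobStatus.PENDING, JobStatus.PROCESSING);
    }
}

class IdempotentConsumerDemo {
    static int launches = 0;

    /** Handles one delivery; a duplicate delivery of the same id does not relaunch the job. */
    static void onMessage(JobStatusStore store, Long jobId) {
        if (store.tryClaim(jobId)) {
            launches++;   // stands in for launching the Spring Batch job
        }
    }

    public static void main(String[] args) {
        JobStatusStore store = new JobStatusStore();
        store.insertPending(1L);
        onMessage(store, 1L);
        onMessage(store, 1L);   // redelivery of the same record
        System.out.println("launches=" + launches);
    }
}
```

A guard like this needs a way to release the claim when the launch itself fails; otherwise a legitimate retry would be skipped as a duplicate.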



⭐ DLT (Dead Letter Topic): what to do with messages that keep failing

If batch processing fails because of a network error or a transient DB outage, the message is redelivered. But what if the file is corrupted, or processing is simply impossible? Redelivery would repeat forever.

To prevent that, we introduced a DLT (Dead Letter Topic).

Batch processing fails (exception thrown → ack not called)
    │
    ▼
DefaultErrorHandler retries: 3-second interval, up to 3 retries
    │
    ├── succeeds during the 3 retries → processed normally
    │
    └── all 3 retries fail
            ↓
    DeadLetterPublishingRecoverer → publishes to the batch-jobs.DLT topic
            ↓
    DltBatchEventListener.onDltMessage() receives it
            ↓
    BatchFileJob.status = FAILED + errorMessage recorded
    ack.acknowledge() → prevents an infinite loop

// Consumer configuration: retries + DLT
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<?, ?> kafkaTemplate) {
    // Records that cannot be retried, or that exhaust their retries, are handed to the DLT recoverer
    DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(kafkaTemplate);

    // 3-second interval, up to 3 retries (so up to 4 delivery attempts in total)
    FixedBackOff backOff = new FixedBackOff(3000L, 3L);

    return new DefaultErrorHandler(recoverer, backOff);
}

// DLT listener: final failure handling
@KafkaListener(topics = "batch-jobs.DLT", groupId = "batch-worker-dlt")
public void onDltMessage(Long batchJobId, Acknowledgment ack) {
    try {
        BatchFileJob job = batchFileJobRepository.findById(batchJobId)
            .orElseThrow();
        job.updateStatus(BatchProcessingStatus.FAILED);
        job.updateErrorMessage("Quarantined in DLT: 3 retries exhausted");
        batchFileJobRepository.save(job);

        log.error("💀 DLT handling complete - batchJobId: {} marked FAILED", batchJobId);
    } finally {
        ack.acknowledge(); // always called: prevents a DLT infinite loop
    }
}
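
The retry-then-quarantine flow can be modeled without Kafka to check the attempt counts. This is only a toy model of the behavior described above, not Spring Kafka code: `RetryingDispatcher` is a hypothetical class, and the back-off wait is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy dispatcher: one initial delivery plus maxRetries retries, then the record is dead-lettered. */
class RetryingDispatcher {
    private final int maxRetries;
    final List<Long> deadLetters = new ArrayList<>();

    RetryingDispatcher(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Returns the number of delivery attempts made for this record. */
    int dispatch(Long record, Consumer<Long> listener) {
        int attempts = 0;
        while (attempts <= maxRetries) {
            attempts++;
            try {
                listener.accept(record);
                return attempts;             // success: stop retrying
            } catch (RuntimeException e) {
                // a real error handler would wait for the back-off interval here
            }
        }
        deadLetters.add(record);             // retries exhausted: park the record
        return attempts;
    }
}

class DeadLetterDemo {
    public static void main(String[] args) {
        RetryingDispatcher dispatcher = new RetryingDispatcher(3);

        int[] calls = {0};
        int okAttempts = dispatcher.dispatch(1L, id -> {
            if (++calls[0] < 3) throw new IllegalStateException("transient failure");
        });
        int badAttempts = dispatcher.dispatch(2L, id -> {
            throw new IllegalStateException("corrupted file");
        });

        System.out.println("job 1 attempts=" + okAttempts);
        System.out.println("job 2 attempts=" + badAttempts
                + ", dead letters=" + dispatcher.deadLetters);
    }
}
```

Job 1 recovers on its third attempt, while job 2 is attempted four times (one delivery plus three retries) before it is parked.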

Splitting the retries into three layers was deliberate as well:

| Layer | Location | Target | Attempts |
|---|---|---|---|
| Inner retry | findBatchJobWithRetry() | Waiting for the API's DB commit (race condition) | 3 attempts / 500 ms |
| Outer retry | DefaultErrorHandler | Transient network/DB failures | 3 retries / 3000 ms |
| DLT quarantine | DltBatchEventListener | Retries exhausted, unrecoverable | Final |
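
Because the layers nest, their waits add up. As a back-of-the-envelope sketch, assuming every DB lookup misses, each miss is followed by the 500 ms sleep, and query time is negligible (`RetryBudget` is a made-up helper):

```java
class RetryBudget {
    /** Time spent inside one delivery when every lookup misses and each miss is followed by a sleep. */
    static long innerWorstCaseMs(int lookups, long sleepMs) {
        return lookups * sleepMs;
    }

    /** Worst-case time from the first delivery until the record is handed to the DLT. */
    static long worstCaseUntilDltMs(int lookups, long sleepMs, int retries, long backOffMs) {
        int deliveries = 1 + retries;   // initial attempt + retries
        return deliveries * innerWorstCaseMs(lookups, sleepMs) + retries * backOffMs;
    }

    public static void main(String[] args) {
        // 3 lookups x 500 ms per delivery, 3 retries with a 3000 ms back-off
        System.out.println(worstCaseUntilDltMs(3, 500L, 3, 3000L) + " ms");
    }
}
```

With the values from the table this comes to 4 × 1500 ms + 3 × 3000 ms = 15000 ms, so a job whose row never appears is marked FAILED after roughly 15 seconds.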

⭐ Kafka setup: KRaft mode (no ZooKeeper)

☑️ Kafka 3.x supports KRaft (Kafka Raft) mode, which runs without ZooKeeper and has been considered production-ready since 3.3. For development and staging environments, a single container is enough.

# docker-compose-local.yml
kafka:
  image: apache/kafka:3.8.1
  environment:
    KAFKA_NODE_ID: 1
    KAFKA_PROCESS_ROLES: broker,controller  # one node plays both roles
    KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
    # The next two lines are stated explicitly; KRaft needs to know which listener the controller uses
    KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  healthcheck:
    test: ["/opt/kafka/bin/kafka-topics.sh", "--bootstrap-server", "localhost:9092", "--list"]
    interval: 10s
    retries: 5

Unlike before, when we had to run two containers (ZooKeeper + Kafka), a single Kafka container is now all we need.


Migration results compared

ํ•ญ๋ชฉRedis Pub/Sub (์ด์ „)Kafka (์ดํ›„)
๋ฐฐ์น˜ ์„œ๋ฒ„ ์žฌ์‹œ์ž‘ ์ค‘ ๋ฉ”์‹œ์ง€์œ ์‹คoffset์— ๋ณด๊ด€ โ†’ ์žฌ์‹œ์ž‘ ํ›„ ์ž๋™ ์ฒ˜๋ฆฌ
๋ฐฐ์น˜ ์‹œ์ž‘ ์ง€์—ฐ์ตœ๋Œ€ 30๋ถ„ (์Šค์ผ€์ค„๋Ÿฌ)์ˆ˜ ์ดˆ ์ด๋‚ด
์ฝ”๋“œ ๋ณต์žก๋„BatchRecoveryScheduler + Redis ๋ฆฌ์Šค๋„ˆKafka ๋ฆฌ์Šค๋„ˆ๋งŒ
์ฒ˜๋ฆฌ ์‹คํŒจ ์‹œ๋กœ๊ทธ๋งŒ ๋‚จ์ŒDLT โ†’ DB FAILED ํ™•์ •
๋ฉ”์‹œ์ง€ ๋ณด์žฅFire-and-ForgetAt-Least-Once

We removed the 30-minute polling logic from BatchRecoveryScheduler, but kept the logic that recovers missed QUEUED/IN_PROGRESS jobs at startup. Kafka guarantees delivery of the messages it holds, but it cannot help when an application crash means the Kafka message was never received at all.


Closing: what we took away from this experience

1. When choosing a message broker, first ask "what happens when there are no subscribers?"

The fire-and-forget nature of Redis Pub/Sub is spelled out in the documentation. We overlooked it when we first adopted it. Redis Pub/Sub is a good fit for "simple event delivery", but not for "events that must be processed".

2. The Ports & Adapters pattern shines at the moment of "infrastructure replacement"

This replacement made it clear why the BatchEventPort interface needs to exist. Had the service used RedisTemplate directly, without an interface, the scope of the change would have spread into the business logic.

3. We saw how a stopgap rots the code

BatchRecoveryScheduler was a reasonable stopgap. But as long as it exists, it creates the illusion that "the message loss problem is solved". If you pile code on top without fixing the root cause, it eventually becomes hard to explain why any given piece of code is there.

