Round8 - Kafka

Pyro·2025년 12월 17일

Loopers

목록 보기
8/10

서비스 경계를 넘어 이벤트 전달하기

Round 7에서 ApplicationEvent로 트랜잭션을 분리하고 장애 격리에 성공했다. 이번주에는 Kafka를 도입했더니, 서비스 경계를 넘어 안전하게 이벤트를 전달할 수 있었다. Transactional Outbox Pattern과 Idempotent Consumer Pattern을 적용하니, 메시지가 유실되지도 중복 처리되지도 않았다. 이벤트 기반 아키텍처는 단순히 비동기 처리가 아니라, 분산 시스템에서의 데이터 전파 를 해결해야 했다.

스케일 아웃해도 이벤트 전달하기

Spring의 ApplicationEvent 동작 원리

// 이벤트 발행
applicationEventPublisher.publishEvent(LikeAddedEvent(...))

// Spring 내부 동작
// 1. 현재 ApplicationContext의 리스너 목록 조회
// 2. 같은 JVM 내의 @EventListener만 찾음
// 3. 해당 리스너들만 호출

// ❌ 다른 서버의 리스너는 알 수 없음!

ApplicationEvent의 한계:

항목동작한계
전달 범위단일 JVM 내부다른 서버로 전달 불가
확장성Scale-up만 가능Scale-out 불가능
고가용성서버 1대 장애 시 전체 영향장애 격리 불가능
서비스 분리불가능모놀리스만 가능

"서비스를 분리하려면 다른 방법이 필요하다..."

"메시지 브로커가 필요하다"

첫 번째 시도: HTTP API로 전달?

"그럼 HTTP로 다른 서버에 알려주면 되지 않나?"

시도해본 방법:

@Async
@TransactionalEventListener(phase = AFTER_COMMIT)
fun handleLikeAdded(event: LikeAddedEvent) {
    // 모든 서버에 HTTP 요청
    servers.forEach { serverUrl ->
        restTemplate.postForEntity(
            "$serverUrl/internal/metrics/like",
            event,
            Void::class.java
        )
    }
}

문제점:

문제설명영향
서버 목록 관리어떤 서버들이 있는지 알아야 함서버 추가/제거 시 설정 변경 필요
네트워크 장애HTTP 요청 실패 시 재시도?복잡한 재시도 로직 필요
순서 보장 어려움네트워크 지연, 재시도로 전송 순서 != 도착 순서같은 상품에 좋아요 추가→취소 순서가 뒤바뀔 수 있음
중복 처리재시도 시 같은 메시지 중복 수신멱등성 처리 필요

순서 보장 문제 예시:

[Server A에서 발생]
1. 좋아요 추가 (productId=100) → HTTP 전송 시작 (네트워크 지연 500ms)
2. 좋아요 취소 (productId=100) → HTTP 전송 시작 (네트워크 지연 100ms)

[Server B에서 수신]
1. 좋아요 취소 먼저 도착 ❌
2. 좋아요 추가 나중에 도착 ❌

→ 실제: 취소 상태
→ 결과: 추가 상태 (잘못됨!)

"이건 너무 복잡하다... 전문적인 메시지 브로커가 필요해"

"메시지를 안전하게 전달하려면"

Kafka를 띄우고 Producer 작성

Docker로 Kafka를 띄우고, 간단한 Producer를 만들었다.

첫 번째 구현:

@Service
class LikeService(
    private val likeRepository: LikeRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>,
    private val objectMapper: ObjectMapper,
) {
    @Transactional
    fun addLike(userId: Long, productId: Long) {
        // 1. Like 저장
        val like = Like(userId = userId, productId = productId)
        likeRepository.save(like)

        // 2. Kafka로 이벤트 전송
        val event = LikeAddedEvent(userId, productId, LocalDateTime.now())
        kafkaTemplate.send(
            "catalog-events",
            productId.toString(),
            objectMapper.writeValueAsString(event)
        )
    }
}

로컬에서 돌려보니 잘 작동했다. Consumer도 메시지를 받아서 메트릭을 업데이트했다.

하지만 새로운 문제

시나리오 1: Kafka가 느려지면?

잘못된 구현 (트랜잭션 안에서 Kafka 호출):

@Transactional  // ❌ 문제: Kafka 호출까지 트랜잭션에 포함
fun addLike(userId: Long, productId: Long) {
    likeRepository.save(like)         // 50ms
    kafkaTemplate.send(...)           // 1000ms ⚠️ 느림!
    // 트랜잭션 커밋은 Kafka 응답 후에야 가능
}

문제점:

[트랜잭션 시작]
  ├── Like 저장 (50ms)
  ├── Kafka 전송 (1000ms) ⚠️ 느림!
  │   - 네트워크 왕복
  │   - Kafka 브로커 응답 대기
  │   - 트랜잭션은 계속 유지됨!
  └── 커밋 (10ms)

총 소요 시간: ~1060ms
→ DB 커넥션을 1060ms 동안 점유!

영향:

  • Kafka가 느리면 트랜잭션도 길어짐
  • DB 커넥션 점유 시간 증가 (1초 이상)
  • 커넥션 풀 고갈 → 다른 요청 대기
  • 동시 요청 처리량 급감

"결론: 트랜잭션을 나누어야 한다"

시나리오 2: Kafka 전송 실패 시?

@Transactional
fun addLike(userId: Long, productId: Long) {
    likeRepository.save(like)  // ✅ 성공

    kafkaTemplate.send(...)  // ❌ 실패 (Kafka 다운)
    // 예외 발생!
}

문제:

  • Kafka 실패 시 전체 트랜잭션 롤백?
  • Like도 저장 안 됨
  • Kafka 장애가 도메인 로직에 직접 영향

시나리오 3: DB 커밋 후 Kafka 실패?

1. Like 저장 성공
2. DB 커밋 ✅
3. Kafka 전송 시도
4. ❌ Kafka 전송 실패 (네트워크 오류)

결과: Like는 저장됐는데, 이벤트는 미발행 😱

"DB와 Kafka를 하나의 트랜잭션으로 묶을 수 없잖아..."

Transactional Outbox Pattern

문제의 본질

핵심 문제는 DB 트랜잭션과 메시지 전송을 원자적으로 처리할 수 없다는 것이었다.

원하는 것:

[원자적 처리]
  ├── DB에 Like 저장
  └── Kafka로 이벤트 전송

둘 다 성공하거나, 둘 다 실패해야 함

현실:

[DB 트랜잭션] ≠ [Kafka 전송]

Case 1: DB 성공, Kafka 실패 → 이벤트 유실
Case 2: Kafka 성공, DB 롤백 → 잘못된 이벤트 발행
Case 3: Kafka 느림 → DB 트랜잭션 길어짐

"트랜잭션을 나누되, 메시지는 반드시 전달되어야 한다"

Outbox Pattern의 아이디어

해결책은 의외로 간단했다.

"메시지도 DB에 저장하면 되지 않을까?"

@Transactional
fun addLike(userId: Long, productId: Long) {
    // 1. Like 저장
    likeRepository.save(like)

    // 2. Outbox에 이벤트 저장 (같은 트랜잭션!)
    outboxEventRepository.save(
        OutboxEvent.create(
            eventType = "LikeAddedEvent",
            topic = "catalog-events",
            partitionKey = productId.toString(),
            payload = objectMapper.writeValueAsString(event),
            aggregateType = "Product",
            aggregateId = productId
        )
    )

    // 커밋되면 둘 다 저장됨 ✅
    // 롤백되면 둘 다 롤백됨 ✅
}

Outbox 테이블:

CREATE TABLE outbox_events (
    id BIGINT PRIMARY KEY,
    event_type VARCHAR(255),  -- "LikeAddedEvent"
    topic VARCHAR(255),        -- "catalog-events"
    partition_key VARCHAR(255), -- "100"
    payload TEXT,              -- JSON 형태의 이벤트 데이터
    status VARCHAR(20),        -- PENDING, PUBLISHED, FAILED
    retry_count INT,
    created_at TIMESTAMP,
    published_at TIMESTAMP
);

그럼 Kafka로는 언제 보내지?

OutboxRelayScheduler: 배달부

Outbox에 저장된 이벤트를 Kafka로 전달하는 스케줄러를 만들었다.

Outbox Pattern 동작 흐름:

[사용자 요청: 좋아요 추가]
   ↓
[LikeService - 트랜잭션 1]
   ├── Like 저장
   └── OutboxEvent 저장 (status: PENDING)
   ↓ 커밋

[5초 후 - OutboxRelayScheduler]
   ↓
[별도 트랜잭션 2]
   ├── PENDING 이벤트 조회
   ├── Kafka로 전송
   │   ├─ 성공 → status: PUBLISHED
   │   └─ 실패 → status: FAILED, retryCount++
   └── 상태 업데이트

Outbox Pattern의 장점

문제Before (직접 전송)After (Outbox)
트랜잭션 길이Kafka 응답까지 대기DB 저장만 (빠름)
Kafka 장애전체 롤백 🔴Like 저장 성공 ✅
메시지 유실커밋 후 실패 시 유실 가능 🔴재시도로 반드시 전달 ✅
성능Kafka 속도에 영향받음DB 속도에만 영향 ✅

"DB 트랜잭션 내에서는 DB만 다루고, 메시지 전송은 나중에 한다"

이게 Outbox Pattern의 핵심이다.

At Least Once Delivery

Outbox Pattern은 At Least Once 전달을 보장한다.

정상 흐름:

1. OutboxEvent 저장 (PENDING)
2. Scheduler가 조회
3. Kafka 전송 성공
4. status → PUBLISHED
→ ✅ 메시지 정확히 1번 전달

실패 후 재시도:

1. OutboxEvent 저장 (PENDING)
2. Scheduler가 조회
3. Kafka 전송 실패 (네트워크 오류)
4. status → FAILED, retryCount = 1

[5초 후 재시도]
5. 같은 OutboxEvent 다시 조회
6. Kafka 전송 성공
7. status → PUBLISHED
→ ✅ 메시지 반드시 전달 (최소 1번)

극단적 케이스: Kafka 전송 성공 후 DB 업데이트 실패?

1. OutboxEvent 저장 (PENDING)
2. Kafka 전송 성공 ✅
3. status 업데이트 시도
4. ❌ DB 장애 발생 (업데이트 실패)

[5초 후]
5. 같은 이벤트 다시 조회 (여전히 PENDING)
6. Kafka로 다시 전송 ✅
→ 🔔 메시지 중복 전달!

"최소 1번 전달은 보장하지만, 중복 전달될 수 있다"

이게 At Least Once의 의미다. 그럼 중복은 어떻게 처리하지?

Idempotent Consumer Pattern

Consumer의 고민

Producer는 Outbox로 해결했다. 그런데 Consumer는?

Consumer가 받는 메시지:

메시지 1: LikeAddedEvent(userId=1, productId=100)
메시지 2: LikeAddedEvent(userId=1, productId=100)  // 중복!
메시지 3: LikeAddedEvent(userId=2, productId=100)

중복 처리 시 문제:

@KafkaListener(topics = ["catalog-events"])
fun consumeCatalogEvents(message: String) {
    val event = objectMapper.readValue(message, LikeAddedEvent::class.java)

    // ProductMetrics의 likeCount 증가
    val metrics = productMetricsRepository.findByProductId(event.productId)
    metrics.incrementLikeCount()
    productMetricsRepository.save(metrics)
}

문제:

메시지 1 처리 → likeCount = 1 ✅
메시지 2 처리 → likeCount = 2 (중복!) ❌
메시지 3 처리 → likeCount = 3 ✅

실제 좋아요: 2개
집계된 좋아요: 3개 😱

"같은 메시지를 여러 번 처리해도 결과가 같아야 한다"

이게 멱등성(Idempotency)이다.

EventHandled 테이블: 처리 기록

해결책은 "이미 처리한 메시지인지 기록"하는 것이었다.

EventHandled 테이블:

CREATE TABLE event_handled (
    id BIGINT PRIMARY KEY,
    event_type VARCHAR(255),    -- "LikeAddedEvent"
    aggregate_type VARCHAR(255), -- "Product"
    aggregate_id BIGINT,         -- 100
    event_version BIGINT,        -- createdAt.nano (유일성 보장)
    handled_at TIMESTAMP,

    UNIQUE INDEX idx_event_key (
        event_type,
        aggregate_type,
        aggregate_id,
        event_version
    )
);

EventHandled의 역할:

이벤트 처리 전에 확인:
"이 이벤트를 이미 처리했나?"

→ 처리한 적 있음: Skip
→ 처리한 적 없음: 처리 + 기록

멱등 Consumer 구현

처리 흐름:

메시지 1 수신: LikeAddedEvent(userId=1, productId=100, createdAt=12:00:00.123)
   ↓
[멱등성 체크]
   SELECT * FROM event_handled
   WHERE event_type='LikeAddedEvent' AND aggregate_id=100
     AND event_version=123
   → 결과: 없음
   ↓
[비즈니스 로직]
   likeCount: 0 → 1
   ↓
[처리 기록]
   INSERT INTO event_handled (..., event_version=123)
   ↓
[Ack]
   acknowledgment.acknowledge()

메시지 2 수신: 같은 이벤트 (중복!)
   ↓
[멱등성 체크]
   SELECT * FROM event_handled WHERE ... event_version=123
   → 결과: ✅ 있음!
   ↓
[Skip]
   logger.info("중복 이벤트 Skip")
   acknowledgment.acknowledge()

→ likeCount는 여전히 1 (중복 처리 안 됨!) ✅

At Most Once Processing

EventHandled 패턴은 At Most Once 처리를 보장한다.

시나리오동작결과
정상 처리처리 + 기록✅ 1번 처리
중복 수신Skip✅ 1번만 처리
처리 중 실패재시도✅ 결국 1번 처리

"최소 1번 전달 + 최대 1번 처리 = Exactly Once Semantics"

Producer의 Outbox + Consumer의 EventHandled = 정확히 1번 처리

왜 EventHandled와 OutboxEvent를 분리했는가?

처음의 의문

"둘 다 이벤트 기록인데, 하나로 합치면 안 될까?"

테이블 스키마를 보면 비슷해 보인다:

-- OutboxEvent
outbox_events (
    event_type, topic, partition_key, payload,
    status, created_at
)

-- EventHandled
event_handled (
    event_type, aggregate_type, aggregate_id,
    event_version, handled_at
)

"이벤트 로그 테이블 하나로 관리하면 간단하지 않나?"

완전히 다른 책임

하지만 두 테이블은 완전히 다른 질문에 답한다.

OutboxEvent가 답하는 질문:

"이 이벤트를 Kafka로 발행했는가?"

→ Producer의 관심사
→ PENDING 이벤트를 찾아서 Kafka로 전송
→ 순차 조회 (created_at 순서)

EventHandled가 답하는 질문:

"이 이벤트를 이미 처리했는가?"

→ Consumer의 관심사
→ 중복 체크 (빠른 존재 여부 확인)
→ 랜덤 액세스 (유니크 키 조회)

쿼리 패턴의 차이

OutboxEvent 쿼리:

// Producer: 배치로 PENDING 이벤트 조회
fun findPendingEvents(limit: Int): List<OutboxEvent> {
    return jpaQueryFactory
        .selectFrom(outboxEvent)
        .where(outboxEvent.status.eq(OutboxEventStatus.PENDING))
        .orderBy(outboxEvent.createdAt.asc())  // 순차 조회
        .limit(limit.toLong())
        .fetch()
}

// 인덱스: (status, created_at)

EventHandled 쿼리:

// Consumer: 빠른 중복 체크
fun existsByEventKey(eventKey: EventKey): Boolean {
    return exists(
        event_handled
        WHERE event_type = ?
          AND aggregate_id = ?
          AND event_version = ?
    )
}

// 유니크 인덱스: (event_type, aggregate_type, aggregate_id, event_version)
// → O(1) 조회

성능 차이

만약 하나의 테이블로 합친다면?

CREATE TABLE event_log (
    id BIGINT,
    event_type VARCHAR(255),
    payload TEXT,
    -- Producer용 컬럼
    status VARCHAR(20),        -- PENDING, PUBLISHED
    created_at TIMESTAMP,
    -- Consumer용 컬럼
    aggregate_id BIGINT,
    event_version BIGINT,
    handled BOOLEAN,
    handled_at TIMESTAMP
);

문제점:

문제설명영향
인덱스 충돌Producer는 (status, created_at), Consumer는 (aggregate_id, event_version) 필요인덱스 비효율
테이블 락 경합Producer INSERT + Consumer SELECT 동시 발생성능 저하
데이터 크기OutboxEvent는 payload 포함 (큼), EventHandled는 키만 (작음)불필요한 저장 공간

라이프사이클의 차이

OutboxEvent:

// PUBLISHED 이벤트는 7일 후 삭제 가능
@Scheduled(cron = "0 0 3 * * *")
fun cleanupOldPublishedEvents() {
    val threshold = ZonedDateTime.now().minusDays(7)
    outboxEventRepository.deletePublishedEventsBefore(threshold)
}

EventHandled:

// 멱등성 보장을 위해 장기 보관
// 삭제하면 중복 처리 위험!
→ 보관 또는 아카이빙
테이블보관 기간클린업 정책이유
OutboxEvent7일주기적 삭제Kafka 발행만 확인하면 됨
EventHandled장기보관/아카이빙멱등성 보장 필요

트랜잭션 경계의 명확성

// Producer: OutboxEvent에만 의존
@Transactional
fun addLike(userId: Long, productId: Long) {
    likeRepository.save(Like(...))
    outboxEventPublisher.publish(LikeAddedEvent(...))
    // OutboxEvent 테이블에만 INSERT
}

// Consumer: EventHandled에만 의존
@Transactional
fun handleLikeAdded(event: LikeAddedEvent) {
    if (eventHandledRepository.exists(...)) return
    // EventHandled 테이블에만 SELECT

    processEvent(...)
    eventHandledRepository.save(EventHandled.create(...))
    // EventHandled 테이블에만 INSERT
}

Producer와 Consumer가 독립적인 테이블 사용:

  • 서로 다른 트랜잭션
  • 서로 다른 데이터베이스로 분리 가능
  • 장애 격리

"하나로 합치는 것은 중복이 아니라, 책임을 섞는 것이다"

결론

EventHandled와 OutboxEvent는:

OutboxEvent = "발행 대기열"
  - Producer가 사용
  - 순차 조회
  - 단기 보관

EventHandled = "처리 기록 해시맵"
  - Consumer가 사용
  - 빠른 중복 체크
  - 장기 보관

처음엔 "중복 아닌가?"라고 생각했지만, 실제로는 각자의 역할에 최적화된 설계였다.

배운 것들

1. ApplicationEvent의 한계

처음엔 ApplicationEvent로 충분하다고 생각했다.

하지만:

항목ApplicationEventKafka
전달 범위단일 JVM서비스 간 전달
확장성Scale-up만Scale-out 가능
영속성메모리만디스크 저장
재처리불가능Offset 조정 가능

"모놀리스에서는 ApplicationEvent, 분산 시스템에서는 Kafka"

2. Transactional Outbox Pattern

"DB 트랜잭션과 메시지 전송을 원자적으로 처리할 수 없다"

이 문제의 해결책은:

메시지도 DB에 저장하고,
별도 프로세스가 Kafka로 전송한다

Outbox Pattern의 가치:

측면가치
원자성DB 커밋과 이벤트 저장이 동일 트랜잭션
성능Kafka 속도에 영향받지 않음
안정성Kafka 장애 시에도 도메인 로직 성공
재시도자동 재시도로 메시지 유실 방지

3. Idempotent Consumer Pattern

"At Least Once 전달은 중복을 의미한다"

Producer가 "최소 1번" 전달을 보장하면, Consumer는 "최대 1번" 처리를 보장해야 한다.

EventHandled 테이블의 역할:

처리 전: "이미 처리했나?" 확인
처리 후: "처리했다" 기록

→ 같은 이벤트는 절대 2번 처리 안 됨

멱등성의 핵심:

f(x) = y
f(f(x)) = f(y) = y  // 같은 결과

좋아요 추가(event1) = likeCount++
좋아요 추가(event1 중복) = Skip (같은 결과 유지)

4. Manual Ack의 중요성

// ❌ Auto Ack
@KafkaListener(...)
fun consume(message: String) {
    // 메시지 수신 즉시 Ack됨
    processEvent(...)  // 실패해도 재처리 안 됨
}

// ✅ Manual Ack
@KafkaListener(...)
fun consume(message: String, ack: Acknowledgment) {
    processEvent(...)
    eventHandledRepository.save(...)
    ack.acknowledge()  // 모두 성공 후에만 Ack
}

Manual Ack 없이는:

  • 처리 실패 시 메시지 유실
  • 멱등성 보장 불가능 (EventHandled 저장 전 Ack)

5. Partition Key의 전략적 선택

OutboxEvent.create(
    partitionKey = productId.toString()
)

Partition Key 선택 이유:

같은 상품 = 같은 Partition
→ 순서 보장

다른 상품 = 다른 Partition
→ 병렬 처리

예시:

productId=100 → partition 0
  ├── LikeAdded
  ├── LikeRemoved  (순서 보장 ✅)
  └── LikeAdded

productId=200 → partition 1
  └── LikeAdded  (병렬 처리 ✅)

"순서가 중요한 단위를 Partition Key로 선택한다"

6. EventHandled vs OutboxEvent 분리의 지혜

처음엔 "중복 테이블"이라고 생각했다.

하지만:

OutboxEvent = Producer의 발행 큐
EventHandled = Consumer의 처리 기록

→ 완전히 다른 책임
→ 완전히 다른 액세스 패턴
→ 완전히 다른 라이프사이클

"유사해 보이는 테이블도 책임이 다르면 분리해야 한다"

마치며

Round 5에서 "빠르게 돌아간다"를 배웠다면,
Round 6에서 "장애에도 멈추지 않는다"를 배웠고,
Round 7에서 "느슨하게 연결하되, 안전하게 동작한다"를 배웠고,
Round 8에서는 "서비스 경계를 넘어 안전하게 전달한다" 를 배웠다.

"메시지 전달의 보장은 Producer와 Consumer가 함께 만든다" 가 이번주의 핵심 깨달음이었다.

Producer: Transactional Outbox Pattern
  → "메시지를 최소 1번 전달한다"

Consumer: Idempotent Consumer Pattern
  → "메시지를 최대 1번 처리한다"

= Exactly Once Semantics
  → "정확히 1번 처리된다"
profile
dreams of chronic and sustained passion

0개의 댓글