Outbox 패턴 직접 구현하기 — inFlight 카운터로 백프레셔 제어까지

정영범·2026년 3월 23일

토이프로젝트

목록 보기
7/11

Outbox 패턴 직접 구현하기 — inFlight 카운터로 백프레셔 제어까지

들어가기 전에

지난 글에서 왜 Outbox 패턴이 필요한지를 다뤘다. 이번 글은 실제로 어떻게 구현했는지, 구현 과정에서 어떤 선택을 했는지를 다룬다.

코드는 전부 이 프로젝트의 실제 코드다.

📦 GitHub: eventful-commerce


다시 한번 — Outbox 패턴이 해결하는 것

문제는 단순하다. DB 저장과 Kafka 발행을 동시에 원자적으로 처리할 수 없다.

@Transactional
fun createOrder(request: OrderRequest) {
    orderRepository.save(order)          // DB 커밋
    kafkaTemplate.send("order-events")   // 여기서 실패하면?
}

@Transactional이 끝난 순간 DB 커밋은 완료된다. 그 이후에 Kafka 전송이 실패하면 주문은 생겼지만 결제 서비스는 아무것도 모른다. 이게 double write problem이다.

Outbox 패턴의 핵심 아이디어는 간단하다.

Kafka로 바로 보내지 말고, 같은 트랜잭션 안에서 DB 테이블에 먼저 저장하자. Kafka 발행은 나중에 별도 스케줄러가 처리한다.


Outbox 테이블 설계

먼저 이벤트를 저장할 테이블이 필요하다.

@Entity
@Table(name = "outbox_event")
class OutboxEvent(
    val aggregateType: String,   // 어떤 도메인인지 (예: "ORDER", "PAYMENT")
    val aggregateId: UUID,       // 해당 도메인의 ID
    val eventType: String,       // 이벤트 종류 (예: "ORDER_RESERVED")
    
    @Column(columnDefinition = "text")
    val payload: String,         // 이벤트 내용 (JSON)

    @Enumerated(EnumType.STRING)
    var status: OutboxStatus = OutboxStatus.PENDING,  // PENDING / SENT / FAILED

    var retryCount: Int = 0,     // 재시도 횟수
    var lastError: String? = null,

    val createdAt: Instant = Instant.now(),
    var sentAt: Instant? = null,
) {
    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    lateinit var id: UUID
}

enum class OutboxStatus { PENDING, SENT, FAILED }

필드를 하나씩 짚어보자.

  • aggregateType / aggregateId: 어떤 도메인의 어떤 레코드에서 발생한 이벤트인지 추적할 수 있다. Kafka 파티셔닝 키로도 쓴다. 같은 주문의 이벤트가 항상 같은 파티션으로 가야 순서가 보장된다.
  • status: PENDINGSENT 또는 FAILED로 전이된다. 스케줄러는 PENDING만 긁어간다.
  • retryCount / lastError: 발행이 실패했을 때 몇 번 재시도했는지, 왜 실패했는지 추적한다. maxRetries를 초과하면 FAILED로 전이해서 무한 재시도를 막는다.

이벤트 저장 — 트랜잭션 안에서

주문을 생성할 때 Outbox 이벤트를 같은 트랜잭션에서 저장한다.

@Transactional
fun orders(ordersRequests: List<OrdersRequest>): List<String> {
    // 1. 주문 저장
    val savedOrders = ordersRepository.saveAll(orderList)

    // 2. 재고 예약 (Redis)
    // ...

    // 3. Outbox 이벤트 저장 — 같은 트랜잭션!
    val events = savedOrders.map { order ->
        val payloadJson = objectMapper.writeValueAsString(
            OrderReservedPayload(
                orderId = order.id,
                totalAmount = order.totalAmount,
                reservationId = order.reservationId!!,
                // ...
            )
        )

        OutboxEvent(
            aggregateType = "ORDER",
            aggregateId = order.id,
            eventType = "ORDER_RESERVED",
            payload = payloadJson,
            status = OutboxStatus.PENDING
        )
    }

    outboxEventService.record(events)  // outboxEventRepository.saveAll()
}

트랜잭션이 커밋되면 주문과 Outbox 이벤트가 동시에 DB에 저장된다. 트랜잭션이 롤백되면 둘 다 사라진다. 이제 주문이 DB에 있다는 건 "이 이벤트는 언젠가 반드시 Kafka로 나간다"를 보장한다.


스케줄러 — PENDING 이벤트를 Kafka로 발행

이제 PENDING 상태의 이벤트를 Kafka로 보내는 스케줄러가 필요하다. 이 부분이 구현에서 가장 신경 쓴 부분이다.

@Component
class OutboxPublisher(
    private val outboxEventRepository: OutboxEventRepository,
    private val outboxEventService: OutboxEventService,
    private val kafkaTemplate: KafkaTemplate<String, String>,
    private val objectMapper: ObjectMapper,
    @Value("\${outbox.topic}") private val topic: String
) {
    private val batchSize = 50
    private val maxRetries = 10

    private val inFlight = AtomicInteger(0)
    private val maxInFlight = 200

    @Scheduled(fixedDelayString = "200")
    @Transactional
    fun publishPending() {
        if (inFlight.get() >= maxInFlight) return  // 백프레셔 체크

        val outboxEvents = outboxEventRepository
            .findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING, PageRequest.of(0, batchSize))

        outboxEvents.forEach { event ->
            if (inFlight.incrementAndGet() > maxInFlight) {
                inFlight.decrementAndGet()
                return
            }

            val message = objectMapper.writeValueAsString(
                OutboxEventMessage(
                    eventId = event.id,
                    aggregateType = event.aggregateType,
                    aggregateId = event.aggregateId,
                    eventType = event.eventType,
                    occurredAt = event.createdAt,
                    payload = event.payload
                )
            )

            // aggregateId를 파티션 키로 사용 → 같은 주문의 이벤트는 같은 파티션
            kafkaTemplate.send(topic, event.aggregateId.toString(), message)
                .whenComplete { _, ex ->
                    try {
                        if (ex == null) outboxEventService.markAsSent(event.id)
                        else outboxEventService.markAsFailed(event.id, ex, maxRetries)
                    } finally {
                        inFlight.decrementAndGet()  // 완료되면 항상 카운트 감소
                    }
                }
        }
    }
}

코드 안에 몇 가지 설계 결정이 담겨있다. 하나씩 설명한다.


설계 결정 1 — 200ms 폴링 간격

@Scheduled(fixedDelayString = "200")

200ms마다 PENDING 이벤트를 확인한다. 왜 200ms인가?

너무 짧으면: DB 폴링 쿼리가 너무 자주 나간다. Outbox 테이블에 대한 SELECT가 초당 수십 번 발생하면 DB에 불필요한 부하가 생긴다.

너무 길면: 이벤트 발행 지연이 커진다. 주문이 생성된 후 결제 서비스가 인지하는 데까지 걸리는 시간이 길어진다.

200ms는 이 둘 사이의 균형점이다. 초당 5번 폴링하면서 최대 200ms의 발행 지연을 허용한다. 실시간성이 중요한 서비스라면 더 짧게 가져가도 된다.

fixedDelayfixedRate의 차이도 중요하다. fixedRate는 이전 실행이 끝나지 않아도 다음 실행을 시작한다. fixedDelay는 이전 실행이 완료된 후 200ms를 기다린다. 스케줄러가 겹쳐 실행되면 같은 이벤트를 두 번 처리할 수 있기 때문에 fixedDelay를 선택했다.


설계 결정 2 — batchSize 50

한 번에 50개씩 긁어온다. 왜 50인가?

Kafka 발행은 비동기(whenComplete)로 처리된다. 한 번에 너무 많이 가져오면 메모리에 수백 개의 이벤트가 대기하게 된다. 50개씩 가져오면 200ms마다 최대 50개를 처리하니까 초당 최대 250개다. 이 정도면 웬만한 트래픽은 충분히 소화한다.

트래픽이 폭발적으로 늘어나면 batchSize를 올리거나 폴링 간격을 줄이는 방식으로 튜닝할 수 있다.


설계 결정 3 — inFlight 카운터와 백프레셔

이 부분이 가장 신경 쓴 부분이다.

private val inFlight = AtomicInteger(0)
private val maxInFlight = 200

왜 필요한가?

Kafka 발행은 비동기다. kafkaTemplate.send()를 호출하면 즉시 반환되고, 실제 발행 결과는 나중에 whenComplete로 받는다. 즉, 스케줄러가 200ms마다 실행되면서 발행 완료를 기다리지 않고 계속 새 이벤트를 쌓을 수 있다.

Kafka 브로커가 느려지거나 네트워크 지연이 생기면 어떻게 될까?

1회 실행: 50개 발행 시작 (아직 완료 안 됨)
200ms 후 2회 실행: 또 50개 발행 시작 (1회 것도 아직 완료 안 됨)
200ms 후 3회 실행: 또 50개 발행 시작
...

이게 누적되면 메모리에 수천 개의 발행 대기 이벤트가 쌓인다. 최악의 경우 OOM이 발생한다.

inFlight 카운터가 이걸 막는다.

// 스케줄러 진입 시 체크
if (inFlight.get() >= maxInFlight) return  // 이미 200개가 날아가고 있으면 이번엔 패스

outboxEvents.forEach { event ->
    if (inFlight.incrementAndGet() > maxInFlight) {
        inFlight.decrementAndGet()
        return  // 루프 도중 200개 초과하면 중단
    }

    kafkaTemplate.send(...)
        .whenComplete { _, ex ->
            try {
                // 성공/실패 처리
            } finally {
                inFlight.decrementAndGet()  // 완료되면 반드시 감소
            }
        }
}

AtomicInteger를 쓰는 이유는 스레드 안전성 때문이다. whenComplete 콜백은 Kafka 내부 스레드에서 실행된다. 스케줄러 스레드와 Kafka 콜백 스레드가 동시에 카운터를 수정할 수 있기 때문에 일반 Int로는 안 된다.

finally 블록에서 decrementAndGet()을 호출하는 것도 중요하다. 성공하든 실패하든 발행이 완료되면 반드시 카운터를 줄여야 한다. 예외가 발생해도 카운터가 줄어들어야 한다. try-finally가 이를 보장한다.


설계 결정 4 — markAsFailed와 maxRetries

Kafka 발행이 실패하면 바로 FAILED로 바꾸지 않는다. retryCount를 올리고, maxRetries(10번)를 초과할 때만 FAILED로 전이한다.

@Modifying
@Query("""
    UPDATE OutboxEvent e
    SET e.retryCount = e.retryCount + 1,
        e.lastError = :lastError,
        e.status = case when (e.retryCount + 1) >= :maxRetries 
                        then :failed 
                        else e.status end
    WHERE e.id = :id
""")
fun updateFailed(id: UUID, lastError: String, maxRetries: Int, failed: OutboxStatus)

왜 이렇게 하는가?

Kafka가 일시적으로 불안정하거나 네트워크 지연이 발생했을 때, 한 번 실패했다고 FAILED로 박아버리면 이벤트가 영원히 발행되지 않는다. PENDING 상태를 유지하면 다음 폴링 주기에 자동으로 재시도된다.

10번을 초과하면 FAILED로 처리하는 이유는 무한 재시도를 막기 위해서다. 10번 실패했다면 일시적인 문제가 아닐 가능성이 높다. lastError에 에러 메시지가 남아있으니 나중에 원인을 파악하고 수동으로 재처리할 수 있다.


설계 결정 5 — aggregateId를 파티션 키로

kafkaTemplate.send(topic, event.aggregateId.toString(), message)
//                  토픽    파티션 키                    메시지

Kafka는 같은 키의 메시지를 같은 파티션으로 보낸다. 파티션 안에서는 순서가 보장된다.

주문 ID를 키로 쓰면 같은 주문의 이벤트는 항상 같은 파티션으로 간다. ORDER_RESERVEDORDER_CONFIRMED 순서가 보장된다는 뜻이다. 키 없이 라운드로빈으로 보내면 ORDER_CONFIRMEDORDER_RESERVED보다 먼저 도착할 수 있다.


Consumer 쪽 — OutboxEventMessage

스케줄러가 Kafka로 보내는 메시지 형식도 정의했다.

data class OutboxEventMessage(
    val eventId: UUID,        // 멱등성 처리에 사용
    val aggregateType: String,
    val aggregateId: UUID,
    val eventType: String,    // Consumer가 분기 처리에 사용
    val occurredAt: Instant,
    val payload: String       // 실제 이벤트 데이터 (JSON)
)

eventId가 핵심이다. Consumer는 이 값으로 중복 이벤트를 감지한다. Outbox 스케줄러가 재시도로 같은 이벤트를 두 번 발행해도, Consumer는 eventId를 보고 이미 처리된 이벤트임을 알 수 있다.

Consumer 쪽 멱등성 처리는 다음 글에서 다룬다.


전체 흐름 정리

[주문 생성 트랜잭션]
  ├── orders 테이블에 주문 저장
  └── outbox_event 테이블에 이벤트 저장 (status = PENDING)
          ↓ 트랜잭션 커밋

[OutboxPublisher 스케줄러 — 200ms마다]
  ├── PENDING 이벤트 최대 50개 조회
  ├── inFlight >= 200이면 이번 회차 스킵
  ├── 각 이벤트를 Kafka로 비동기 발행
  │     ├── 성공 → status = SENT, sentAt 기록
  │     └── 실패 → retryCount++, maxRetries 초과 시 status = FAILED
  └── 발행 완료 시 inFlight--

[결제 서비스 Kafka Consumer]
  └── OutboxEventMessage 수신 → 멱등성 체크 → 결제 생성

이 구현의 한계

솔직하게 짚고 넘어간다.

fixedDelay임에도 같은 이벤트가 중복 발행될 수 있다

fixedDelay는 메서드가 반환된 시점부터 200ms를 기다린다. 언뜻 보면 "메서드가 끝난 뒤 200ms"이니까 안전해 보인다. 그런데 함정이 있다.

kafkaTemplate.send()는 비동기다. 호출하는 순간 즉시 반환되고, 실제 발행 결과는 나중에 whenComplete 콜백으로 받는다. 즉, publishPending() 메서드는 Kafka 발행이 완료되기 전에 이미 반환된다.

t=0ms    : publishPending() 실행
           → kafka.send(event1) 호출 (비동기 — 아직 완료 안 됨)
           → kafka.send(event2) 호출 (비동기 — 아직 완료 안 됨)
           → 메서드 반환 ← 여기서 fixedDelay 타이머 시작

t=200ms  : publishPending() 다시 실행
           → event1, event2는 아직 markAsSent() 안 됨 → 여전히 PENDING
           → 같은 이벤트를 또 집어감 → 중복 발행 발생!

inFlight 카운터가 어느 정도 막아주긴 한다. 이미 200개가 날아가고 있으면 다음 스케줄링은 스킵된다. 하지만 200개 미만인 상황에서는 중복 집어가기를 완전히 막지 못한다.

결국 Consumer 쪽 멱등성 처리가 최종 방어선이 된다. 같은 이벤트가 두 번 발행되더라도 Consumer가 eventId 기반으로 중복을 감지하고 무시해야 한다. Outbox는 "이벤트를 반드시 발행한다"를 보장하고, 멱등성은 "중복 발행이 와도 한 번만 처리한다"를 보장하는 구조다. 둘이 함께 있어야 의미가 있다.

근본적으로 해결하려면 send()를 동기로 바꾸거나(get()으로 블로킹), SELECT FOR UPDATE SKIP LOCKED로 발행 중인 이벤트를 DB 수준에서 잠가야 한다. 하지만 동기 방식은 처리량이 급감하고, SKIP LOCKED는 구현 복잡도가 올라간다. 이 프로젝트에서는 Consumer 멱등성으로 최종 방어하는 방식을 선택했다.

단일 인스턴스 가정

inFlight 카운터는 인스턴스 내 메모리에만 존재한다. 인스턴스가 2개 이상이면 각자의 카운터로 동작하기 때문에 실제 in-flight 수는 maxInFlight * 인스턴스 수가 된다. 인스턴스가 늘어날수록 카운터의 백프레셔 효과가 줄어든다.

여러 인스턴스가 같은 PENDING 이벤트를 동시에 긁어가는 문제도 있다. 이건 DB 수준의 SELECT FOR UPDATE SKIP LOCKED로 해결할 수 있다.

폴링 방식의 지연

최대 200ms의 발행 지연이 있다. 실시간성이 극도로 중요한 서비스라면 폴링 대신 Change Data Capture(CDC) 방식인 Debezium을 고려해볼 수 있다. DB의 binlog를 직접 읽어서 변경 사항을 바로 Kafka로 보내는 방식이다.


마치며

Outbox 패턴을 직접 구현하고 나서 느낀 건, 단순해 보이는 아이디어 안에 생각보다 많은 디테일이 있다는 거다. 폴링 간격, 배치 크기, 백프레셔 제어, 재시도 정책 하나하나가 전부 트레이드오프를 가지고 있다.

가장 중요한 건 "이벤트는 절대 유실되지 않는다"는 보장이다. 이 구현에서 이벤트가 유실될 수 있는 케이스는 단 하나다. DB 자체가 다운되거나 데이터가 손실되는 경우다. Kafka가 불안정하거나 Consumer가 죽어도, 이벤트는 Outbox 테이블에 남아있고 언젠가 반드시 발행된다.


다음 글: 멱등성 처리 — DB Unique 제약으로 중복 이벤트 막기

profile
벨로그 좋은것만 드려요

0개의 댓글