5/5(화) 이벤트 발행과 Outbox 패턴 구현 이해하기 (코드래빗 리뷰 반영)

dev_joo·2026년 5월 5일

다시 학습하기

일단 도메인 이벤트를 발행하고 이 이벤트에 발행에 대해 Outbox로 영속 저장을 해서
이벤트 처리가 중간에 실패하더라도 서비스에서 이벤트가 전달될수 있어 정합성을 보장한다는 Outbox테이블 사용의 이유와

호출이 일어나면 이벤트가 만들어지고, polling으로 kafka가 가져간다는 큰 흐름까지는 설명할 수 있었지만,

예시를 보고 만들어서 왜 이 속성과 설정을 사용했는지까지는 설명할 자신이 없었다.

그래서 다시 한 번 원리와 세부 구현을 살펴보면서 이에 대해 확실하게 학습하고 넘어가기로 했다.

새로 만든 파일별 역할

[도메인 이벤트 표현]
├ BaseEvent.java               "이벤트의 공통 모양"
└ UserCreatedEvent.java        "회원가입했다 라는 사실"

[Outbox 저장]
├ OutboxEvent.java             "DB에 보관할 이벤트 (테이블 한 행)"
├ OutboxRepository.java        "Outbox 다루는 인터페이스"
├ OutboxRepositoryAdapter.java "위 인터페이스 구현"
└ JpaOutboxRepository.java     "Spring 이 자동 만들어주는 SQL 실행기"

[Outbox 흐름 연결]
├ UserApplicationService       "회원가입하면서 이벤트 발행만 알림"
├ OutboxEventHandler           "이벤트 받아서 Outbox에 저장"
└ OutboxPoller                 "주기적으로 Outbox 읽어서 Kafka로"

[설정]
├ application.yml              "Kafka 접속 정보 + 직렬화 설정"
├ SchedulerConfig              "스케줄링 켜기"
└ V3__create_outbox_table.sql  "p_outbox 테이블 만들기"

목록화 하고 보니 이벤트 발행과 OutBox패턴을 만들기 위해 무려 11개의 파일을 새로 만들었다.(BaseEvent는 컨벤션)

빅픽쳐 그리기 (큰 흐름 이해)

회원가입이 일어났을 때 각 파일들의 동작

[T=0] 사용자가 POST /api/v1/users 요청
   │
   │  Body: {"loginId":"alice", ...}
   │
   ▼
┌─────────────────────────────────────────────┐
│ UserController.signup()                      │
│   userApplicationService.signup(command)     │
└─────────────────┬───────────────────────────┘
                  │
                  ▼
┌─────────────────────────────────────────────┐
│ UserApplicationService.signup()              │
│   @Transactional() 시작                      │
│                                              │
│   ① User user = User.create(...)             │
│   ② userRepository.save(user)                │
│      ↓                                       │
│      INSERT INTO p_users                     │
│                                              │
│   ③ nicknameHistoryRepository.save(...)      │
│      INSERT INTO p_user_nickname_histories   │
│                                              │
│   ④ eventPublisher.publishEvent(             │
│         UserCreatedEvent.from(saved)         │
│      )                                       │
│      ↓ Spring 이벤트 시스템에 던짐                │
│      ↓ "이런 일이 일어났음" 알림                  │
└─────────────────┬───────────────────────────┘
                  │
                  ▼
        "Spring"BEFORE_COMMIT 시점에
        이벤트 구독자를 호출함
                  │
                  ▼
┌──────────────────────────────────────────────┐
│ OutboxEventHandler.onDomainEvent(event)      │
│ @TransactionalEventListener(BEFORE_COMMIT)   │
│                                              │
│   ⑤ String payload = JSON 직렬화              │
│   ⑥ String topic = "user-created"            │
│   ⑦ OutboxEvent outbox = OutboxEvent.of(...) │
│   ⑧ outboxRepository.save(outbox)            │
│      ↓                                       │
│      INSERT INTO p_outbox                    │
      (published=false)                       │
└─────────────────┬────────────────────────────┘
                  │
                  ▼
┌─────────────────────────────────────────────┐
│ UserApplicationService.signup() 메서드 끝      │
│                                              │
│   @Transactional 커밋                         │
│   = p_users + p_user_nickname_histories      │
│     + p_outbox 한 번에 저장됨                   │
│                                              │
│   201 응답                                    │
└─────────────────────────────────────────────┘

   여기까지 회원가입 응답 끝
   사용자는 가입 성공 받음
   Outbox 에는 published=false 인 행이 있음
[T=5초 후] OutboxPoller 가 자동 실행 (스케줄러)
   │
   ▼
┌─────────────────────────────────────────────┐
│ OutboxPoller.publishPendingEvents()          │
│ @Scheduled(fixedDelay = 5000)                │
│ @Transactional 시작                           │
│                                              │
│   ⑨ events = outboxRepository                │
│              .findUnpublished(...)           │
│      ↓                                       │
│      SELECT * FROM p_outbox                  │
│      WHERE published = false                 │
│      LIMIT 100                               │
│                                              │
│   ⑩ for 각 event:                            │
│      kafkaTemplate.send(topic, key, payload) │
│      ↓                                       │
│      Kafka 브로커로 메시지 발행                   │
│                                              │
│   ⑪ event.markPublished()                    │
│      = published=true 상태로 변경               │
│                                              │
│   @Transactional 커밋                         │
│   = p_outbox UPDATE published=true           │
└─────────────────────────────────────────────┘

   여기까지 Kafka 에 메시지 도착 완료
[T=∞ 어느 시점] 다른 서비스 (: 알림 서비스) 가
                Kafka 에서 메시지 가져감
   │
   ▼
   Notification Service 가 user-created 토픽 구독
   메시지 받아서 환영 메일 발송 등에 사용

ApplicationEventPublisher

eventPublisher.publishEvent(UserCreatedEvent.from(saved));

이 코드는 Spring이 내부적으로 이벤트 시스템을 동작하게 한다.
그 내부 이벤트 시스템 동작은 다음과 같다.

eventPublisher.publishEvent(myEvent)
              │
              ▼
   Spring 내부: "이 이벤트의 구독자 누구지?"
   ├ 발견: OutboxEventHandler.onDomainEvent(BaseEvent event)(BaseEventUserCreatedEvent 의 부모라 매칭됨)
   │
   ▼
   onDomainEvent(myEvent) 자동 호출
  1. 모든 @TransactionalEventListener / @EventListener 메서드 검색
  2. 파라미터 타입이 UserCreatedEvent (또는 부모 BaseEvent)으로 일치하는 메서드를 찾아내 호출한다.

서비스 코드에서 사용

import org.springframework.context.ApplicationEventPublisher;

@Service
public class UserApplicationService {
	private final ApplicationEventPublisher eventPublisher;
    
    @Transactional
	public SignupResponse signup(SignupCommand command) {
    // ...
    try {
        User saved = userRepository.save(user);
        nicknameHistoryRepository.save(nicknameHistory);
        
        eventPublisher.publishEvent(UserCreatedEvent.from(saved));   // ← 추가
        
        return SignupResponse.from(saved);
    } catch (...) { ... }
}
}

EventPublisher가 호출하는 EventListener (OutboxEventHandler)

TransactionalEventListener

일반적인 @EventListenereventPublisher.publishEvent() 시점에 즉시 실행된다.
하지만 @TransactionalEventListener 는 트랜잭션 단계에 맞춰 실행된다:

  • Spring의 트랜잭션 이벤트는 Transaction이 진행중이 아니면 fallbackExecution()을 명시적으로 활성화하지 않는 한 처리되지 않는다.
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
// 이 메서드는 트랜잭션 이벤트를 듣는다, 트랜잭션 커밋 직전에 실행
public void onDomainEvent(BaseEvent event) {
    // ...
    outboxRepository.save(outbox);
}

이벤트가 처리될 시점을 지정하는 EventListener의 phase 속성은 다음 4가지가 있다.

  • BEFORE_COMMIT ← 위 코드
  • AFTER_COMMIT (기본값)
  • AFTER_ROLLBACK
  • AFTER_COMPLETION

User 만 저장되고 Outbox 못 저장되는 일이 없도록, 원자성을 보장하기 위해 BEFORE_COMMIT 을 사용했다.

OutboxEventHandler에서 사용

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxEventHandler {

    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    public void onDomainEvent(BaseEvent event) {
        try {
            String payload = objectMapper.writeValueAsString(event);
            String topic = resolveTopic(event);
            
            // BaseEvent 의 domainId 는 String. OutboxEvent 의 aggregateId 는 UUID. 
            UUID aggregateId = UUID.fromString(event.getDomainId());

            OutboxEvent outbox = OutboxEvent.of(
                    event.getDomainType(),
                    aggregateId,
                    event.getEventType(),
                    topic,
                    payload
            );
            outboxRepository.save(outbox);

            log.debug("Outbox 저장 완료: ...");
        } catch (JsonProcessingException e) {
            log.error("이벤트 직렬화 실패: ...");
            throw new IllegalStateException("이벤트 직렬화 실패: " + event.getEventType(), e);
        }
    }
    // 이벤트 타입 → 토픽 매핑.
	// Java 14+ 의 switch 표현식 (콜론 : 대신 화살표 ->)
    private String resolveTopic(BaseEvent event) {
        return switch (event.getEventType()) {
            case "UserCreatedEvent" -> UserCreatedEvent.TOPIC;
            default -> throw new IllegalArgumentException(
                    "Unsupported event type: " + event.getEventType());
        };
    }
}

직렬화

ObjectMapper

JSON 직렬화 / 역직렬화 도구.
Spring Boot 자동 등록한 ObjectMapper 주입. JavaTimeModule이 자동 포함된다.

String payload = objectMapper.writeValueAsString(event);


Java 객체 → JSON 문자열:
UserCreatedEvent { eventId: "...", payload: { ... } }"{\"eventId\":\"...\",\"payload\":{...}}"

Jackson 라이브러리 - Instant 객체 직렬화 방식

→ Jackson 라이브러리 사용할 때 WRITE_DATES_AS_TIMESTAMPS 옵션 사용 여부에 따라 달라짐:

1. Epoch Timestamp(숫자) 형식

1970년 1월 1일 0시부터 경과된 초(second)

"occurredAt": 1777339234.090765000

사용시 장점 :

  • 소수점까지 포함되어 기계적인 정밀도가 높음.

2. ISO 8601 형식

  • Z : Zulu Time'의 약자로, 오프셋이 +00:00인 UTC(협정 세계시)를 의미
"occurredAt": "2026-04-25T03:34:56Z""Z" 마커 = UTC

사용시 장점 :

  • 사람이 바로 읽을 수 있음 (로그 가독성 디버깅이 쉬움)
  • IETF(Internet Engineering Task Force)에서 인터넷상의 날짜와 시간 표기법 규정 (RFC 3339 문서)
  • Google이나 Microsoft의 REST 가이드라인에서는 RFC 3339(ISO 8601) 문자열 형식을 표준으로 권장

Epoch Timestamp 사용했을 때 발생할 수 있는 문제:

Java에서 Instant 객체는 내부적으로 두 개의 필드로 시간을 저장한다.

  • Seconds (초): long 타입 (64비트)
  • Nanos (나노초): int 타입 (32비트)
  • 96비트의 데이터를 사용하여 시간을 표현

JavaScript의 Number는 64비트 부동 소수점 방식을 사용한다.
그래서 Epoch Timestamp 방식처럼 숫자형태로 시간을 저장하게 되면
JavaScript가 파싱할 때, 마지막 자리를 반올림하면서 데이터가 손실될 수 있다.

프론트엔드나 JS 기반의 미들웨어(Node.js 등)를 거치고 나면 값이 근사치로 바뀌면서 데이터의 정합성이 지켜지지 않을 위험이 있다.


OutboxPoller

이벤트의 주기적 발행을 이곳에서 담당한다.

@Scheduled(스케줄 등록)

@Scheduled로 주기적으로 실행됨을 명시하면 Spring Boot 가 application.yml 의 설정 보고 자동으로 등록한다.

    @Scheduled(fixedDelayString = "${outbox.poll-interval-ms:5000}") // 설정값 없으면 기본 5000ms (5초)
    @Transactional // 트랜잭션 안에서 엔티티 변경 → JPA dirty checking으로 자동 UPDATE
    public void publishPendingEvents() {
    

@EnableScheduling 필요 — SchedulerConfig

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class SchedulerConfig {
}

스케줄 주기

fixedDelay :

  • 이전 실행이 "끝난 후" 지정 시간(ms) 뒤 다시 실행
  • 절대 겹치지 않음
  • 실행 시간이 길어지면 전체 주기도 같이 늘어남
  • 안정성이 우선되는 작업 (DB 작업, 배치, Outbox)
[실행 1: 1초]  [대기 5초]  [실행 2: 1초]  [대기 5초]  ...
0────1                    6───7

flexDelay :

  • 시작 시점 기준으로 지정 시간(ms) 마다 실행
  • 이전 작업이 안 끝나도 다음 실행 시작 가능
  • 겹칠 수 있음 (중복 실행 위험)
  • 일정한 주기가 중요한 작업 (모니터링, 메트릭 수집 등)
[실행 1: 1초]
0────1

[실행 2: 1초]
     5────6

[실행 3: 1초]
          10───11

처리해야 할 이벤트 Outbox에서 가져오기 - findUnpublished()

 // 한 번의 폴링 사이클에서 처리할 최대 이벤트 개수
 private static final int BATCH_SIZE = 100;
 ...
 @Scheduled(...)
 @Transactional
 public void publishPendingEvents() {
        
        List<OutboxEvent> events = outboxRepository.findUnpublished(
                PageRequest.of(0, BATCH_SIZE) // 내부적으로 LIMIT 100 OFFSET 0
        );

        // 처리할 이벤트 없으면 바로 종료 (불필요한 로직 방지)
        if (events.isEmpty()) {
            return;
        }

		// for 루프 + try-catch 패턴 : 한 이벤트 실패해도 다음 이벤트 계속 처리됨
        for (OutboxEvent event : events) {
            try {
                // Kafka로 이벤트 발행
                publishToKafka(event);
                event.markPublished(); // 내부적으로 published=true, publishedAt=now 설정
            } catch (Exception e) {
                event.recordFailure(e.getMessage());
            }
        }
    }

kafka 발행

private void publishToKafka(OutboxEvent event)
throws ExecutionException, InterruptedException {
	kafkaTemplate.send(
    	event.getTopic(),                       // Kafka topic
        event.getAggregateId().toString(),      // 메시지 key (파티셔닝 기준)
        event.getPayload()                      // JSON payload
    ).get(); // 비동기 send → .get()으로 동기화 (성공/실패 결과를 즉시 받기 위해)
}

OutboxPoller 작성

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxPoller {

    private static final int BATCH_SIZE = 100;

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(fixedDelayString = "${outbox.poll-interval-ms:5000}")
    @Transactional
    public void publishPendingEvents() {
        List<OutboxEvent> events = outboxRepository.findUnpublished(
                PageRequest.of(0, BATCH_SIZE)
        );

        if (events.isEmpty()) {
            return;
        }

        for (OutboxEvent event : events) {
            try {
                publishToKafka(event);
                event.markPublished();
            } catch (Exception e) {
                event.recordFailure(e.getMessage());
            }
        }
    }

    private void publishToKafka(OutboxEvent event)
            throws ExecutionException, InterruptedException {
        kafkaTemplate.send(event.getTopic(),
                           event.getAggregateId().toString(),
                           event.getPayload()).get();
    }
}

application.yml

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        spring.json.add.type.headers: false
        acks: all
        retries: 3
        enable.idempotence: true

outbox:
  poll-interval-ms: 5000 # 위 OutboxPoller 의 fixedDelay 값

(producer) key-serializer / value-serializer

Java 객체 → 바이트로 어떻게 변환할지 설정
StringSerializer: 문자열을 UTF-8 바이트로.
우리 케이스:

key: aggregateId (UUID String) -> StringSerializer
value: 우리가 이미 JSON 으로 만든 String -> StringSerializer

acks

Producer 가 메시지 보냈을 때 Kafka 가 언제 ACK 보낼지 설정

  • 0 ACK 안 보냄 (가장 빠름, 손실 가능)
  • 1 리더 브로커가 받으면 ACK
  • all 모든 ISR 에 복제된 후 ACK

kafka발행 멱등성 설정 (At-least-once → Exactly-once Producer)

kafka 발행 재시도 과정에서 같은 메시지 두 번 발행되는것을 막기 위한 설정

Producer 가 메시지 보냄 → ACK 못 받음 → 재시도 → (같은 메시지 두 번 도달 가능)

enable.idempotence: true

  • Producer 가 각 메시지에 시퀀스 번호 부여
  • Kafka 가 시퀀스 추적
  • 중복 발행 자동 무시

코드래빗 코드리뷰 반영

1. JPQL 변수 파라미터 매핑 문제

@Query("""
        ...
        WHERE e.failureCount >= :threshold
        """)
List<OutboxEvent> findFailedExceedingThreshold(int threshold, Pageable pageable);

:threshold = JPQL 안의 변수. 메서드의 threshold 파라미터 값이 들어감.
Spring 이 어떻게 매핑?:
메서드 파라미터 이름 (threshold) ↔ JPQL 파라미터 이름 (:threshold)

컴파일러는 기본적으로 메서드 파라미터 이름을 .class 파일에 저장하지 않는다.

public void findFailedExceedingThreshold(int threshold, Pageable pageable);

// 컴파일 후 .class 파일에서 실제 보이는 것
public void findFailedExceedingThreshold(int arg0, Pageable arg1);
//                                            ↑ 이름이 사라짐!
# parameters 플래그가 있어야 .class 에 threshold 라는 이름이 남음.
javac -parameters MyClass.java

build.gradle 스크립트가 플래그 추가를 자동으로 처리한다.

빌드 스크립트에

compileJava { options.compilerArgs << '-parameters' }

를 추가하면 자동으로 빌드시 parameters 플래그를 추가해준다.

Spring Boot가 처리했다굴

plugins {
    id 'org.springframework.boot' version '3.5.13'
}

SpringBoot 플러그인이 내부적으로 compileJava { options.compilerArgs << '-parameters' } 추가한다.

하지만 누군가 build.gradle 손보다가 Spring Boot 플러그인 옵션을 변경해서 -parameters 플래그가 설정되지 않는다면,
JPQL과 파라미터 변수 매핑 과정에서 RepositoryInitializationException이 발생할 수 있다.

+ 역사적 배경:

과거에는 .class 파일 용량을 줄이기 위해 디버깅에 꼭 필요하지 않은 정보(로컬 변수 이름 등)는 삭제하는 것이 기본값이었음.
Java 8부터 -parameters 옵션이 도입
Spring Boot 3.x(Java 17 이상 권장)부터는 이 옵션이 선택이 아닌 필수에 가깝게 취급되기 시작함.

해결 @Param() 사용하기

import org.springframework.data.repository.query.Param;

List<OutboxEvent> findFailedExceedingThreshold(
    @Param("threshold") int threshold,    // ← 명시적 매핑
    Pageable pageable
);

@Param("threshold") = "이 파라미터 변수가 JPQL 의 :threshold 와 매핑됨"
→ 컴파일러 플래그 없어도 작동 보장.
이에 더해 파라미터 변수를 JPQL에서 사용한다는 의도가 코드에 명시되는 장점도 있다.

참고문서

hibernate - jpa.named-parameters

Spring Data JPA - Java API for HQL and JPQL


2. kafka 응답 지연 시 스케줄러 점유 문제

@Transactional   ← DB 트랜잭션 열림
public void publishPendingEvents() {
    List<OutboxEvent> events = outboxRepository.findUnpublished(...);
    
    for (OutboxEvent event : events) {
        try {
            publishToKafka(event);
            event.markPublished();
        } catch (Exception e) {
            event.recordFailure(e.getMessage());
        }
    }
}

private void publishToKafka(OutboxEvent event) throws ... {
    kafkaTemplate.send(topic, key, payload).get();   ← 무한 대기 가능
}

kafkaTemplate.send().get() 이 broker 응답 지연 시 스케줄러 점유.
DB 트랜잭션도 열린 상태 유지.
producer 타임아웃 설정 없으면 unbounded blocking 가능.

CompletableFuture.get() 의 동작

CompletableFuture<RecordMetadata> future = kafkaTemplate.send(...);
future.get();   // 결과(성공/실패) 올 때까지 대기

Kafka 의 기본 타임아웃 설정

request.timeout.ms (개별 요청 타임아웃) - 30초

프로듀서가 브로커에게 데이터를 보낸 후, 브로커로부터 응답(Ack)을 기다리는 최대 시간
단일 네트워크 요청 하나에만 해당한다.

delivery.timeout.ms (전체 전송 타임아웃) - 2분

send() 메서드를 호출한 시점부터 성공/실패가 확정될 때까지의 전체 시간에 대한 제한
재시도(retries)를 포함한 데이터 전송의 전체 과정을 포함한다.

네트워크 지연 시나리오

  • 브로커는 살아있는데 응답이 느린 경우 (TCP 패킷 유실, 네트워크 혼잡 등)

1) 스케줄러 스레드 점유

Spring 의 기본 스케줄러는 싱글 스레드이다.

Thread: scheduling-1
  ┌─────────────────────────┐
  │ publishPendingEvents()  │
  │   send().get()          │ ← 2분간 대기
  └─────────────────────────┘
  
  다음 5초 후 폴링 → 못 함 (스레드 점유 중)
  10초 후 폴링 → 못 함
  ... 2분 후에야 다음 폴링

2) DB 트랜잭션 열린 상태

@Transactional   ← 트랜잭션 시작
public void publishPendingEvents() {
    // SELECT ... FROM p_outbox  ← 트랜잭션 안에서
    
    send().get();2분 대기
    
    // 이 동안 트랜잭션 계속 열려있음
}

영향:
DB 의 connection pool 한 자리 점유하고 있음 -> connection leak 위험
HA(High Availability) 구성을 위해 여러 서버에서 스케줄러를 가동할 경우 다른 트랜잭션이 같은 row 만지려면 대기 (락 경합)
-> 전체적인 이벤트 처리량이 떨어진다.

3) Outbox 쌓이는 문제

[T=0] Kafka 잠시 응답 안 함
[T=0:01] send().get() 대기 중
[T=0:30] 첫 폴링 사이클 끝나야 하는데 안 끝남
[T=2:00] 첫 사이클 timeout 으로 실패
[T=2:05] 다음 폴링 시작
[T=2:06] Kafka 또 안 됨 → 또 대기
[T=4:00] 두 번째 사이클 끝
...

Kafka가 다시 빨라져도, 이미 Outbox 테이블에는 수십만 건의 데이터가 쌓여있을 수 있다.
이 데이터를 다 처리해서 '실시간' 상태로 돌아오려면 며칠이 걸릴 수도 있다.

해결 :kafkaTemplate.send()에 timeout 명시, Producer 타임아웃 설정을 짧게

private static final long SEND_TIMEOUT_SECONDS = 10;

private void publishToKafka(OutboxEvent event)
        throws ExecutionException, InterruptedException, TimeoutException {
    
    kafkaTemplate.send(topic, key, payload)
            .get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);   ← 변경
}

application.yml

타임아웃 시간 권장 공식:
delivery.timeout.ms >= request.timeout.ms * (retries + 1)

spring:
  kafka:
    producer:
      properties:
        request.timeout.ms: 5000      # 단일 요청 5초
        delivery.timeout.ms: 10000    # 전체 발행 10초 (retries 포함)

명심하기 :

외부 시스템 호출은 항상 timeout 가져야 함 (HTTP, DB, Kafka 등)


3. 새 이벤트 추가 시도메인 작업 롤백됨

1) 기존 방식 : BaseEvent 전체 구독 + resolveTopic switch

  • 미지원 이벤트 발생 시 IllegalArgumentException
  • BEFORE_COMMIT 이라 도메인 트랜잭션까지 롤백 위험
package com.pagely.userservice.infrastructure.messaging.outbox;

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxEventHandler {

    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    public void onDomainEvent(BaseEvent event) {
        try {
            String payload = objectMapper.writeValueAsString(event);
            String topic = resolveTopic(event);
            UUID aggregateId = UUID.fromString(event.getDomainId());

            OutboxEvent outbox = OutboxEvent.of(
                    event.getDomainType(),
                    aggregateId,
                    event.getEventType(),
                    topic,
                    payload
            );
            outboxRepository.save(outbox);

            log.debug("Outbox 저장 완료: eventType={}, domainId={}, outboxId={}",
                    event.getEventType(), event.getDomainId(), outbox.getId());
        } catch (JsonProcessingException e) {
            log.error("이벤트 직렬화 실패: eventType={}", event.getEventType(), e);
            throw new IllegalStateException("이벤트 직렬화 실패: " + event.getEventType(), e);
        }
    }

    /**
     * 이벤트 타입에 따른 Kafka 토픽 결정.
     */
    private String resolveTopic(BaseEvent event) {
        return switch (event.getEventType()) {
            case "UserCreatedEvent" -> UserCreatedEvent.TOPIC;
            // 이곳에서 문제 발생:
            default -> throw new IllegalArgumentException(
                    "Unsupported event type: " + event.getEventType());
        };
    }
}

Spring 이벤트 시스템의 타입 매칭

eventPublisher.publishEvent(myEvent) 가 호출됨 ->

  1. Spring 이 등록된 모든 @TransactionalEventListener / @EventListener 메서드 검색
  2. 각 메서드의 파라미터 타입 확인
  3. myEvent 의 타입이 메서드 파라미터 타입에 할당 가능한지 (instanceof) 검사
  4. 매칭되는 메서드만 호출
  5. 매칭 안 되는 이벤트는 "그냥 무시" (예외도 안 남, 로그도 안 남)

이를 활용해 새 이벤트 추가시, switch문을 갱신 하지 않아도 되도록 리펙토링한다.

2) 새 방식: 구체 이벤트 타입별 메서드 + 공통 헬퍼 방식

  • onUserCreated(UserCreatedEvent) ← 타입 일치할 때만 호출
  • saveOutbox(BaseEvent, String topic) ← 공통 직렬화/저장 로직

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxEventHandler {

    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    public void onUserCreated(UserCreatedEvent event) {
        saveOutbox(event, UserCreatedEvent.TOPIC);
    }

    /**
     * 공통 처리 로직 — 직렬화 + Outbox 저장.
     */
    private void saveOutbox(BaseEvent event, String topic) {
        try {
            String payload = objectMapper.writeValueAsString(event);
            UUID aggregateId = UUID.fromString(event.getDomainId());

            OutboxEvent outbox = OutboxEvent.of(
                    event.getDomainType(),
                    aggregateId,
                    event.getEventType(),
                    topic,
                    payload
            );
            outboxRepository.save(outbox);

            log.debug("Outbox 저장 완료: eventType={}, domainId={}, outboxId={}",
                    event.getEventType(), event.getDomainId(), outbox.getId());
        } catch (JsonProcessingException e) {
            log.error("이벤트 직렬화 실패: eventType={}", event.getEventType(), e);
            throw new IllegalStateException("이벤트 직렬화 실패: " + event.getEventType(), e);
        }
    }
}

효과:

  • 미지원 이벤트는 Spring 의 타입 매칭으로 자동 무시
  • 도메인 트랜잭션 영향 X
  • 새 이벤트 추가 시 새 메서드만 (switch 관리 불필요)
  • 컴파일러가 타입 검증

4. 인스턴스 다수 운영 시 Kafka 중복 발행 문제

  • published=false 조회 후 발행 과정에서, lock 없음 ->
    분산 환경에서 두 서버 인스턴스가 동시에 같은 행을 조회할 경우, 동일한 메시지가 Kafka에 두 번 발행되는 중복 발행 문제가 발생
  • FOR UPDATE SKIP LOCKED을 적용하여 해결할 수 있는데, 이를 구현하기 위해서는 native 쿼리가 필요, 이는 DB에 의존성을 깊게 가져가는 행위인 것 같아 MSA 특성에 맞지 않다고 판단

  • ShedLock 같은 라이브러리 적용하여 해결하는 방법이 있었음. 테이블 1개를 더 추가해야하고 라이브러리 의존성이 추가로 생기기 때문에 좀더 고려하 필요한 부분이라 생각되어 다중 인스턴스의 경우 모든 기능 구현 이후, 모니터링에 따라 적용되어야 할 부분인 것 같아 나중에 적용해보려고 함.

profile
풀스택 연습생. 끈기있는 삽질로 무대에서 화려하게 데뷔할 예정 ❤️🔥

0개의 댓글