일단 도메인 이벤트를 발행하고 이 이벤트에 발행에 대해 Outbox로 영속 저장을 해서
이벤트 처리가 중간에 실패하더라도 서비스에서 이벤트가 전달될수 있어 정합성을 보장한다는 Outbox테이블 사용의 이유와
호출이 일어나면 이벤트가 만들어지고, polling으로 kafka가 가져간다는 큰 흐름까지는 설명할 수 있었지만,
예시를 보고 만들어서 왜 이 속성과 설정을 사용했는지까지는 설명할 자신이 없었다.
그래서 다시 한 번 원리와 세부 구현을 살펴보면서 이에 대해 확실하게 학습하고 넘어가기로 했다.
[도메인 이벤트 표현]
├ BaseEvent.java "이벤트의 공통 모양"
└ UserCreatedEvent.java "회원가입했다 라는 사실"
[Outbox 저장]
├ OutboxEvent.java "DB에 보관할 이벤트 (테이블 한 행)"
├ OutboxRepository.java "Outbox 다루는 인터페이스"
├ OutboxRepositoryAdapter.java "위 인터페이스 구현"
└ JpaOutboxRepository.java "Spring 이 자동 만들어주는 SQL 실행기"
[Outbox 흐름 연결]
├ UserApplicationService "회원가입하면서 이벤트 발행만 알림"
├ OutboxEventHandler "이벤트 받아서 Outbox에 저장"
└ OutboxPoller "주기적으로 Outbox 읽어서 Kafka로"
[설정]
├ application.yml "Kafka 접속 정보 + 직렬화 설정"
├ SchedulerConfig "스케줄링 켜기"
└ V3__create_outbox_table.sql "p_outbox 테이블 만들기"
목록화 하고 보니 이벤트 발행과 OutBox패턴을 만들기 위해 무려 11개의 파일을 새로 만들었다.(BaseEvent는 컨벤션)
[T=0초] 사용자가 POST /api/v1/users 요청
│
│ Body: {"loginId":"alice", ...}
│
▼
┌─────────────────────────────────────────────┐
│ UserController.signup() │
│ userApplicationService.signup(command) │
└─────────────────┬───────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ UserApplicationService.signup() │
│ @Transactional() 시작 │
│ │
│ ① User user = User.create(...) │
│ ② userRepository.save(user) │
│ ↓ │
│ INSERT INTO p_users │
│ │
│ ③ nicknameHistoryRepository.save(...) │
│ INSERT INTO p_user_nickname_histories │
│ │
│ ④ eventPublisher.publishEvent( │
│ UserCreatedEvent.from(saved) │
│ ) │
│ ↓ Spring 이벤트 시스템에 던짐 │
│ ↓ "이런 일이 일어났음" 알림 │
└─────────────────┬───────────────────────────┘
│
▼
"Spring"이 BEFORE_COMMIT 시점에
이벤트 구독자를 호출함
│
▼
┌──────────────────────────────────────────────┐
│ OutboxEventHandler.onDomainEvent(event) │
│ @TransactionalEventListener(BEFORE_COMMIT) │
│ │
│ ⑤ String payload = JSON 직렬화 │
│ ⑥ String topic = "user-created" │
│ ⑦ OutboxEvent outbox = OutboxEvent.of(...) │
│ ⑧ outboxRepository.save(outbox) │
│ ↓ │
│ INSERT INTO p_outbox │
│ (published=false) │
└─────────────────┬────────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ UserApplicationService.signup() 메서드 끝 │
│ │
│ @Transactional 커밋 │
│ = p_users + p_user_nickname_histories │
│ + p_outbox 한 번에 저장됨 │
│ │
│ 201 응답 │
└─────────────────────────────────────────────┘
여기까지 회원가입 응답 끝
사용자는 가입 성공 받음
Outbox 에는 published=false 인 행이 있음
[T=5초 후] OutboxPoller 가 자동 실행 (스케줄러)
│
▼
┌─────────────────────────────────────────────┐
│ OutboxPoller.publishPendingEvents() │
│ @Scheduled(fixedDelay = 5000) │
│ @Transactional 시작 │
│ │
│ ⑨ events = outboxRepository │
│ .findUnpublished(...) │
│ ↓ │
│ SELECT * FROM p_outbox │
│ WHERE published = false │
│ LIMIT 100 │
│ │
│ ⑩ for 각 event: │
│ kafkaTemplate.send(topic, key, payload) │
│ ↓ │
│ Kafka 브로커로 메시지 발행 │
│ │
│ ⑪ event.markPublished() │
│ = published=true 상태로 변경 │
│ │
│ @Transactional 커밋 │
│ = p_outbox UPDATE published=true │
└─────────────────────────────────────────────┘
여기까지 Kafka 에 메시지 도착 완료
[T=∞ 어느 시점] 다른 서비스 (예: 알림 서비스) 가
Kafka 에서 메시지 가져감
│
▼
Notification Service 가 user-created 토픽 구독
메시지 받아서 환영 메일 발송 등에 사용
eventPublisher.publishEvent(UserCreatedEvent.from(saved));
이 코드는 Spring이 내부적으로 이벤트 시스템을 동작하게 한다.
그 내부 이벤트 시스템 동작은 다음과 같다.
eventPublisher.publishEvent(myEvent)
│
▼
Spring 내부: "이 이벤트의 구독자 누구지?"
├ 발견: OutboxEventHandler.onDomainEvent(BaseEvent event)
│ (BaseEvent 가 UserCreatedEvent 의 부모라 매칭됨)
│
▼
onDomainEvent(myEvent) 자동 호출
@TransactionalEventListener / @EventListener 메서드 검색UserCreatedEvent (또는 부모 BaseEvent)으로 일치하는 메서드를 찾아내 호출한다.import org.springframework.context.ApplicationEventPublisher;
@Service
public class UserApplicationService {
private final ApplicationEventPublisher eventPublisher;
@Transactional
public SignupResponse signup(SignupCommand command) {
// ...
try {
User saved = userRepository.save(user);
nicknameHistoryRepository.save(nicknameHistory);
eventPublisher.publishEvent(UserCreatedEvent.from(saved)); // ← 추가
return SignupResponse.from(saved);
} catch (...) { ... }
}
}
일반적인 @EventListener 는 eventPublisher.publishEvent() 시점에 즉시 실행된다.
하지만 @TransactionalEventListener 는 트랜잭션 단계에 맞춰 실행된다:
fallbackExecution()을 명시적으로 활성화하지 않는 한 처리되지 않는다.@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
// 이 메서드는 트랜잭션 이벤트를 듣는다, 트랜잭션 커밋 직전에 실행
public void onDomainEvent(BaseEvent event) {
// ...
outboxRepository.save(outbox);
}
이벤트가 처리될 시점을 지정하는 EventListener의 phase 속성은 다음 4가지가 있다.
BEFORE_COMMIT ← 위 코드User 만 저장되고 Outbox 못 저장되는 일이 없도록, 원자성을 보장하기 위해 BEFORE_COMMIT 을 사용했다.
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxEventHandler {
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void onDomainEvent(BaseEvent event) {
try {
String payload = objectMapper.writeValueAsString(event);
String topic = resolveTopic(event);
// BaseEvent 의 domainId 는 String. OutboxEvent 의 aggregateId 는 UUID.
UUID aggregateId = UUID.fromString(event.getDomainId());
OutboxEvent outbox = OutboxEvent.of(
event.getDomainType(),
aggregateId,
event.getEventType(),
topic,
payload
);
outboxRepository.save(outbox);
log.debug("Outbox 저장 완료: ...");
} catch (JsonProcessingException e) {
log.error("이벤트 직렬화 실패: ...");
throw new IllegalStateException("이벤트 직렬화 실패: " + event.getEventType(), e);
}
}
// 이벤트 타입 → 토픽 매핑.
// Java 14+ 의 switch 표현식 (콜론 : 대신 화살표 ->)
private String resolveTopic(BaseEvent event) {
return switch (event.getEventType()) {
case "UserCreatedEvent" -> UserCreatedEvent.TOPIC;
default -> throw new IllegalArgumentException(
"Unsupported event type: " + event.getEventType());
};
}
}
JSON 직렬화 / 역직렬화 도구.
Spring Boot 자동 등록한 ObjectMapper 주입. JavaTimeModule이 자동 포함된다.
String payload = objectMapper.writeValueAsString(event);
Java 객체 → JSON 문자열:
UserCreatedEvent { eventId: "...", payload: { ... } }
↓
"{\"eventId\":\"...\",\"payload\":{...}}"
→ Jackson 라이브러리 사용할 때 WRITE_DATES_AS_TIMESTAMPS 옵션 사용 여부에 따라 달라짐:
1970년 1월 1일 0시부터 경과된 초(second)
"occurredAt": 1777339234.090765000
사용시 장점 :
Z : Zulu Time'의 약자로, 오프셋이 +00:00인 UTC(협정 세계시)를 의미"occurredAt": "2026-04-25T03:34:56Z"
↑ "Z" 마커 = UTC
사용시 장점 :
RFC 3339 문서)RFC 3339(ISO 8601) 문자열 형식을 표준으로 권장Java에서 Instant 객체는 내부적으로 두 개의 필드로 시간을 저장한다.
long 타입 (64비트)int 타입 (32비트)JavaScript의 Number는 64비트 부동 소수점 방식을 사용한다.
그래서 Epoch Timestamp 방식처럼 숫자형태로 시간을 저장하게 되면
JavaScript가 파싱할 때, 마지막 자리를 반올림하면서 데이터가 손실될 수 있다.
프론트엔드나 JS 기반의 미들웨어(Node.js 등)를 거치고 나면 값이 근사치로 바뀌면서 데이터의 정합성이 지켜지지 않을 위험이 있다.
이벤트의 주기적 발행을 이곳에서 담당한다.
@Scheduled로 주기적으로 실행됨을 명시하면 Spring Boot 가 application.yml 의 설정 보고 자동으로 등록한다.
@Scheduled(fixedDelayString = "${outbox.poll-interval-ms:5000}") // 설정값 없으면 기본 5000ms (5초)
@Transactional // 트랜잭션 안에서 엔티티 변경 → JPA dirty checking으로 자동 UPDATE
public void publishPendingEvents() {
@EnableScheduling 필요 — SchedulerConfigimport org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
@Configuration
@EnableScheduling
public class SchedulerConfig {
}
fixedDelay :[실행 1: 1초] [대기 5초] [실행 2: 1초] [대기 5초] ...
0────1 6───7
[실행 1: 1초]
0────1
[실행 2: 1초]
5────6
[실행 3: 1초]
10───11
// 한 번의 폴링 사이클에서 처리할 최대 이벤트 개수
private static final int BATCH_SIZE = 100;
...
@Scheduled(...)
@Transactional
public void publishPendingEvents() {
List<OutboxEvent> events = outboxRepository.findUnpublished(
PageRequest.of(0, BATCH_SIZE) // 내부적으로 LIMIT 100 OFFSET 0
);
// 처리할 이벤트 없으면 바로 종료 (불필요한 로직 방지)
if (events.isEmpty()) {
return;
}
// for 루프 + try-catch 패턴 : 한 이벤트 실패해도 다음 이벤트 계속 처리됨
for (OutboxEvent event : events) {
try {
// Kafka로 이벤트 발행
publishToKafka(event);
event.markPublished(); // 내부적으로 published=true, publishedAt=now 설정
} catch (Exception e) {
event.recordFailure(e.getMessage());
}
}
}
private void publishToKafka(OutboxEvent event)
throws ExecutionException, InterruptedException {
kafkaTemplate.send(
event.getTopic(), // Kafka topic
event.getAggregateId().toString(), // 메시지 key (파티셔닝 기준)
event.getPayload() // JSON payload
).get(); // 비동기 send → .get()으로 동기화 (성공/실패 결과를 즉시 받기 위해)
}
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxPoller {
private static final int BATCH_SIZE = 100;
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Scheduled(fixedDelayString = "${outbox.poll-interval-ms:5000}")
@Transactional
public void publishPendingEvents() {
List<OutboxEvent> events = outboxRepository.findUnpublished(
PageRequest.of(0, BATCH_SIZE)
);
if (events.isEmpty()) {
return;
}
for (OutboxEvent event : events) {
try {
publishToKafka(event);
event.markPublished();
} catch (Exception e) {
event.recordFailure(e.getMessage());
}
}
}
private void publishToKafka(OutboxEvent event)
throws ExecutionException, InterruptedException {
kafkaTemplate.send(event.getTopic(),
event.getAggregateId().toString(),
event.getPayload()).get();
}
}
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
spring.json.add.type.headers: false
acks: all
retries: 3
enable.idempotence: true
outbox:
poll-interval-ms: 5000 # 위 OutboxPoller 의 fixedDelay 값
Java 객체 → 바이트로 어떻게 변환할지 설정
StringSerializer: 문자열을 UTF-8 바이트로.
우리 케이스:
key: aggregateId (UUID String) -> StringSerializer
value: 우리가 이미 JSON 으로 만든 String -> StringSerializer
Producer 가 메시지 보냈을 때 Kafka 가 언제 ACK 보낼지 설정
0 ACK 안 보냄 (가장 빠름, 손실 가능)1 리더 브로커가 받으면 ACKall 모든 ISR 에 복제된 후 ACKkafka 발행 재시도 과정에서 같은 메시지 두 번 발행되는것을 막기 위한 설정
Producer 가 메시지 보냄 → ACK 못 받음 → 재시도 → (같은 메시지 두 번 도달 가능)
enable.idempotence: true

@Query("""
...
WHERE e.failureCount >= :threshold
""")
List<OutboxEvent> findFailedExceedingThreshold(int threshold, Pageable pageable);
:threshold = JPQL 안의 변수. 메서드의 threshold 파라미터 값이 들어감.
Spring 이 어떻게 매핑?:
메서드 파라미터 이름 (threshold) ↔ JPQL 파라미터 이름 (:threshold)
public void findFailedExceedingThreshold(int threshold, Pageable pageable);
// 컴파일 후 .class 파일에서 실제 보이는 것
public void findFailedExceedingThreshold(int arg0, Pageable arg1);
// ↑ 이름이 사라짐!
# parameters 플래그가 있어야 .class 에 threshold 라는 이름이 남음.
javac -parameters MyClass.java
빌드 스크립트에
compileJava { options.compilerArgs << '-parameters' }
를 추가하면 자동으로 빌드시 parameters 플래그를 추가해준다.
plugins {
id 'org.springframework.boot' version '3.5.13'
}
SpringBoot 플러그인이 내부적으로 compileJava { options.compilerArgs << '-parameters' } 추가한다.
하지만 누군가 build.gradle 손보다가 Spring Boot 플러그인 옵션을 변경해서 -parameters 플래그가 설정되지 않는다면,
JPQL과 파라미터 변수 매핑 과정에서 RepositoryInitializationException이 발생할 수 있다.
과거에는 .class 파일 용량을 줄이기 위해 디버깅에 꼭 필요하지 않은 정보(로컬 변수 이름 등)는 삭제하는 것이 기본값이었음.
Java 8부터 -parameters 옵션이 도입
Spring Boot 3.x(Java 17 이상 권장)부터는 이 옵션이 선택이 아닌 필수에 가깝게 취급되기 시작함.
import org.springframework.data.repository.query.Param;
List<OutboxEvent> findFailedExceedingThreshold(
@Param("threshold") int threshold, // ← 명시적 매핑
Pageable pageable
);
@Param("threshold") = "이 파라미터 변수가 JPQL 의 :threshold 와 매핑됨"
→ 컴파일러 플래그 없어도 작동 보장.
이에 더해 파라미터 변수를 JPQL에서 사용한다는 의도가 코드에 명시되는 장점도 있다.
hibernate - jpa.named-parameters
Spring Data JPA - Java API for HQL and JPQL
@Transactional ← DB 트랜잭션 열림
public void publishPendingEvents() {
List<OutboxEvent> events = outboxRepository.findUnpublished(...);
for (OutboxEvent event : events) {
try {
publishToKafka(event);
event.markPublished();
} catch (Exception e) {
event.recordFailure(e.getMessage());
}
}
}
private void publishToKafka(OutboxEvent event) throws ... {
kafkaTemplate.send(topic, key, payload).get(); ← 무한 대기 가능
}
kafkaTemplate.send().get() 이 broker 응답 지연 시 스케줄러 점유.
DB 트랜잭션도 열린 상태 유지.
producer 타임아웃 설정 없으면 unbounded blocking 가능.
CompletableFuture<RecordMetadata> future = kafkaTemplate.send(...);
future.get(); // 결과(성공/실패) 올 때까지 대기
프로듀서가 브로커에게 데이터를 보낸 후, 브로커로부터 응답(Ack)을 기다리는 최대 시간
단일 네트워크 요청 하나에만 해당한다.
send() 메서드를 호출한 시점부터 성공/실패가 확정될 때까지의 전체 시간에 대한 제한
재시도(retries)를 포함한 데이터 전송의 전체 과정을 포함한다.
Spring 의 기본 스케줄러는 싱글 스레드이다.
Thread: scheduling-1
┌─────────────────────────┐
│ publishPendingEvents() │
│ send().get() │ ← 2분간 대기
└─────────────────────────┘
다음 5초 후 폴링 → 못 함 (스레드 점유 중)
10초 후 폴링 → 못 함
... 2분 후에야 다음 폴링
@Transactional ← 트랜잭션 시작
public void publishPendingEvents() {
// SELECT ... FROM p_outbox ← 트랜잭션 안에서
send().get(); ← 2분 대기
// 이 동안 트랜잭션 계속 열려있음
}
영향:
DB 의 connection pool 한 자리 점유하고 있음 -> connection leak 위험
HA(High Availability) 구성을 위해 여러 서버에서 스케줄러를 가동할 경우 다른 트랜잭션이 같은 row 만지려면 대기 (락 경합)
-> 전체적인 이벤트 처리량이 떨어진다.
[T=0] Kafka 잠시 응답 안 함
[T=0:01] send().get() 대기 중
[T=0:30] 첫 폴링 사이클 끝나야 하는데 안 끝남
[T=2:00] 첫 사이클 timeout 으로 실패
[T=2:05] 다음 폴링 시작
[T=2:06] Kafka 또 안 됨 → 또 대기
[T=4:00] 두 번째 사이클 끝
...
Kafka가 다시 빨라져도, 이미 Outbox 테이블에는 수십만 건의 데이터가 쌓여있을 수 있다.
이 데이터를 다 처리해서 '실시간' 상태로 돌아오려면 며칠이 걸릴 수도 있다.
private static final long SEND_TIMEOUT_SECONDS = 10;
private void publishToKafka(OutboxEvent event)
throws ExecutionException, InterruptedException, TimeoutException {
kafkaTemplate.send(topic, key, payload)
.get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS); ← 변경
}
타임아웃 시간 권장 공식:
delivery.timeout.ms >= request.timeout.ms * (retries + 1)
spring:
kafka:
producer:
properties:
request.timeout.ms: 5000 # 단일 요청 5초
delivery.timeout.ms: 10000 # 전체 발행 10초 (retries 포함)
외부 시스템 호출은 항상 timeout 가져야 함 (HTTP, DB, Kafka 등)
package com.pagely.userservice.infrastructure.messaging.outbox;
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxEventHandler {
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void onDomainEvent(BaseEvent event) {
try {
String payload = objectMapper.writeValueAsString(event);
String topic = resolveTopic(event);
UUID aggregateId = UUID.fromString(event.getDomainId());
OutboxEvent outbox = OutboxEvent.of(
event.getDomainType(),
aggregateId,
event.getEventType(),
topic,
payload
);
outboxRepository.save(outbox);
log.debug("Outbox 저장 완료: eventType={}, domainId={}, outboxId={}",
event.getEventType(), event.getDomainId(), outbox.getId());
} catch (JsonProcessingException e) {
log.error("이벤트 직렬화 실패: eventType={}", event.getEventType(), e);
throw new IllegalStateException("이벤트 직렬화 실패: " + event.getEventType(), e);
}
}
/**
* 이벤트 타입에 따른 Kafka 토픽 결정.
*/
private String resolveTopic(BaseEvent event) {
return switch (event.getEventType()) {
case "UserCreatedEvent" -> UserCreatedEvent.TOPIC;
// 이곳에서 문제 발생:
default -> throw new IllegalArgumentException(
"Unsupported event type: " + event.getEventType());
};
}
}
eventPublisher.publishEvent(myEvent) 가 호출됨 ->@TransactionalEventListener / @EventListener 메서드 검색이를 활용해 새 이벤트 추가시, switch문을 갱신 하지 않아도 되도록 리펙토링한다.
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxEventHandler {
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void onUserCreated(UserCreatedEvent event) {
saveOutbox(event, UserCreatedEvent.TOPIC);
}
/**
* 공통 처리 로직 — 직렬화 + Outbox 저장.
*/
private void saveOutbox(BaseEvent event, String topic) {
try {
String payload = objectMapper.writeValueAsString(event);
UUID aggregateId = UUID.fromString(event.getDomainId());
OutboxEvent outbox = OutboxEvent.of(
event.getDomainType(),
aggregateId,
event.getEventType(),
topic,
payload
);
outboxRepository.save(outbox);
log.debug("Outbox 저장 완료: eventType={}, domainId={}, outboxId={}",
event.getEventType(), event.getDomainId(), outbox.getId());
} catch (JsonProcessingException e) {
log.error("이벤트 직렬화 실패: eventType={}", event.getEventType(), e);
throw new IllegalStateException("이벤트 직렬화 실패: " + event.getEventType(), e);
}
}
}
효과:
published=false 조회 후 발행 과정에서, lock 없음 ->FOR UPDATE SKIP LOCKED을 적용하여 해결할 수 있는데, 이를 구현하기 위해서는 native 쿼리가 필요, 이는 DB에 의존성을 깊게 가져가는 행위인 것 같아 MSA 특성에 맞지 않다고 판단
ShedLock 같은 라이브러리 적용하여 해결하는 방법이 있었음. 테이블 1개를 더 추가해야하고 라이브러리 의존성이 추가로 생기기 때문에 좀더 고려하 필요한 부분이라 생각되어 다중 인스턴스의 경우 모든 기능 구현 이후, 모니터링에 따라 적용되어야 할 부분인 것 같아 나중에 적용해보려고 함.