[Application Service]
Events.trigger(correlationId, domainType, domainId, eventType, payload)
│
│ Spring ApplicationEventPublisher로 내부 이벤트 발행
▼
[OutboxEventListener.recordOutbox()] ← @EventListener
outbox 테이블에 INSERT (PENDING 상태)
같은 트랜잭션 → DB 커밋 시 함께 저장
│
│ 트랜잭션 커밋 완료 후
▼
[OutboxEventListener.publish()] ← @TransactionalEventListener(AFTER_COMMIT)
Kafka로 실제 전송
성공 → outbox.complete() (PROCESSED)
실패 → outbox.fail() (FAILED)
│
│ 전송 실패 시 (10초마다)
▼
[OutboxRelayScheduler.resendFailedMessages()]
PENDING/FAILED + retryCount < 3 인 것 재전송
3회 초과 → DLT로 격리
package org.iimsa.common.event;
public class Events {
private static KafkaTemplate<String, Object> kafkaTemplate;
private static ApplicationEventPublisher eventPublisher;
@Autowired
public void init(KafkaTemplate<String, Object> kafkaTemplate, ApplicationEventPublisher eventPublisher) {
Events.kafkaTemplate = kafkaTemplate;
Events.eventPublisher = eventPublisher;
}
public static void trigger(String correlationId, String domainType, String domainId, String eventType,
Object payload) {
if (kafkaTemplate != null && eventPublisher != null) {
eventPublisher.publishEvent(new OutboxEvent(correlationId, domainType, domainId, eventType, payload));
}
}
}
Events.trigger(
correlationId, // 이 이벤트의 고유 ID (중복 방지용) → UUID 추천
domainType, // 어떤 도메인인지 ("USER", "ORDER" 등) → 로깅/추적용
domainId, // 도메인 식별자 → Kafka 파티션 키로 사용됨 (userId.toString())
eventType, // 토픽명 ("user.deleted" 등) → 튜터님이 말씀하신 것
payload // 실제 데이터 (UserDeletedPayload)
);
Consumer 쪽에서 같은 메시지가 두 번 오면 중복 처리를 막는 어노테이션이다.
Outbox가 헤더에 message_id를 심어서 Kafka로 전송
│
▼
Consumer가 메시지 수신
│
▼
@IdempotentConsumer → InboxAdvice가 AOP로 가로챔
message_id 헤더 추출
Inbox 테이블에 INSERT 시도
이미 있으면(중복) → 처리 건너뜀
없으면 → 정상 처리 진행
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-ui: # 토픽에 메시지 쌓이는지 눈으로 확인용
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "8989:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
producer:
key-serializer: ${KAFKA_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
value-serializer: ${KAFKA_VALUE_SERIALIZER:org.apache.kafka.common.serialization.JsonSerializer}
consumer:
group-id: ${KAFKA_CONSUMER_GROUP_ID:user-service}
key-deserializer: ${KAFKA_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
value-deserializer: ${KAFKA_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
auto-offset-reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
# kafka
topics:
user:
approved: ${TOPIC_USER_APPROVED:user.approved}
deleted: ${TOPIC_USER_DELETED:user.deleted}
위의 공통모듈을 사용한다.
이벤트가 발생했을 때 다른 서비스로 보내줄 정보를 담는다.
public record UserDeletedPayload(
UUID userId,
String name,
Integer deliverySequence, // 퇴사시 배송 순번(허브 배송 담당자, 다른 담당자라면 null),
Role role,
LocalDateTime deletedAt,
String deletedBy
) {
public static UserDeletedPayload from(User user) {
Integer deliverySequence = user.getDeliveryManager() == null
? null
: user.getDeliveryManager().getSequence();
Role role = user.getRole() == null ? user.getRequestedRole() : user.getRole();
return new UserDeletedPayload(
user.getId(),
user.getUsername(),
deliverySequence,
role,
user.getDeletedAt(),
user.getDeletedBy()
);
}
}
여태껏 조회에서 DTO를 만들었듯이 받는 부분이 어떤 부분이던, 모두 포괄하는 내용을 담도록 해 공통으로 사용하기로 했다.
(근데, 과연 이 방식이 좋은걸까??)
도메인 계층에 정의된 UserEventProducer인터페이스에 맞게 실제로 각 이벤트에 맞게 이벤트 트리거를 호출하는 부분이다.
@Component
@RequiredArgsConstructor
@EnableConfigurationProperties(UserTopicProperties.class)
public class UserEventProducerImpl implements UserEventProducer {
private final UserTopicProperties properties;
@Override
public void approved(User user) {
// 추후 구현
}
@Override
public void deleted(User user) {
Events.trigger(
getTraceId(), // correlationId
"USER", // domainType
user.getId().toString(), // domainId (파티션 키)
properties.deleted(), // eventType = 토픽명
UserDeletedPayload.from(user) // payload
);
}
@Override
public void updated(User user) {
// 나중 구현
}
private String getTraceId() {
String traceId = MDC.get("traceId");
return StringUtils.hasText(traceId) ? traceId : UUID.randomUUID().toString();
}
}
// 공통 모듈에서 설정됨
@Configuration
public class KafkaConfig {
@Bean
public CommonErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
// 실패한 메시지를 {원래토픽}.DLT 로 전송하는 리커버러 생성
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// 에러 핸들러 등록 (yml에 정의한 back-off 설정을 자동으로 적용)
return new DefaultErrorHandler(recoverer);
}
}
// 각 서비스의 설정 (application.yml)
topics:
user:
approved: user.approved.v1
deleted: user.deleted.v1
updated: user.updated.v1
// 각 서비스에서 설정을 사용 (Kafka 설정값 바인딩 DTO)
package org.iimsa.userservice.infrastructure.kafka;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "topics.user")
public record UserTopicProperties(
String approved,
String deleted,
String updated
) {
}
// 공통 모듈 trigger() 를 통해 호출 (OutBox 패턴)
ProducerRecord<String, Object> record = new ProducerRecord<>(
outbox.getEventType(), // topic
outbox.getDomainId(), // key
outbox.getPayload() // value
);
kafkaTemplate.send(record)
주문자
@Transactional
@IdempotentConsumer("user-deleted-order")
@KafkaListener(
topics = "${topics.user.deleted:user.deleted}",
groupId = "${spring.kafka.consumer.group-id:order-service}"
)
업체 담당자
@Transactional
@IdempotentConsumer("user-deleted-company")
@KafkaListener(
topics = "${topics.user.deleted:user.deleted}",
groupId = "${spring.kafka.consumer.group-id:company-service}"
)
허브 담당자
@Transactional
@IdempotentConsumer("user-deleted-hub")
@KafkaListener(
topics = "${topics.user.deleted:user.deleted}",
groupId = "${spring.kafka.consumer.group-id:hub-service}"
)
업체배송 담당자,
허브 배송 담당자
@Slf4j
@Component
@RequiredArgsConstructor
public class UserEventListenerImpl {
private final ObjectMapper objectMapper;
@Transactional
@IdempotentConsumer("user-deleted-delivery")
@KafkaListener(
topics = "${topics.user.deleted:user.deleted}",
groupId = "${spring.kafka.consumer.group-id:delivery-service}"
)
public void onUserDeleted(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
UserDeletedPayload payload = objectMapper.readValue(
record.value(), UserDeletedPayload.class
);
// 내 서비스와 관련 없는 role이면 그냥 넘김
if (!isMyRole(payload.role())) {
ack.acknowledge();
return;
}
log.info("[user.deleted] 수신 - userId={}, role={}",
payload.userId(), payload.role());
// TODO: 비즈니스 로직은 나중에 구현
// deliveryService.handleUserDeleted(payload);
ack.acknowledge();
} catch (Exception e) {
log.error("[user.deleted] 처리 실패 - offset={}", record.offset(), e);
throw new IllegalArgumentException("역직렬화 실패", e);
}
}
@KafkaListener(
topics = "${topics.user.deleted:user.deleted}.DLT",
groupId = "${spring.kafka.consumer.group-id:delivery-service-group}"
)
public void onDlt(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.error("[user.deleted.DLT] 최종 실패 - offset={}", record.offset());
ack.acknowledge();
}
private boolean isMyRole(String role) {
return "HUB_DELIVERY_MANAGER".equals(role)
|| "COMPANY_DELIVERY_MANAGER".equals(role);
}
}