4/6(월) 삭제 구현과 이벤트 처리

dev_joo·2026년 4월 6일

공통 모듈 - 이벤트 처리

공통모듈 이벤트 처리 흐름

[Application Service]
    Events.trigger(correlationId, domainType, domainId, eventType, payload)
         │
         │ Spring ApplicationEventPublisher로 내부 이벤트 발행
         ▼
[OutboxEventListener.recordOutbox()]@EventListener
    outbox 테이블에 INSERT (PENDING 상태)
    같은 트랜잭션 → DB 커밋 시 함께 저장
         │
         │ 트랜잭션 커밋 완료 후
         ▼
[OutboxEventListener.publish()]@TransactionalEventListener(AFTER_COMMIT)
    Kafka로 실제 전송
    성공 → outbox.complete() (PROCESSED)
    실패 → outbox.fail() (FAILED)
         │
         │ 전송 실패 시 (10초마다)[OutboxRelayScheduler.resendFailedMessages()]
    PENDING/FAILED + retryCount < 3 인 것 재전송
    3회 초과 → DLT로 격리

Event

package org.iimsa.common.event;

public class Events {
    private static KafkaTemplate<String, Object> kafkaTemplate;
    private static ApplicationEventPublisher eventPublisher;

    @Autowired
    public void init(KafkaTemplate<String, Object> kafkaTemplate, ApplicationEventPublisher eventPublisher) {
        Events.kafkaTemplate = kafkaTemplate;
        Events.eventPublisher = eventPublisher;
    }

    public static void trigger(String correlationId, String domainType, String domainId, String eventType,
                               Object payload) {
        if (kafkaTemplate != null && eventPublisher != null) {
            eventPublisher.publishEvent(new OutboxEvent(correlationId, domainType, domainId, eventType, payload));
        }
    }
}

trigger() 메서드

Events.trigger(
    correlationId, // 이 이벤트의 고유 ID (중복 방지용) → UUID 추천
    domainType,    // 어떤 도메인인지 ("USER", "ORDER" 등) → 로깅/추적용
    domainId,      // 도메인 식별자 → Kafka 파티션 키로 사용됨 (userId.toString())
    eventType,     // 토픽명 ("user.deleted" 등) → 튜터님이 말씀하신 것
    payload        // 실제 데이터 (UserDeletedPayload)
);

이벤트 처리 (각 서비스)

멱등성 처리

@IdempotentConsumer

Consumer 쪽에서 같은 메시지가 두 번 오면 중복 처리를 막는 어노테이션이다.

Outbox가 헤더에 message_id를 심어서 Kafka로 전송
         │
         ▼
Consumer가 메시지 수신
         │
         ▼
@IdempotentConsumer → InboxAdvice가 AOP로 가로챔
    message_id 헤더 추출
    Inbox 테이블에 INSERT 시도
    이미 있으면(중복) → 처리 건너뜀
    없으면 → 정상 처리 진행

카프카 설정

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-ui:                              # 토픽에 메시지 쌓이는지 눈으로 확인용
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8989:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

application.yml

spring:
  kafka:
      bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
      producer:
        key-serializer: ${KAFKA_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
        value-serializer: ${KAFKA_VALUE_SERIALIZER:org.apache.kafka.common.serialization.JsonSerializer}
      consumer:
        group-id: ${KAFKA_CONSUMER_GROUP_ID:user-service}
        key-deserializer: ${KAFKA_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
        value-deserializer: ${KAFKA_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
        auto-offset-reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}

# kafka
topics:
  user:
    approved: ${TOPIC_USER_APPROVED:user.approved}
    deleted: ${TOPIC_USER_DELETED:user.deleted}

이벤트 발행 (송신)

위의 공통모듈을 사용한다.

EventPayload

이벤트가 발생했을 때 다른 서비스로 보내줄 정보를 담는다.

public record UserDeletedPayload(
        UUID userId,
        String name,
        Integer deliverySequence, // 퇴사시 배송 순번(허브 배송 담당자, 다른 담당자라면 null),
        Role role,
        LocalDateTime deletedAt,
        String deletedBy
) {
    public static UserDeletedPayload from(User user) {
        Integer deliverySequence = user.getDeliveryManager() == null
                ? null
                : user.getDeliveryManager().getSequence();
        Role role = user.getRole() == null ? user.getRequestedRole() : user.getRole();
        return new UserDeletedPayload(
                user.getId(),
                user.getUsername(),
                deliverySequence,
                role,
                user.getDeletedAt(),
                user.getDeletedBy()
        );
    }
}

여태껏 조회에서 DTO를 만들었듯이 받는 부분이 어떤 부분이던, 모두 포괄하는 내용을 담도록 해 공통으로 사용하기로 했다.
(근데, 과연 이 방식이 좋은걸까??)

UserEventProducerImpl

도메인 계층에 정의된 UserEventProducer인터페이스에 맞게 실제로 각 이벤트에 맞게 이벤트 트리거를 호출하는 부분이다.

@Component
@RequiredArgsConstructor
@EnableConfigurationProperties(UserTopicProperties.class)
public class UserEventProducerImpl implements UserEventProducer {

    private final UserTopicProperties properties;

    @Override
    public void approved(User user) {
        // 추후 구현
    }

    @Override
    public void deleted(User user) {
        Events.trigger(
                getTraceId(),                     // correlationId
                "USER",                           // domainType
                user.getId().toString(),          // domainId (파티션 키)
                properties.deleted(),             // eventType = 토픽명
                UserDeletedPayload.from(user)     // payload
        );
    }

    @Override
    public void updated(User user) {
        // 나중 구현
    }

    private String getTraceId() {
        String traceId = MDC.get("traceId");
        return StringUtils.hasText(traceId) ? traceId : UUID.randomUUID().toString();
    }
}

카프카

// 공통 모듈에서 설정됨
@Configuration
public class KafkaConfig {
    @Bean
    public CommonErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        // 실패한 메시지를 {원래토픽}.DLT 로 전송하는 리커버러 생성
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);

        // 에러 핸들러 등록 (yml에 정의한 back-off 설정을 자동으로 적용)
        return new DefaultErrorHandler(recoverer);
    }
}

// 각 서비스의 설정 (application.yml)
topics:
  user:
    approved: user.approved.v1
    deleted: user.deleted.v1
    updated: user.updated.v1


// 각 서비스에서 설정을 사용 (Kafka 설정값 바인딩 DTO)
package org.iimsa.userservice.infrastructure.kafka;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "topics.user")
public record UserTopicProperties(
        String approved,
        String deleted,
        String updated
) {
}

// 공통 모듈 trigger() 를 통해 호출 (OutBox 패턴)
ProducerRecord<String, Object> record = new ProducerRecord<>(
    outbox.getEventType(),   // topic
    outbox.getDomainId(),    // key
    outbox.getPayload()      // value
);
kafkaTemplate.send(record)

이벤트 수신

주문

주문자

@Transactional
@IdempotentConsumer("user-deleted-order")
@KafkaListener(
    topics = "${topics.user.deleted:user.deleted}",
    groupId = "${spring.kafka.consumer.group-id:order-service}"
)

업체

업체 담당자

@Transactional
@IdempotentConsumer("user-deleted-company")
@KafkaListener(
    topics = "${topics.user.deleted:user.deleted}",
    groupId = "${spring.kafka.consumer.group-id:company-service}"
)

허브

허브 담당자

@Transactional
@IdempotentConsumer("user-deleted-hub")
@KafkaListener(
    topics = "${topics.user.deleted:user.deleted}",
    groupId = "${spring.kafka.consumer.group-id:hub-service}"
)

배송

업체배송 담당자,
허브 배송 담당자

@Slf4j
@Component
@RequiredArgsConstructor
public class UserEventListenerImpl {

    private final ObjectMapper objectMapper;

    @Transactional
    @IdempotentConsumer("user-deleted-delivery")
    @KafkaListener(
            topics = "${topics.user.deleted:user.deleted}",
            groupId = "${spring.kafka.consumer.group-id:delivery-service}"
    )
    public void onUserDeleted(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            UserDeletedPayload payload = objectMapper.readValue(
                    record.value(), UserDeletedPayload.class
            );

            // 내 서비스와 관련 없는 role이면 그냥 넘김
            if (!isMyRole(payload.role())) {
                ack.acknowledge();
                return;
            }

            log.info("[user.deleted] 수신 - userId={}, role={}", 
                    payload.userId(), payload.role());

            // TODO: 비즈니스 로직은 나중에 구현
            // deliveryService.handleUserDeleted(payload);

            ack.acknowledge();

        } catch (Exception e) {
            log.error("[user.deleted] 처리 실패 - offset={}", record.offset(), e);
            throw new IllegalArgumentException("역직렬화 실패", e);
        }
    }

    @KafkaListener(
            topics = "${topics.user.deleted:user.deleted}.DLT",
            groupId = "${spring.kafka.consumer.group-id:delivery-service-group}"
    )
    public void onDlt(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.error("[user.deleted.DLT] 최종 실패 - offset={}", record.offset());
        ack.acknowledge();
    }

    private boolean isMyRole(String role) {
        return "HUB_DELIVERY_MANAGER".equals(role)
                || "COMPANY_DELIVERY_MANAGER".equals(role);
    }
}
profile
풀스택 연습생. 끈기있는 삽질로 무대에서 화려하게 데뷔할 예정 ❤️🔥

0개의 댓글