Kafka Skeleton Code

졸용 · December 4, 2025

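🔹 What is Kafka?

A distributed messaging/streaming platform that stores high-throughput, high-volume logs and events per topic so that multiple services can subscribe to them and process them asynchronously.

In a bit more detail:

🔸 Core concepts

  1. Broker

    • A single Kafka server instance.
    • Several brokers run together to form a cluster.
  2. Topic

    • The name of the “channel” that messages accumulate in.
    • Examples: order-created, payment-completed, user-signup.
  3. Partition

    • The unit a topic is physically split into.
    • Exists for parallel processing (multiple consumers) and scalability (scale-out).
    • Messages are ordered within a partition (by offset); ordering is not guaranteed across partitions.
  4. Producer

    • The application on the “sending” side.
    • For example, a REST API service that publishes through KafkaTemplate.
  5. Consumer

    • The side that “receives and processes” messages.
    • Holds the message-handling logic.
  6. Consumer Group

    • Consumers in the same group split the partitions of a topic among themselves and process them in parallel.
    • Each partition is assigned to exactly one consumer in the group, so a message is normally handled by only one consumer per group.
  7. Offset

    • The position (sequence number) of a message within a partition.
    • The last processed offset is committed per consumer group, so a consumer resumes from that point after a restart.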
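
To make the relationship between keys, partitions, and offsets concrete, here is a minimal in-memory sketch. It is a toy model, not the Kafka client API: `ToyTopic` and its methods are hypothetical names, and `String.hashCode()` stands in for the murmur2 hash that Kafka's default partitioner applies to the key bytes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a single topic (hypothetical, not the Kafka API).
final class ToyTopic {

    // One append-only list per partition.
    private final List<List<String>> partitions = new ArrayList<>();

    ToyTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // The same key always maps to the same partition.
    // Assumption: hashCode() is a stand-in for Kafka's murmur2-based partitioner.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    // Appends the value to the key's partition and returns the offset it received.
    long append(String key, String value) {
        List<String> log = partitions.get(partitionFor(key));
        log.add(value);
        return log.size() - 1;
    }

    // Reads the record stored at the given offset of a partition.
    String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(3);
        long first = topic.append("order-1", "created");
        long second = topic.append("order-1", "paid");
        int partition = topic.partitionFor("order-1");
        System.out.println("offsets: " + first + ", " + second);
        System.out.println("latest: " + topic.read(partition, second));
    }
}
```

Because both records share the key `order-1`, they land in the same partition and keep their relative order, which is why the producers later in this post use the order ID as the message key.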

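🔸 Why use Kafka?

  • Loose coupling: instead of synchronous HTTP calls along order service → payment service → notification service, the services are connected loosely through Kafka topics.
  • Asynchronous processing: respond to the order request right away and handle shipping, notifications, and point accrual in the background.
  • Buffering: even when traffic spikes, Kafka stores the messages first and consumers can catch up at their own pace.
  • Reprocessing: messages can be re-read from a specific offset (“replay”) as long as they are still within the topic's retention, which suits event sourcing and log analysis.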
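
The buffering and replay points can be sketched with a small in-memory log. This is a simplified illustration under stated assumptions: `ReplayLog`, `poll`, and `seek` are hypothetical names modeled loosely on the consumer concepts, and the offset is advanced as soon as a batch is handed out, whereas a real consumer typically commits after processing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy single-partition log with one committed offset per consumer group (hypothetical).
final class ReplayLog {

    private final List<String> records = new ArrayList<>();

    // groupId -> next offset that group will read
    private final Map<String, Integer> committed = new HashMap<>();

    // Producers keep appending even if nobody is consuming yet (the "buffer" role).
    void append(String record) {
        records.add(record);
    }

    // Returns everything the group has not read yet and advances its offset.
    List<String> poll(String groupId) {
        int from = committed.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(records.subList(from, records.size()));
        committed.put(groupId, records.size());
        return batch;
    }

    // Rewinds a group to an earlier offset so the records can be replayed.
    void seek(String groupId, int offset) {
        committed.put(groupId, offset);
    }

    public static void main(String[] args) {
        ReplayLog log = new ReplayLog();
        log.append("e0");
        log.append("e1");
        log.append("e2");
        System.out.println(log.poll("hub-server-group"));  // all three records
        System.out.println(log.poll("hub-server-group"));  // nothing new
        log.seek("hub-server-group", 1);
        System.out.println(log.poll("hub-server-group"));  // replay from offset 1
    }
}
```

Each group tracks its own offset, so rewinding one group does not affect what another group has already consumed.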

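🔹 Outbox

The outbox exists to handle the case where sending a Kafka message fails, so it will be applied later.
(It probably isn't needed yet for publishing from order.)

What the outbox does:

  • Tracks messages whose send failed.
  • Right now messages go straight to Kafka. If the outbox were applied from the start, order would store the message data in an outbox table when sending createAfterOrder, and a scheduler would read the outbox rows and send them periodically.
  • Makes periodic retries possible (a fallback for failures).

For now the focus is on MVP development; the best plan is to add the outbox during the hardening phase, as the answer to "what do we do when a send fails?"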
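
The outbox flow described above might look roughly like the following. This is a minimal in-memory sketch, not the planned implementation: `OutboxRelay`, `OutboxRow`, and `relayOnce` are hypothetical names, the list stands in for the outbox table, and the `Predicate` stands in for a Kafka send that reports success or failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy outbox: rows stay PENDING until a send succeeds (hypothetical sketch).
final class OutboxRelay {

    enum Status { PENDING, SENT }

    static final class OutboxRow {
        final String payload;
        Status status = Status.PENDING;
        int attempts = 0;

        OutboxRow(String payload) {
            this.payload = payload;
        }
    }

    // Stand-in for the outbox table.
    private final List<OutboxRow> table = new ArrayList<>();

    // In a real system this insert would share the DB transaction of the order insert.
    OutboxRow save(String payload) {
        OutboxRow row = new OutboxRow(payload);
        table.add(row);
        return row;
    }

    // One scheduler tick: try every PENDING row once.
    // Rows whose send fails remain PENDING and are retried on the next tick.
    int relayOnce(Predicate<String> sender) {
        int sent = 0;
        for (OutboxRow row : table) {
            if (row.status != Status.PENDING) {
                continue;
            }
            row.attempts++;
            if (sender.test(row.payload)) {
                row.status = Status.SENT;
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        OutboxRelay relay = new OutboxRelay();
        OutboxRow row = relay.save("order-after-create:o1");
        relay.relayOnce(payload -> false);  // broker unavailable, row stays PENDING
        relay.relayOnce(payload -> true);   // retry succeeds
        System.out.println(row.status + " after " + row.attempts + " attempts");
    }
}
```

The point of the design is that a failed send leaves a durable record behind, so the retry does not depend on the original request still being in flight.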



Example: applying Kafka in Spring Boot

The scenario: publish an “order created” event to Kafka and have other services subscribe to it and process it.

Assume Kafka is running via Docker Compose at localhost:9092 (the default port).

📌 Scenario

  • order-server (Producer): order creation API → publishes the OrderAfterCreate event to Kafka
  • hub-server (Consumer): subscribes to OrderAfterCreate → applies the algorithm to the order data in the message and calculates the expected duration
  • hub-server (Producer): publishes the HubRouteAfterCreate event (order data + expected duration) to Kafka
  • delivery-server (Consumer): subscribes to HubRouteAfterCreate → creates the delivery
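
Before the real Spring code, the four steps of the scenario can be traced end to end with plain queues standing in for the two topics. This is only a sketch of the message flow: `ScenarioSketch`, the trimmed-down records, and the fixed duration are hypothetical, and everything runs in one thread rather than across services.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Single-process walk-through of the event chain (hypothetical sketch, no Kafka involved).
final class ScenarioSketch {

    // Reduced versions of the two events; the real DTOs carry more fields.
    record OrderAfterCreate(String orderId) {}
    record HubRouteAfterCreate(String orderId, int expectedMinutes) {}

    // Runs the chain once and returns a description of each delivery created.
    static List<String> run(List<String> orderIds, int fixedMinutes) {
        Queue<OrderAfterCreate> orderTopic = new ArrayDeque<>();
        Queue<HubRouteAfterCreate> hubRouteTopic = new ArrayDeque<>();

        // order-server (Producer): publish one event per created order.
        for (String id : orderIds) {
            orderTopic.add(new OrderAfterCreate(id));
        }

        // hub-server (Consumer + Producer): consume, compute a duration, publish.
        // Assumption: a fixed value replaces the actual duration algorithm.
        OrderAfterCreate order;
        while ((order = orderTopic.poll()) != null) {
            hubRouteTopic.add(new HubRouteAfterCreate(order.orderId(), fixedMinutes));
        }

        // delivery-server (Consumer): consume and "create" the delivery.
        List<String> deliveries = new ArrayList<>();
        HubRouteAfterCreate route;
        while ((route = hubRouteTopic.poll()) != null) {
            deliveries.add(route.orderId() + ":" + route.expectedMinutes());
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("o1", "o2"), 90));
    }
}
```

Each service only knows the topic it reads from and the topic it writes to, which is the loose coupling the sections below implement with KafkaTemplate and @KafkaListener.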

🔹 order-server (Producer)


🔸 order-server: build.gradle

implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

🔸 order-server: application.yml

spring:
  application:
    name: order-server
  
  kafka:
    bootstrap-servers: localhost:9092

app:
  kafka:
    topic:
      order-after-create: order-after-create

🔸 order-server: (infrastructure.config) KafkaProducerConfig

package chill_logistics.order_server.infrastructure.config;

import chill_logistics.order_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, OrderAfterCreateV1> orderAfterCreateProducerFactory() {

        Map<String, Object> props = new HashMap<>();

        // Kafka Broker
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Serializer settings
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Add other options (acks, retries, etc.) here if needed

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, OrderAfterCreateV1> orderAfterCreateKafkaTemplate() {
        return new KafkaTemplate<>(orderAfterCreateProducerFactory());
    }
}
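
The config above leaves acks and retries as a to-do. As one possible starting point, the options could be collected in a helper like the one below. This is a sketch, not a recommendation from the original post: the class and method names are hypothetical and the numeric values are illustrative. The string keys are the standard Kafka producer property names, which the `ProducerConfig` constants (`ACKS_CONFIG`, `ENABLE_IDEMPOTENCE_CONFIG`, `RETRIES_CONFIG`, `DELIVERY_TIMEOUT_MS_CONFIG`) resolve to.

```java
import java.util.HashMap;
import java.util.Map;

// Reliability-oriented producer options as plain properties (hypothetical helper).
final class ProducerReliabilityProps {

    static Map<String, Object> reliabilityProps() {
        Map<String, Object> props = new HashMap<>();

        // Wait until all in-sync replicas have acknowledged the record.
        props.put("acks", "all");

        // Lets the broker discard duplicates caused by producer retries.
        props.put("enable.idempotence", true);

        // Illustrative retry count for transient send failures.
        props.put("retries", 3);

        // Upper bound on the total time for a send, retries included (illustrative value).
        props.put("delivery.timeout.ms", 120_000);

        return props;
    }

    public static void main(String[] args) {
        System.out.println(reliabilityProps());
    }
}
```

Inside `orderAfterCreateProducerFactory()`, a call such as `props.putAll(ProducerReliabilityProps.reliabilityProps())` before constructing the factory would apply these settings.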

🔸 order-server: (infrastructure.kafka.dto) OrderAfterCreateV1

package chill_logistics.order_server.infrastructure.kafka.dto;

import java.time.LocalDateTime;
import java.util.UUID;

public record OrderAfterCreateV1(
    UUID orderId,
    UUID supplierHubId,
    UUID receiverHubId,
    UUID receiverFirmId,
    String receiverFirmFullAddress,
    String receiverFirmOwnerName,
    String requestNote,
    String productName,
    int productQuantity,
    LocalDateTime orderCreatedAt
) {}

🔸 order-server: OrderAfterCreateProducer

package chill_logistics.order_server.infrastructure.kafka;

import chill_logistics.order_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderAfterCreateProducer {

    private final KafkaTemplate<String, OrderAfterCreateV1> orderAfterCreateKafkaTemplate;

    @Value("${app.kafka.topic.order-after-create}")
    private String orderAfterCreateTopic;

    /* [Publish the OrderAfterCreate event to Kafka after an order is created]
     */
    public void sendOrderAfterCreate(OrderAfterCreateV1 message) {

        String key = message.orderId().toString();

        log.info("[Kafka] Publishing OrderAfterCreate message, topic={}, key={}, message={}",
            orderAfterCreateTopic, key, message);

        orderAfterCreateKafkaTemplate
            .send(orderAfterCreateTopic, key, message)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("[Kafka] Failed to send OrderAfterCreate message, key={}", key, ex);
                } else {
                    log.info("[Kafka] OrderAfterCreate message sent, orderId={}, offset={}",
                        key, result.getRecordMetadata().offset());
                }
            });
    }
}

🔸 order-server: usage example in OrderService

package chill_logistics.order_server.application;

import chill_logistics.order_server.domain.entity.Order;
import chill_logistics.order_server.domain.repository.OrderRepository;
import chill_logistics.order_server.infrastructure.kafka.OrderAfterCreateProducer;
import chill_logistics.order_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OrderAfterCreateProducer orderAfterCreateProducer;

    @Transactional
    public UUID createOrder(CreateOrderRequest request) {

        // 1. Create the order entity
        Order order = Order.create(
            request.supplierHubId(),
            request.receiverHubId(),
            request.receiverFirmId(),
            request.receiverFirmFullAddress(),
            request.receiverFirmOwnerName(),
            request.requestNote(),
            request.productName(),
            request.productQuantity()
        );

        // 2. Save
        orderRepository.save(order);

        // 3. Build the Kafka message
        OrderAfterCreateV1 message = new OrderAfterCreateV1(
            order.getId(),
            order.getSupplierHubId(),
            order.getReceiverHubId(),
            order.getReceiverFirmId(),
            order.getReceiverFirmFullAddress(),
            order.getReceiverFirmOwnerName(),
            order.getRequestNote(),
            order.getProductName(),
            order.getProductQuantity(),
            order.getCreatedAt()
        );

        // 4. Publish the Kafka message (the send is asynchronous; the producer logs the result)
        orderAfterCreateProducer.sendOrderAfterCreate(message);

        log.info("Order created and Kafka publish requested. orderId={}", order.getId());

        return order.getId();
    }
}



🔹 hub-server (Consumer)


🔸 hub-server: build.gradle

implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

🔸 hub-server: application.yml

spring:
  application:
    name: hub-server
  
  kafka:
    bootstrap-servers: localhost:9092

🔸 hub-server: (infrastructure.config) KafkaConsumerConfig

package chill_logistics.hub_server.infrastructure.config;

import chill_logistics.hub_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, OrderAfterCreateV1> orderConsumerFactory() {

        // Deserializer for JSON → OrderAfterCreateV1
        JsonDeserializer<OrderAfterCreateV1> deserializer =
            new JsonDeserializer<>(OrderAfterCreateV1.class, false);

        // Explicitly list the packages trusted during deserialization
        deserializer.addTrustedPackages(
            "chill_logistics.hub_server.infrastructure.kafka.dto"
        );

        // Kafka consumer settings
        Map<String, Object> properties = new HashMap<>();

        // Kafka broker address (Docker Compose exposes it at localhost:9092 by default)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Consumer group ID (consumers in the same group share the partitions, so each message goes to only one of them)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hub-server-group");

        // Key deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Value deserializer (JsonDeserializer)
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // Build the ConsumerFactory from the settings above plus the key/value deserializer instances; it supplies the consumers used by @KafkaListener
        return new DefaultKafkaConsumerFactory<>(
            properties,
            new StringDeserializer(),   // Key Deserializer (String)
            deserializer                // Value Deserializer (OrderAfterCreateV1)
        );
    }

    // Listener container factory used by @KafkaListener (the injected ConsumerFactory defines how the Kafka consumers behave)
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1>
    orderKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        // Set the ConsumerFactory
        factory.setConsumerFactory(orderConsumerFactory());

        return factory;
    }
}

🔸 hub-server: (infrastructure.kafka.dto) OrderAfterCreateV1

package chill_logistics.hub_server.infrastructure.kafka.dto;

import chill_logistics.hub_server.application.dto.command.OrderAfterCommandV1;
import java.time.LocalDateTime;
import java.util.UUID;

public record OrderAfterCreateV1(
    UUID orderId,
    UUID supplierHubId,
    UUID receiverHubId,
    UUID receiverFirmId,
    String receiverFirmFullAddress,
    String receiverFirmOwnerName,
    String requestNote,
    String productName,
    int productQuantity,
    LocalDateTime orderCreatedAt) {

    // Converts to the application-layer command
    public OrderAfterCommandV1 toCommand() {
        return new OrderAfterCommandV1(
            orderId(),
            supplierHubId(),
            receiverHubId(),
            receiverFirmId(),
            receiverFirmFullAddress(),
            receiverFirmOwnerName(),
            requestNote(),
            productName(),
            productQuantity(),
            orderCreatedAt()
        );
    }
}

🔸 hub-server: (application.dto.command) OrderAfterCommandV1

package chill_logistics.hub_server.application.dto.command;

import java.time.LocalDateTime;
import java.util.UUID;

public record OrderAfterCommandV1(
    UUID orderId,
    UUID supplierHubId,
    UUID receiverHubId,
    UUID receiverFirmId,
    String receiverFirmFullAddress,
    String receiverFirmOwnerName,
    String requestNote,
    String productName,
    int productQuantity,
    LocalDateTime orderCreatedAt
) {}

🔸 hub-server: OrderAfterCreateListener

package chill_logistics.hub_server.infrastructure.kafka;

import chill_logistics.hub_server.application.HubCommandService;
import chill_logistics.hub_server.application.dto.command.OrderAfterCommandV1;
import chill_logistics.hub_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderAfterCreateListener {

    private final HubCommandService hubCommandService;

    @KafkaListener(
        topics = "order-after-create",
        containerFactory = "orderKafkaListenerContainerFactory"
    )
    public void listen(OrderAfterCreateV1 message) {

        log.info("Kafka message received: {}", message);

        // DTO → Command conversion
        OrderAfterCommandV1 command = message.toCommand();

        hubCommandService.calculateExpectedDuration(command);
    }
}


🔹 hub-server (Producer)


🔸 hub-server: build.gradle

implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

🔸 hub-server: application.yml

spring:
  application:
    name: hub-server
  
  kafka:
    bootstrap-servers: localhost:9092

app:
  kafka:
    topic:
      hub-route-after-create: hub-route-after-create

🔸 hub-server: (infrastructure.config) KafkaProducerConfig

package chill_logistics.hub_server.infrastructure.config;

import chill_logistics.hub_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, HubRouteAfterCreateV1> hubRouteAfterCreateProducerFactory() {

        Map<String, Object> props = new HashMap<>();

        // Kafka Broker
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Serializer settings
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Other options (acks, retries, etc.) can be added here if needed

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, HubRouteAfterCreateV1> hubRouteAfterCreateKafkaTemplate() {
        return new KafkaTemplate<>(hubRouteAfterCreateProducerFactory());
    }
}

🔸 hub-server: (infrastructure.kafka.dto) HubRouteAfterCreateV1

package chill_logistics.hub_server.infrastructure.kafka.dto;

import java.time.LocalDateTime;
import java.util.UUID;

public record HubRouteAfterCreateV1(
    UUID orderId,
    UUID startHubId,
    String startHubName,
    String startHubFullAddress,
    UUID endHubId,
    String endHubName,
    String endHubFullAddress,
    UUID receiverFirmId,
    String receiverFirmFullAddress,
    String receiverFirmOwnerName,
    String requestNote,
    String productName,
    int productQuantity,
    LocalDateTime orderCreatedAt,
    Integer expectedDeliveryDuration
) {}

🔸 hub-server: HubRouteAfterCreateProducer

package chill_logistics.hub_server.infrastructure.kafka;

import chill_logistics.hub_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class HubRouteAfterCreateProducer {

    private final KafkaTemplate<String, HubRouteAfterCreateV1> hubRouteAfterCreateKafkaTemplate;

    @Value("${app.kafka.topic.hub-route-after-create}")
    private String hubRouteAfterCreateTopic;

    /* [Publish the HubRouteAfterCreate event to Kafka after the hub route is created]
     */
    public void sendHubRouteAfterCreate(HubRouteAfterCreateV1 message) {

        String key = message.orderId().toString();

        log.info("[Kafka] Publishing HubRouteAfterCreate message, topic={}, key={}, message={}",
            hubRouteAfterCreateTopic, key, message);

        hubRouteAfterCreateKafkaTemplate
            .send(hubRouteAfterCreateTopic, key, message)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("[Kafka] Failed to send HubRouteAfterCreate message, key={}", key, ex);
                } else {
                    log.info("[Kafka] HubRouteAfterCreate message sent, orderId={}, offset={}",
                        key, result.getRecordMetadata().offset());
                }
            });
    }
}

🔸 hub-server: usage example in HubService

package chill_logistics.hub_server.application;

import chill_logistics.hub_server.application.dto.command.OrderAfterCommandV1;
import chill_logistics.hub_server.domain.entity.Hub;
import chill_logistics.hub_server.domain.repository.HubRepository;
import chill_logistics.hub_server.infrastructure.kafka.HubRouteAfterCreateProducer;
import chill_logistics.hub_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@RequiredArgsConstructor
public class HubService {

    private final HubRepository hubRepository;
    private final HubRouteAfterCreateProducer hubRouteAfterCreateProducer;

    // The command parameter mirrors the listener's call; it is an assumption of this skeleton.
    @Transactional
    public UUID calculateExpectedDuration(OrderAfterCommandV1 command) {

        // 1. Calculate the expected duration.
        //    Skeleton only: the logic that builds the `hub` entity from the command is omitted here.

        // 2. Save (the entity is simply named hub for this example)
        hubRepository.save(hub);

        // 3. Build the Kafka message
        HubRouteAfterCreateV1 message = new HubRouteAfterCreateV1(
            hub.getOrderId(),
            hub.getStartHubId(),
            hub.getStartHubName(),
            hub.getStartHubFullAddress(),
            hub.getEndHubId(),
            hub.getEndHubName(),
            hub.getEndHubFullAddress(),
            hub.getReceiverFirmId(),
            hub.getReceiverFirmFullAddress(),
            hub.getReceiverFirmOwnerName(),
            hub.getRequestNote(),
            hub.getProductName(),
            hub.getProductQuantity(),
            hub.getOrderCreatedAt(),
            hub.getExpectedDuration()
        );

        // 4. Publish the Kafka message (the send is asynchronous; the producer logs the result)
        hubRouteAfterCreateProducer.sendHubRouteAfterCreate(message);

        log.info("Expected duration calculated and Kafka publish requested. orderId={}", hub.getOrderId());

        return hub.getOrderId();
    }
}


🔹 delivery-server (Consumer)


🔸 delivery-server: build.gradle

implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

🔸 delivery-server: application.yml

spring:
  application:
    name: delivery-server
  
  kafka:
    bootstrap-servers: localhost:9092

🔸 delivery-server: (infrastructure.config) KafkaConsumerConfig

package chill_logistics.delivery_server.infrastructure.config;

import chill_logistics.delivery_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, HubRouteAfterCreateV1> hubConsumerFactory() {

        // Deserializer for JSON → HubRouteAfterCreateV1
        JsonDeserializer<HubRouteAfterCreateV1> deserializer =
            new JsonDeserializer<>(HubRouteAfterCreateV1.class, false);

        // Explicitly list the packages trusted during deserialization
        deserializer.addTrustedPackages(
            "chill_logistics.delivery_server.infrastructure.kafka.dto"
        );

        // Kafka consumer settings
        Map<String, Object> properties = new HashMap<>();

        // Kafka broker address (Docker Compose exposes it at localhost:9092 by default)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Consumer group ID (consumers in the same group share the partitions, so each message goes to only one of them)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "delivery-server-group");

        // Key deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Value deserializer (JsonDeserializer)
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // Build the ConsumerFactory from the settings above plus the key/value deserializer instances; it supplies the consumers used by @KafkaListener
        return new DefaultKafkaConsumerFactory<>(
            properties,
            new StringDeserializer(),   // Key Deserializer (String)
            deserializer                // Value Deserializer (HubRouteAfterCreateV1)
        );
    }

    // Listener container factory used by @KafkaListener (the injected ConsumerFactory defines how the Kafka consumers behave)
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, HubRouteAfterCreateV1>
    hubKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, HubRouteAfterCreateV1> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        // Set the ConsumerFactory
        factory.setConsumerFactory(hubConsumerFactory());

        return factory;
    }
}

🔸 delivery-server: (infrastructure.kafka.dto) HubRouteAfterCreateV1

package chill_logistics.delivery_server.infrastructure.kafka.dto;

import chill_logistics.delivery_server.application.dto.command.HubRouteAfterCommandV1;
import java.time.LocalDateTime;
import java.util.UUID;

public record HubRouteAfterCreateV1(
    UUID orderId,
    UUID startHubId,
    String startHubName,
    String startHubFullAddress,
    UUID endHubId,
    String endHubName,
    String endHubFullAddress,
    UUID receiverFirmId,
    String receiverFirmFullAddress,
    String receiverFirmOwnerName,
    String requestNote,
    String productName,
    int productQuantity,
    LocalDateTime orderCreatedAt,
    Integer expectedDeliveryDuration) {

    // Converts to the application-layer command
    public HubRouteAfterCommandV1 toCommand() {
        return new HubRouteAfterCommandV1(
            orderId(),
            startHubId(),
            startHubName(),
            startHubFullAddress(),
            endHubId(),
            endHubName(),
            endHubFullAddress(),
            receiverFirmId(),
            receiverFirmFullAddress(),
            receiverFirmOwnerName(),
            requestNote(),
            productName(),
            productQuantity(),
            orderCreatedAt(),
            expectedDeliveryDuration()
        );
    }
}

🔸 delivery-server: (application.dto.command) HubRouteAfterCommandV1

package chill_logistics.delivery_server.application.dto.command;

import java.time.LocalDateTime;
import java.util.UUID;

public record HubRouteAfterCommandV1(
    UUID orderId,
    UUID startHubId,
    String startHubName,
    String startHubFullAddress,
    UUID endHubId,
    String endHubName,
    String endHubFullAddress,
    UUID receiverFirmId,
    String receiverFirmFullAddress,
    String receiverFirmOwnerName,
    String requestNote,
    String productName,
    int productQuantity,
    LocalDateTime orderCreatedAt,
    Integer expectedDeliveryDuration
) {}

🔸 delivery-server: HubRouteAfterCreateListener

package chill_logistics.delivery_server.infrastructure.kafka;

import chill_logistics.delivery_server.application.DeliveryCommandService;
import chill_logistics.delivery_server.application.dto.command.HubRouteAfterCommandV1;
import chill_logistics.delivery_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class HubRouteAfterCreateListener {

    private final DeliveryCommandService deliveryCommandService;

    @KafkaListener(
        topics = "hub-route-after-create",
        containerFactory = "hubKafkaListenerContainerFactory"
    )
    public void listen(HubRouteAfterCreateV1 message) {

        log.info("Kafka message received: {}", message);

        // Assign the hub and firm delivery persons (temporary stub)
        UUID hubDeliveryPersonId = assignHubDeliveryPerson(message);
        UUID firmDeliveryPersonId = assignFirmDeliveryPerson(message);

        HubRouteAfterCommandV1 command = message.toCommand();

        deliveryCommandService.createDelivery(command, hubDeliveryPersonId, firmDeliveryPersonId);
    }

    // Assigns the hub delivery person (temporary version, to be replaced with the real assignment logic)
    private UUID assignHubDeliveryPerson(HubRouteAfterCreateV1 message) {

        return UUID.fromString("00000000-0000-0000-0000-000000000001");
    }

    // Assigns the firm delivery person (temporary version, to be replaced with the real assignment logic)
    private UUID assignFirmDeliveryPerson(HubRouteAfterCreateV1 message) {

        return UUID.fromString("00000000-0000-0000-0000-000000000002"); // temporary value
    }
}