delivery-server가 Consumer인 상황의 적용 연습이다.
// Kafka: spring-kafka is on the main classpath; spring-kafka-test is added to the test classpath only.
// No versions are given here — presumably they come from dependency management (TODO confirm).
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
spring:
  kafka:
    # Kafka broker address list; must be nested under spring.kafka to be picked up
    # as the spring.kafka.bootstrap-servers property.
    bootstrap-servers: localhost:9092
package chill_logistics.delivery_server.infrastructure.config;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, OrderAfterCreateV1> orderConsumerFactory() {
// JSON → OrderAfterCreateV1 역직렬화를 위한 Deserializer
JsonDeserializer<OrderAfterCreateV1> deserializer =
new JsonDeserializer<>(OrderAfterCreateV1.class, false);
// Kafka 메시지 역직렬화 시 허용할 패키지를 명시적으로 지정
deserializer.addTrustedPackages(
"chill_logistics.delivery_server.infrastructure.kafka.dto"
);
// Kafka Consumer 설정 값
Map<String, Object> properties = new HashMap<>();
// Kafka Broker 주소 (Docker Compose에서 기본적으로 localhost:9092로 띄움)
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Consumer Group ID (같은 Group으로 묶인 Consumer들은 같은 메시지를 중복 처리하지 않음)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "delivery-server-group");
// 메시지 Key 역직렬화 방식
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 메시지 Value 역직렬화 방식 (JsonDeserializer 사용)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// 위 설정값 + Key/Value Deserializer를 기반으로 실제 Consumer 인스턴스를 만들어 KafkaListener에 제공
return new DefaultKafkaConsumerFactory<>(
properties,
new StringDeserializer(), // Key Deserializer (String)
deserializer // Value Deserializer (OrderAfterCreateV1)
);
}
// @KafkaListener가 사용할 Listener 컨테이너 생성 (ConsumerFactory를 주입하여 실제 Kafka Consumer 동작을 구성)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1>
orderKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// ConsumerFactory 설정
factory.setConsumerFactory(orderConsumerFactory());
return factory;
}
}
package chill_logistics.delivery_server.infrastructure.kafka.dto;
import java.time.LocalDateTime;
import java.util.UUID;
/**
 * Payload carried by the order-created Kafka message (version 1), as consumed by
 * the delivery server.
 *
 * @param orderId                 id of the order the deliveries are created for
 * @param startHubId              id used to look up the starting hub
 * @param endHubId                id used to look up the destination hub
 * @param receiverFirmId          id of the receiving firm
 * @param receiverFirmFullAddress full address of the receiving firm
 * @param receiverFirmOwnerName   owner name of the receiving firm
 * @param requestNote             free-text note attached to the order
 * @param productName             name of the ordered product
 * @param productQuantity         ordered quantity
 * @param orderCreatedAt          creation timestamp of the order (no zone information)
 */
public record OrderAfterCreateV1(
    UUID orderId,
    UUID startHubId,
    UUID endHubId,
    UUID receiverFirmId,
    String receiverFirmFullAddress,
    String receiverFirmOwnerName,
    String requestNote,
    String productName,
    int productQuantity,
    LocalDateTime orderCreatedAt
) {
}
// OpenFeign: Spring Cloud starter that provides @FeignClient, used for the hub-server client.
// No version is given here — presumably it comes from a Spring Cloud BOM (TODO confirm).
implementation 'org.springframework.cloud:spring-cloud-starter-openfeign'
spring:
  kafka:
    # Kafka broker address list (spring.kafka.bootstrap-servers)
    bootstrap-servers: localhost:9092

# Base URL of the hub service; the Feign client reads it as ${clients.hub.url}
clients:
  hub:
    url: http://hub-server:19093

# Defaults applied to every Feign client.
# NOTE(review): newer Spring Cloud OpenFeign releases read these settings under
# spring.cloud.openfeign.client.config instead of feign.client.config — confirm
# against the Spring Cloud version this project uses.
feign:
  client:
    config:
      default:
        connectTimeout: 2000  # milliseconds
        readTimeout: 3000     # milliseconds
        loggerLevel: basic
package chill_logistics.delivery_server.infrastructure.client.dto;
import java.util.UUID;
/**
 * Hub details returned by the hub service for delivery creation (version 1).
 *
 * @param hubId          id of the hub
 * @param hubName        name of the hub
 * @param hubFullAddress full address of the hub
 */
public record HubForDeliveryResponseV1(
    UUID hubId,
    String hubName,
    String hubFullAddress
) {
}
package chill_logistics.delivery_server.infrastructure.config;
import feign.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Feign settings referenced by {@code HubClient} through
 * {@code @FeignClient(configuration = ...)}.
 *
 * <p>NOTE(review): because this class is also annotated with {@code @Configuration},
 * it may be picked up by component scanning and then apply to every Feign client,
 * not only the one that references it — confirm this is intended.
 */
@Configuration
public class FeignConfig {

    /**
     * Sets the Feign logging level to {@link Logger.Level#BASIC}.
     *
     * @return the logger level bean consumed by Feign
     */
    @Bean
    public Logger.Level feignLoggerLevel() {
        return Logger.Level.BASIC;
    }
}
package chill_logistics.delivery_server.infrastructure.client;
import chill_logistics.delivery_server.infrastructure.client.dto.HubForDeliveryResponseV1;
import java.util.UUID;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
/**
 * Declarative HTTP client for the hub service.
 *
 * <p>The base URL is resolved from the {@code clients.hub.url} property and the
 * client is configured with {@code FeignConfig}.
 */
@FeignClient(
    name = "hubClient",
    url = "${clients.hub.url}",
    configuration = chill_logistics.delivery_server.infrastructure.config.FeignConfig.class
)
public interface HubClient {

    /**
     * Fetches a single hub via {@code GET /v1/internal/hubs/{hubId}}.
     *
     * @param hubId id of the hub to look up
     * @return the hub details decoded from the response
     */
    @GetMapping("/v1/internal/hubs/{hubId}")
    HubForDeliveryResponseV1 getHub(@PathVariable("hubId") UUID hubId);
}
package chill_logistics.delivery_server.application;
import chill_logistics.delivery_server.domain.entity.DeliveryStatus;
import chill_logistics.delivery_server.domain.entity.FirmDelivery;
import chill_logistics.delivery_server.domain.entity.HubDelivery;
import chill_logistics.delivery_server.domain.repository.FirmDeliveryRepository;
import chill_logistics.delivery_server.domain.repository.HubDeliveryRepository;
import chill_logistics.delivery_server.infrastructure.client.HubClient;
import chill_logistics.delivery_server.infrastructure.client.dto.HubForDeliveryResponseV1;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Application service that creates the hub delivery and the firm delivery for an
 * order described by an {@link OrderAfterCreateV1} message.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class DeliveryService {

    // TODO: the delivery-sequence logic still needs to be revised; fixed values for now.
    private static final int HUB_DELIVERY_SEQUENCE = 1;
    private static final int FIRM_DELIVERY_SEQUENCE = 2;

    private final HubDeliveryRepository hubDeliveryRepository;
    private final FirmDeliveryRepository firmDeliveryRepository;
    private final HubClient hubClient;

    /**
     * Creates the whole delivery: the hub delivery first, then the firm delivery.
     *
     * <p>Both steps are invoked directly on this instance inside the transaction
     * opened for this method.
     *
     * @param message order information received for the created order
     */
    @Transactional
    public void createDelivery(OrderAfterCreateV1 message) {
        log.info("[배송 생성 시작] - orderId={}", message.orderId());

        createHubDelivery(message);
        createFirmDelivery(message);

        log.info("[배송 생성 완료] - orderId={}", message.orderId());
    }

    /**
     * Creates and stores the hub delivery for an order.
     *
     * <p>Order data comes from the message; the names and addresses of the start and
     * end hubs are fetched from the hub service through {@link HubClient}.
     * NOTE(review): those two remote calls run while the transaction is open.
     *
     * @param message order information, including the start and end hub ids
     */
    @Transactional
    public void createHubDelivery(OrderAfterCreateV1 message) {
        log.info("[허브 배송 생성 시작] - orderId={}", message.orderId());

        // Look up both hubs (start first, then end) via Feign.
        final HubForDeliveryResponseV1 origin = hubClient.getHub(message.startHubId());
        final HubForDeliveryResponseV1 destination = hubClient.getHub(message.endHubId());

        // Build the entity with its initial status and sequence, then persist it.
        final HubDelivery persisted = hubDeliveryRepository.save(
            HubDelivery.createFrom(
                message,
                origin.hubName(),
                origin.hubFullAddress(),
                destination.hubName(),
                destination.hubFullAddress(),
                DeliveryStatus.WAITING_FOR_HUB,
                HUB_DELIVERY_SEQUENCE));

        log.info("[허브 배송 생성 완료] - hubDeliveryId={}, orderId={}",
            persisted.getId(), persisted.getOrderId());
    }

    /**
     * Creates and stores the firm delivery for an order, using only the data carried
     * by the message.
     *
     * @param message order information received for the created order
     */
    @Transactional
    public void createFirmDelivery(OrderAfterCreateV1 message) {
        log.info("[업체 배송 생성 시작] - orderId={}", message.orderId());

        // Build the entity with its initial status and sequence, then persist it.
        final FirmDelivery persisted = firmDeliveryRepository.save(
            FirmDelivery.createFrom(
                message,
                DeliveryStatus.MOVING_TO_FIRM,
                FIRM_DELIVERY_SEQUENCE));

        log.info("[업체 배송 생성 완료] - firmDeliveryId={}, orderId={}",
            persisted.getId(), persisted.getOrderId());
    }
}
package chill_logistics.delivery_server.presentation;
import chill_logistics.delivery_server.application.DeliveryService;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import lib.entity.BaseStatus;
import lib.web.response.BaseResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/delivery")
public class DeliveryController {
private final DeliveryService deliveryService;
/**
* [배송 생성]
*
* @param message 주문 정보가 담긴 Kafka 메시지
* @return status CREATED 반환
*/
@PostMapping
public BaseResponse<Void> createDelivery(OrderAfterCreateV1 message) {
deliveryService.createDelivery(message);
return BaseResponse.ok(BaseStatus.CREATED);
}
}