A distributed messaging/streaming platform that accumulates high-volume, high-throughput logs/events per topic, so that multiple services can subscribe and process them asynchronously.
In a bit more detail:
Broker: the Kafka server that stores messages and serves producers/consumers.
Topic: a named channel for messages, e.g. order-created, payment-completed, user-signup.
Partition: a topic is split into partitions; records with the same key land in the same partition, which preserves their order.
Producer: publishes messages to a topic, e.g. KafkaTemplate, a REST API service.
Consumer: subscribes to a topic and processes its messages.
Consumer Group: consumers sharing a group ID split the partitions among themselves, so each message is handled once per group.
Offset: the position of a record within a partition; each consumer group tracks how far it has read.
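To tie those terms to this project: keying every event with the orderId means all events for one order land in the same partition and stay in order, while hub-server and delivery-server read them in separate consumer groups with their own offsets. A minimal, hypothetical sketch (the class and the KafkaTemplate type here are illustrative, not part of the actual services):

import java.util.UUID;
import org.springframework.kafka.core.KafkaTemplate;

public class OrderEventKeyingExample {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public OrderEventKeyingExample(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(UUID orderId, Object event) {
        // Same key -> same partition -> events for one order stay ordered;
        // each consumer group (hub-server-group, delivery-server-group) tracks its own offset.
        kafkaTemplate.send("order-after-create", orderId.toString(), event);
    }
}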
The outbox exists to handle the case where sending a Kafka message fails, so it is planned for later.
(It probably isn't needed when publishing from order.)
Role of the outbox: persist the event in the DB in the same transaction as the order, then publish it to Kafka separately, so a failed publish can simply be retried.
For now the focus is on the MVP; the best plan is to add it during the hardening phase as the answer to "what do we do if the publish fails?"
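A rough sketch of what that could look like later (assuming JPA on Spring Boot 3, i.e. jakarta.persistence; the entity, column names, and the relay are all hypothetical, not a decided design): the event row is written in the same transaction as the order, and a separate relay publishes PENDING rows to Kafka and marks them SENT, retrying on failure.

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import java.time.LocalDateTime;
import java.util.UUID;

// Hypothetical outbox row saved by OrderService in the same transaction as the order itself.
@Entity
public class OrderOutboxMessage {

    @Id
    private UUID id;
    private String topic;        // e.g. order-after-create
    private String messageKey;   // orderId as a String
    private String payload;      // event serialized as JSON
    private String status;       // PENDING / SENT / FAILED
    private LocalDateTime createdAt;

    protected OrderOutboxMessage() { } // required by JPA

    public OrderOutboxMessage(String topic, String messageKey, String payload) {
        this.id = UUID.randomUUID();
        this.topic = topic;
        this.messageKey = messageKey;
        this.payload = payload;
        this.status = "PENDING";
        this.createdAt = LocalDateTime.now();
    }
}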
I worked through a scenario where an "order created" event is published to Kafka and other services subscribe to it and react.
Assuming Kafka is up via Docker Compose on localhost:9092 (the default port):
📌 Scenario
order-server (Producer): order creation API → publishes an OrderAfterCreate event to Kafka
hub-server (Consumer): subscribes to OrderAfterCreate → runs the routing algorithm on the order data from the message and calculates the expected duration
hub-server (Producer): publishes a HubRouteAfterCreate event (order data + expected duration) to Kafka
delivery-server (Consumer): subscribes to HubRouteAfterCreate → creates the delivery
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
spring:
  application:
    name: order-server
  kafka:
    bootstrap-servers: localhost:9092

app:
  kafka:
    topic:
      order-after-create: order-after-create
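One optional addition on top of this config (not in the original setup): declaring the topic as a bean lets Spring Boot's auto-configured KafkaAdmin create it on startup, instead of relying on broker-side auto topic creation. The partition/replica counts below are placeholder values for a single local broker.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic orderAfterCreateTopic() {
        // Created by KafkaAdmin on startup if it doesn't exist yet
        return TopicBuilder.name("order-after-create")
                .partitions(3)
                .replicas(1)
                .build();
    }
}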
package chill_logistics.order_server.infrastructure.config;
import chill_logistics.order_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, OrderAfterCreateV1> orderAfterCreateProducerFactory() {
Map<String, Object> props = new HashMap<>();
// Kafka Broker
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializer settings
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Add other options (acks, retries, etc.) here if needed
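        // Example of what those could look like if stronger delivery guarantees are wanted later
        // (suggested values, not settings this project has adopted):
        // props.put(ProducerConfig.ACKS_CONFIG, "all");                 // wait for all in-sync replicas
        // props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);    // no duplicates on producer retry
        // props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);  // keep retrying transient send failures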
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, OrderAfterCreateV1> orderAfterCreateKafkaTemplate() {
return new KafkaTemplate<>(orderAfterCreateProducerFactory());
}
}
package chill_logistics.order_server.infrastructure.kafka.dto;
import java.time.LocalDateTime;
import java.util.UUID;
public record OrderAfterCreateV1(
UUID orderId,
UUID supplierHubId,
UUID receiverHubId,
UUID receiverFirmId,
String receiverFirmFullAddress,
String receiverFirmOwnerName,
String requestNote,
String productName,
int productQuantity,
LocalDateTime orderCreatedAt
) {}
package chill_logistics.order_server.infrastructure.kafka;
import chill_logistics.order_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderAfterCreateProducer {
private final KafkaTemplate<String, OrderAfterCreateV1> orderAfterCreateKafkaTemplate;
@Value("${app.kafka.topic.order-after-create}")
private String orderAfterCreateTopic;
    /* Publish the OrderAfterCreate event to Kafka after an order is created. */
public void sendOrderAfterCreate(OrderAfterCreateV1 message) {
String key = message.orderId().toString();
log.info("[Kafka] OrderAfterCreate 메시지 발행, topic={}, key={}, message={}",
orderAfterCreateTopic, key, message);
orderAfterCreateKafkaTemplate
.send(orderAfterCreateTopic, key, message)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("[Kafka] OrderAfterCreate 메시지 전송 실패, key={}", key, ex);
} else {
log.info("[Kafka] OrderAfterCreate 메시지 전송 성공, orderId={}, offset={}",
key, result.getRecordMetadata().offset());
}
});
}
}
package chill_logistics.order_server.application;
import chill_logistics.order_server.domain.entity.Order;
import chill_logistics.order_server.domain.repository.OrderRepository;
import chill_logistics.order_server.infrastructure.kafka.OrderAfterCreateProducer;
import chill_logistics.order_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final OrderAfterCreateProducer orderAfterCreateProducer;
@Transactional
public UUID createOrder(CreateOrderRequest request) {
        // 1. Create the order entity
Order order = Order.create(
request.supplierHubId(),
request.receiverHubId(),
request.receiverFirmId(),
request.receiverFirmFullAddress(),
request.receiverFirmOwnerName(),
request.requestNote(),
request.productName(),
request.productQuantity()
);
        // 2. Persist the order
orderRepository.save(order);
        // 3. Build the Kafka message
OrderAfterCreateV1 message = new OrderAfterCreateV1(
order.getId(),
order.getSupplierHubId(),
order.getReceiverHubId(),
order.getReceiverFirmId(),
order.getReceiverFirmFullAddress(),
order.getReceiverFirmOwnerName(),
order.getRequestNote(),
order.getProductName(),
order.getProductQuantity(),
order.getCreatedAt()
);
        // 4. Publish the Kafka message
orderAfterCreateProducer.sendOrderAfterCreate(message);
log.info("주문 생성 완료 + Kafka 메시지 발행 완료. orderId={}", order.getId());
return order.getId();
}
}
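One thing to note in createOrder: the Kafka send happens inside the @Transactional method, so the event can go out before the order commit (or even if the commit later rolls back). Until the outbox is in place, a lighter option is to publish only after the transaction commits, using Spring's @TransactionalEventListener. A minimal sketch under that assumption (this relay class and its package placement are hypothetical, not part of the current code):

package chill_logistics.order_server.infrastructure.kafka;

import chill_logistics.order_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Component
@RequiredArgsConstructor
public class OrderAfterCreateRelay {

    private final OrderAfterCreateProducer orderAfterCreateProducer;

    // Runs only after the surrounding order transaction has committed,
    // so the event never refers to an order that was rolled back.
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void on(OrderAfterCreateV1 message) {
        orderAfterCreateProducer.sendOrderAfterCreate(message);
    }
}

With this in place, step 4 of OrderService would inject ApplicationEventPublisher and call publishEvent(message) inside the transaction instead of calling the producer directly.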
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
spring:
  application:
    name: hub-server
  kafka:
    bootstrap-servers: localhost:9092
package chill_logistics.hub_server.infrastructure.config;
import chill_logistics.hub_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, OrderAfterCreateV1> orderConsumerFactory() {
        // Deserializer for JSON → OrderAfterCreateV1
JsonDeserializer<OrderAfterCreateV1> deserializer =
new JsonDeserializer<>(OrderAfterCreateV1.class, false);
        // Explicitly whitelist the package trusted when deserializing Kafka messages
        deserializer.addTrustedPackages(
                "chill_logistics.hub_server.infrastructure.kafka.dto"
        );
        // Kafka consumer settings
Map<String, Object> properties = new HashMap<>();
        // Kafka broker address (Docker Compose runs it on localhost:9092 by default)
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Consumer group ID (consumers in the same group do not process the same message twice)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hub-server-group");
        // How message keys are deserialized
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // How message values are deserialized (JsonDeserializer)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Build the actual consumer factory from the settings above plus the key/value deserializers; @KafkaListener consumers are created from it
return new DefaultKafkaConsumerFactory<>(
properties,
new StringDeserializer(), // Key Deserializer (String)
deserializer // Value Deserializer (OrderAfterCreateV1)
);
}
    // Listener container factory used by @KafkaListener (wires in the ConsumerFactory to drive the actual Kafka consumer)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1>
orderKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1> factory =
new ConcurrentKafkaListenerContainerFactory<>();
        // Set the ConsumerFactory
factory.setConsumerFactory(orderConsumerFactory());
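        // Optional (not configured here): retry a failing record a few times and then skip it,
        // so one bad message doesn't block the partition. With spring-kafka this would be roughly:
        // factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 3L)));
        // (imports: org.springframework.kafka.listener.DefaultErrorHandler, org.springframework.util.backoff.FixedBackOff)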
return factory;
}
}
package chill_logistics.hub_server.infrastructure.kafka.dto;
import java.time.LocalDateTime;
import java.util.UUID;
public record OrderAfterCreateV1(
UUID orderId,
UUID supplierHubId,
UUID receiverHubId,
UUID receiverFirmId,
String receiverFirmFullAddress,
String receiverFirmOwnerName,
String requestNote,
String productName,
int productQuantity,
LocalDateTime orderCreatedAt) {
    // Conversion method to the application-layer command
public OrderAfterCommandV1 toCommand() {
return new OrderAfterCommandV1(
orderId(),
supplierHubId(),
receiverHubId(),
receiverFirmId(),
receiverFirmFullAddress(),
receiverFirmOwnerName(),
requestNote(),
productName(),
productQuantity(),
                orderCreatedAt()
);
}
}
package chill_logistics.hub_server.application.dto.command;
import java.time.LocalDateTime;
import java.util.UUID;
public record OrderAfterCommandV1(
UUID orderId,
UUID supplierHubId,
UUID receiverHubId,
UUID receiverFirmId,
String receiverFirmFullAddress,
String receiverFirmOwnerName,
String requestNote,
String productName,
int productQuantity,
        LocalDateTime orderCreatedAt
) {}
package chill_logistics.hub_server.infrastructure.kafka;
import chill_logistics.hub_server.application.HubCommandService;
import chill_logistics.hub_server.application.dto.command.OrderAfterCommandV1;
import chill_logistics.hub_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderAfterCreateListener {
private final HubCommandService hubCommandService;
@KafkaListener(
topics = "order-after-create",
containerFactory = "orderKafkaListenerContainerFactory"
)
public void listen(OrderAfterCreateV1 message) {
log.info("Kafka 메시지 수신: {}", message);
// DTO → Command 변환
OrderAfterCommandV1 command = message.toCommand();
hubCommandService.calculateExpectedDuration(command);
}
}
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
spring:
  application:
    name: hub-server
  kafka:
    bootstrap-servers: localhost:9092

app:
  kafka:
    topic:
      hub-route-after-create: hub-route-after-create
package chill_logistics.hub_server.infrastructure.config;
import chill_logistics.hub_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, HubRouteAfterCreateV1> hubRouteAfterCreateProducerFactory() {
Map<String, Object> props = new HashMap<>();
// Kafka Broker
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializer settings
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Other options (acks, retries, etc.) can be added here if needed
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, HubRouteAfterCreateV1> hubRouteAfterCreateKafkaTemplate() {
return new KafkaTemplate<>(hubRouteAfterCreateProducerFactory());
}
}
package chill_logistics.hub_server.infrastructure.kafka.dto;
import java.time.LocalDateTime;
import java.util.UUID;
public record HubRouteAfterCreateV1(
UUID orderId,
UUID startHubId,
String startHubName,
String startHubFullAddress,
UUID endHubId,
String endHubName,
String endHubFullAddress,
UUID receiverFirmId,
String receiverFirmFullAddress,
String receiverFirmOwnerName,
String requestNote,
String productName,
int productQuantity,
LocalDateTime orderCreatedAt,
Integer expectedDeliveryDuration
) {}
package chill_logistics.hub_server.infrastructure.kafka;
import chill_logistics.hub_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class HubRouteAfterCreateProducer {
private final KafkaTemplate<String, HubRouteAfterCreateV1> hubRouteAfterCreateKafkaTemplate;
@Value("${app.kafka.topic.hub-route-after-create}")
private String hubRouteAfterCreateTopic;
    /* Publish the HubRouteAfterCreate event to Kafka after the hub route is created. */
public void sendHubRouteAfterCreate(HubRouteAfterCreateV1 message) {
String key = message.orderId().toString();
log.info("[Kafka] HubRouteAfterCreate 메시지 발행, topic={}, key={}, message={}",
hubRouteAfterCreateTopic, key, message);
hubRouteAfterCreateKafkaTemplate
.send(hubRouteAfterCreateTopic, key, message)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("[Kafka] HubRouteAfterCreate 메시지 전송 실패, key={}", key, ex);
} else {
log.info("[Kafka] HubRouteAfterCreate 메시지 전송 성공, orderId={}, offset={}",
key, result.getRecordMetadata().offset());
}
});
}
}
package chill_logistics.hub_server.application;
import chill_logistics.hub_server.application.dto.command.OrderAfterCommandV1;
import chill_logistics.hub_server.domain.entity.Hub;
import chill_logistics.hub_server.domain.repository.HubRepository;
import chill_logistics.hub_server.infrastructure.kafka.HubRouteAfterCreateProducer;
import chill_logistics.hub_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@RequiredArgsConstructor
public class HubCommandService {
private final HubRepository hubRepository;
private final HubRouteAfterCreateProducer hubRouteAfterCreateProducer;
@Transactional
    public UUID calculateExpectedDuration(OrderAfterCommandV1 command) {
        // 1. Calculate the expected delivery duration (routing algorithm omitted in this sketch)
        // 2. Persist the result ("hub" below is a placeholder for the entity built from the
        //    command plus the calculated duration; its construction is omitted here)
        hubRepository.save(hub);
        // 3. Build the Kafka message
        HubRouteAfterCreateV1 message = new HubRouteAfterCreateV1(
hub.getOrderId(),
hub.getStartHubId(),
hub.getStartHubName(),
hub.getStartHubFullAddress(),
hub.getEndHubId(),
hub.getEndHubName(),
hub.getEndHubFullAddress(),
hub.getReceiverFirmId(),
hub.getReceiverFirmFullAddress(),
hub.getReceiverFirmOwnerName(),
hub.getRequestNote(),
hub.getProductName(),
hub.getProductQuantity(),
hub.getOrderCreatedAt(),
            hub.getExpectedDuration()
);
        // 4. Publish the Kafka message
        hubRouteAfterCreateProducer.sendHubRouteAfterCreate(message);
        log.info("Expected duration calculated and Kafka message published. orderId={}", hub.getOrderId());
return hub.getOrderId();
}
}
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
spring:
  application:
    name: delivery-server
  kafka:
    bootstrap-servers: localhost:9092
package chill_logistics.delivery_server.infrastructure.config;
import chill_logistics.delivery_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, HubRouteAfterCreateV1> hubConsumerFactory() {
        // Deserializer for JSON → HubRouteAfterCreateV1
JsonDeserializer<HubRouteAfterCreateV1> deserializer =
new JsonDeserializer<>(HubRouteAfterCreateV1.class, false);
        // Explicitly whitelist the package trusted when deserializing Kafka messages
deserializer.addTrustedPackages(
"chill_logistics.delivery_server.infrastructure.kafka.dto"
);
        // Kafka consumer settings
Map<String, Object> properties = new HashMap<>();
        // Kafka broker address (Docker Compose runs it on localhost:9092 by default)
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Consumer group ID (consumers in the same group do not process the same message twice)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "delivery-server-group");
        // How message keys are deserialized
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // How message values are deserialized (JsonDeserializer)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Build the actual consumer factory from the settings above plus the key/value deserializers; @KafkaListener consumers are created from it
return new DefaultKafkaConsumerFactory<>(
properties,
new StringDeserializer(), // Key Deserializer (String)
deserializer // Value Deserializer (HubRouteAfterCreateV1)
);
}
    // Listener container factory used by @KafkaListener (wires in the ConsumerFactory to drive the actual Kafka consumer)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, HubRouteAfterCreateV1>
hubKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, HubRouteAfterCreateV1> factory =
new ConcurrentKafkaListenerContainerFactory<>();
        // Set the ConsumerFactory
factory.setConsumerFactory(hubConsumerFactory());
return factory;
}
}
package chill_logistics.delivery_server.infrastructure.kafka.dto;
import chill_logistics.delivery_server.application.dto.command.HubRouteAfterCommandV1;
import java.time.LocalDateTime;
import java.util.UUID;
public record HubRouteAfterCreateV1(
UUID orderId,
UUID startHubId,
String startHubName,
String startHubFullAddress,
UUID endHubId,
String endHubName,
String endHubFullAddress,
UUID receiverFirmId,
String receiverFirmFullAddress,
String receiverFirmOwnerName,
String requestNote,
String productName,
int productQuantity,
LocalDateTime orderCreatedAt,
Integer expectedDeliveryDuration) {
    // Conversion method to the application-layer command
public HubRouteAfterCommandV1 toCommand() {
return new HubRouteAfterCommandV1(
orderId(),
startHubId(),
startHubName(),
startHubFullAddress(),
endHubId(),
endHubName(),
endHubFullAddress(),
receiverFirmId(),
receiverFirmFullAddress(),
receiverFirmOwnerName(),
requestNote(),
productName(),
productQuantity(),
orderCreatedAt(),
expectedDeliveryDuration()
);
}
}
package chill_logistics.delivery_server.application.dto.command;
import java.time.LocalDateTime;
import java.util.UUID;
public record HubRouteAfterCommandV1(
UUID orderId,
UUID startHubId,
String startHubName,
String startHubFullAddress,
UUID endHubId,
String endHubName,
String endHubFullAddress,
UUID receiverFirmId,
String receiverFirmFullAddress,
String receiverFirmOwnerName,
String requestNote,
String productName,
int productQuantity,
LocalDateTime orderCreatedAt,
Integer expectedDeliveryDuration
) {}
package chill_logistics.delivery_server.infrastructure.kafka;
import chill_logistics.delivery_server.application.DeliveryCommandService;
import chill_logistics.delivery_server.application.dto.command.HubRouteAfterCommandV1;
import chill_logistics.delivery_server.infrastructure.kafka.dto.HubRouteAfterCreateV1;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class HubRouteAfterCreateListener {
private final DeliveryCommandService deliveryCommandService;
@KafkaListener(
topics = "hub-route-after-create",
containerFactory = "hubKafkaListenerContainerFactory"
)
public void listen(HubRouteAfterCreateV1 message) {
log.info("Kafka 메시지 수신: {}", message);
// 허브/업체 배송 담당자 배정 (임시 스텁)
UUID hubDeliveryPersonId = assignHubDeliveryPerson(message);
UUID firmDeliveryPersonId = assignFirmDeliveryPerson(message);
HubRouteAfterCommandV1 command = message.toCommand();
deliveryCommandService.createDelivery(command, hubDeliveryPersonId, firmDeliveryPersonId);
}
    // Assign the hub delivery person (temporary version, to be replaced with real assignment logic)
private UUID assignHubDeliveryPerson(HubRouteAfterCreateV1 message) {
return UUID.fromString("00000000-0000-0000-0000-000000000001");
}
    // Assign the firm delivery person (temporary version, to be replaced with real assignment logic)
    private UUID assignFirmDeliveryPerson(HubRouteAfterCreateV1 message) {
        return UUID.fromString("00000000-0000-0000-0000-000000000002"); // placeholder
    }
}
}