Kafka란?
고속 대용량 로그/이벤트를 토픽 단위로 쌓아두고, 여러 서비스가 비동기로 구독해서 처리하는 분산 메시징/스트리밍 플랫폼.
조금 자세히 보면:
Broker
Topic
order-created, payment-completed, user-signup 등.Partition
Producer
KafkaTemplate, REST API 서비스 등.Consumer
Consumer Group
Offset
“주문 생성 이벤트”를 Kafka로 발행하고, 다른 서비스가 그걸 구독해서 처리하는 시나리오로 생각해보았다.
로컬에서 Kafka가 떠 있다고 가정했을 때,
📌 시나리오
order-server: 주문 생성 API → Kafka 토픽에OrderAfterCreate이벤트 발행 (Producer)delivery-server: KafkaOrderAfterCreate구독 → 배송 생성 (Consumer)
실제로는 공통 라이브러리 모듈(예: order-events-common)을 만들어서 두 서비스에서 같이 쓰는 게 베스트.
두 서비스 모두 implementation project(":order-events-common") 같은 식으로 의존성 추가해서 사용한다고 생각하면 됨.
package chill_logistics.delivery_server.infrastructure.kafka.dto;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
import java.time.LocalDateTime;
import java.util.UUID;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class OrderAfterCreateV1 {
@NotNull
private UUID startHubId;
@NotBlank
private String startHubFullAddress;
@NotNull
private UUID endHubId;
@NotBlank
private String endHubFullAddress;
@NotNull
@Positive
private Integer deliverySequenceNum;
@NotBlank
private String deliveryStatus; // ENUM String
@NotNull
@Positive
private Double expectedDistance;
@NotNull
private LocalDateTime expectedDeliveryDuration;
@Builder
private OrderAfterCreateV1(
UUID startHubId,
String startHubName,
String startHubFullAddress,
UUID endHubId,
String endHubName,
String endHubFullAddress,
Integer deliverySequenceNum,
String deliveryStatus,
Double expectedDistance,
LocalDateTime expectedDeliveryDuration
) {
this.startHubId = startHubId;
this.startHubFullAddress = startHubFullAddress;
this.endHubId = endHubId;
this.endHubFullAddress = endHubFullAddress;
this.deliverySequenceNum = deliverySequenceNum;
this.deliveryStatus = deliveryStatus;
this.expectedDistance = expectedDistance;
this.expectedDeliveryDuration = expectedDeliveryDuration;
}
}
DTO는 따로 구성하는 것이 좋고, 필요한 데이터만 뽑아서 쓸 수 있다.
이 때, 데이터를 받을 때, 인프라 계층에서 데이터를 모두 받고 애플리케이션 계층에서 필요한 데이터만 뽑아서 써도 된다.
Kafka는 infrastructure 계층에 KafkaListner(Adapter역할), Consumer를 두고, application 계층에서 어댑터 사용.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
implementation project(':order-events-common')
}
server:
port: 8081
spring:
application:
name: order-service
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
value-serializer를
JsonSerializer로 설정해서 DTO를 JSON으로 자동 직렬화.
package com.example.order.config;
import com.example.events.OrderCreatedEvent;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, OrderCreatedEvent> orderProducerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, OrderCreatedEvent> orderKafkaTemplate() {
return new KafkaTemplate<>(orderProducerFactory());
}
}
package com.example.order.kafka;
import com.example.events.OrderCreatedEvent;
import java.time.LocalDateTime;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderEventProducer {
private static final String TOPIC = "order-created";
private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;
public void publishOrderCreatedEvent(String orderId, Long userId, Long productId, Integer quantity) {
OrderCreatedEvent event = OrderCreatedEvent.builder()
.orderId(orderId)
.userId(userId)
.productId(productId)
.quantity(quantity)
.orderedAt(LocalDateTime.now())
.build();
log.info("[order-service] publishing event to Kafka: {}", event);
kafkaTemplate.send(TOPIC, orderId, event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("[order-service] failed to send event", ex);
} else {
log.info("[order-service] event sent. topic={}, partition={}, offset={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset()
);
}
});
}
}
package com.example.order.presentation;
import com.example.order.kafka.OrderEventProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
@RestController
@RequiredArgsConstructor
@RequestMapping("/orders")
public class OrderController {
private final OrderEventProducer orderEventProducer;
@PostMapping
public String createOrder(@RequestParam Long userId,
@RequestParam Long productId,
@RequestParam Integer quantity) {
// 1. 실제로는 여기서 DB에 주문 저장
String orderId = "ORD-" + System.currentTimeMillis(); // 예시용
// 2. Kafka 이벤트 발행
orderEventProducer.publishOrderCreatedEvent(orderId, userId, productId, quantity);
// 3. 클라이언트 응답
return "Order created: " + orderId;
}
}
이 서비스는 8081포트에서 HTTP 요청 처리 + Kafka Producer 역할만 담당.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
implementation project(':order-events-common')
}
웹 API가 굳이 필요 없으면 spring-boot-starter-web 없이, 배치/워커 앱처럼 돌아가도 됨.
server:
port: 19096
spring:
application:
name: delivery-server
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: notification-service-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
listener:
ack-mode: record
여기서 선택지가 두 가지 있다.
이 Consumer만 커스터마이징 → Java 설정 유지할 거라면, yaml에서 최소로만 두고, 진짜 동작은 KafkaConsumerConfig에 맡긴다.
예를 들어 yaml는 단순히 broker 주소만:
spring:
kafka:
bootstrap-servers: localhost:9092
그리고 나머지 consumer 옵션, trusted packages, JsonDeserializer 타입 등은 전부 자바 config에서 관리.
Java Config를 더 단순하게 하고, yml 쪽을 더 적극적으로 쓰고 싶다면:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: delivery-server-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: chill_logistics.delivery_server.infrastructure.kafka.dto
그리고 자바 코드는 훨씬 간단하게:
@Bean
public ConsumerFactory<String, OrderAfterCreateV1> orderConsumerFactory() {
JsonDeserializer<OrderAfterCreateV1> deserializer =
new JsonDeserializer<>(OrderAfterCreateV1.class, false);
deserializer.addTrustedPackages(
"chill_logistics.delivery_server.infrastructure.kafka.dto"
);
return new DefaultKafkaConsumerFactory<>(
new HashMap<>(),
new StringDeserializer(),
deserializer
);
}
필자는 1번의 방법으로 선택했다. 이유는,
다른 컨슈머가 추가될 경우에도
이렇게 확장성이 좋아진다고 판단했기 때문이다.
package chill_logistics.delivery_server.infrastructure.kafka.config;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, OrderAfterCreateV1> orderConsumerFactory() {
// JSON → OrderAfterCreateV1 역직렬화를 위한 Deserializer
JsonDeserializer<OrderAfterCreateV1> deserializer =
new JsonDeserializer<>(OrderAfterCreateV1.class, false);
// Kafka 메시지 역직렬화 시 허용할 패키지를 명시적으로 지정
deserializer.addTrustedPackages(
"chill_logistics.delivery_server.infrastructure.kafka.dto"
);
// Kafka Consumer 설정 값
Map<String, Object> properties = new HashMap<>();
// Kafka Broker 주소 (Docker Compose에서 기본적으로 localhost:9092로 띄움)
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Consumer Group ID (같은 Group으로 묶인 Consumer들은 같은 메시지를 중복 처리하지 않음)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "delivery-server-group");
// 메시지 Key 역직렬화 방식
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 메시지 Value 역직렬화 방식 (JsonDeserializer 사용)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// 위 설정값 + Key/Value Deserializer를 기반으로 실제 Consumer 인스턴스를 만들어 KafkaListener에 제공
return new DefaultKafkaConsumerFactory<>(
properties,
new StringDeserializer(), // Key Deserializer (String)
deserializer // Value Deserializer (OrderAfterCreateV1)
);
}
// @KafkaListener가 사용할 Listener 컨테이너 생성 (ConsumerFactory를 주입하여 실제 Kafka Consumer 동작을 구성)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1>
orderKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderAfterCreateV1> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// ConsumerFactory 설정
factory.setConsumerFactory(orderConsumerFactory());
return factory;
}
}
⭐ Kafka는 infrastructure 계층에 KafkaListner(Adapter역할), Consumer를 두고, application 계층에서 어댑터 사용하는 방식으로 수정 필요.
package chill_logistics.delivery_server.application;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import org.springframework.stereotype.Service;
@Service
public class DeliveryService {
public void createDelivery(OrderAfterCreateV1 message) {
}
}
⭐ Kafka는 infrastructure 계층에 KafkaListner(Adapter역할), Consumer를 두고, application 계층에서 어댑터 사용하는 방식으로 수정 필요.
package chill_logistics.delivery_server.infrastructure.kafka;
import chill_logistics.delivery_server.infrastructure.kafka.dto.OrderAfterCreateV1;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class OrderAfterCreateListener {
@KafkaListener(
topics = "order-after-create",
containerFactory = "orderKafkaListenerContainerFactory"
)
public void listen(OrderAfterCreateV1 message) {
log.info("메시지 수신 OrderAfterCreateV1: {}", message);
// 여기서 이제 service 호출
}
}
이 서비스는 HTTP 요청 없이 백그라운드 워커처럼 돌아가면서 Kafka 메시지만 소비해도 되고, 필요하면 조회 API를 추가해서 “알림 이력 조회” 같은 기능도 만들 수 있다.
Kafka/Zookeeper 띄우기
delivery-server 실행 (19096)
order-server 실행 (19095)
주문 생성 요청 보내기
curl -X POST "http://localhost:8081/orders?userId=1&productId=100&quantity=2"
로그에서 확인:
order-server : Kafka로 event 발행 로그delivery-server : Kafka에서 event 소비 + 배송 생성 로그