Kafka와 Spring Boot를 연동하여 JSON 형식의 메시지를 안전하게 송수신하는 구조를 구현해보자.
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181 # Kafka가 이 포트를 통해 Zookeeper에 접근함
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1 # 브로커 고유 ID (클러스터일 경우 각기 다르게 설정)
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # Kafka가 내부적으로 Zookeeper와 연결
# 외부(로컬 개발 환경 포함)에서 Kafka 브로커에 접근 가능한 주소
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
# Consumer의 offset 정보를 저장할 내부 토픽의 복제 수
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Kafka는 메시지를 처리하는 핵심 브로커이고 Zookeeper는 Kafka의 메타데이터(브로커, 파티션 등)를 관리해주는 관리자 역할이다.
KAFKA_ADVERTISED_LISTENERS
가 중요하다. 이 설정이 Spring 앱에서 Kafka에 연결할 수 있도록 해주는 Kafka의 "공식 입구"다.
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
는 최소 1 이상이어야 하는데, 단일 브로커에서는 1로 설정해야 오류가 나지 않는다.
spring:
kafka:
# Kafka 브로커의 접속 주소
# 클러스터 환경이라면 여러 개 적을 수도 있음
bootstrap-servers: localhost:9092
consumer:
# 컨슈머 그룹 ID. 같은 그룹에 속한 컨슈머는 서로 다른 파티션만 소비 (중복 없음)
group-id: test-group
# 해당 컨슈머 그룹의 오프셋 정보가 없을 경우 맨 앞(offset=0) 부터 읽도록 설정
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
producer:
# Kafka는 데이터를 바이트 배열로 전송해야 하므로 직렬화가 필요
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Kafka는 메시지를 전송할 때 데이터를 내부적으로 바이트 배열(ByteArray) 로 변환하여 전송한다. 따라서 객체 데이터를 Kafka로 전송하려면 직렬화(Serialize) 과정이 반드시 필요하다.
문자열을 보낼 때는 StringSerializer
를 설정하고 DTO 객체를 JSON으로 변환해 전송할 경우에는 JsonSerializer
를 설정해야 한다.
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, MessageDto> producerFactory() {
Map<String, Object> config = new HashMap<>();
// Kafka 브로커 주소
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 메시지 Key를 문자열로 직렬화
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 메시지 Value를 JSON으로 직렬화
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, MessageDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, MessageDto> consumerFactory() {
Map<String, Object> config = new HashMap<>();
// Kafka 브로커 주소
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 컨슈머 그룹 ID
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// Key 역직렬화
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Value 역직렬화
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// 모든 패키지의 클래스를 신뢰 (보안 목적 주의 필요)
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
// JsonDeserializer에 타입 정보를 명시하여 DTO로 매핑
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(MessageDto.class, false));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageDto> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
ProducerFactory
JsonSerializer
를 통해 JSON 형식으로 직렬화한다.KafkaTemplate
producerFactory
를 사용ConsumerFactory
JsonDeserializer
를 사용TRUSTED_PAKAGES
는 역직렬화할 클래스의 패키지를 허용하는 설정이며, 보안상 "*"
대신 특정 패키지를 지정하는 것이 권장된다.ConcurrentKafkaListenerContainerFactory
@kafkaListener
에서 사용할 Kafka 리스너 컨테이너 팩토리를 생성consumerFactory
를 사용해 메시지를 수신하고 멀티스레드 환경에서도 안정적으로 동작하도록 구성@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@ToString
public class MessageDto {
private String sender;
private String content;
}
@RestController
@RequiredArgsConstructor
@RequestMapping("/kafka")
public class KafkaController {
private final KafkaProducerService kafkaProducerService;
@PostMapping("/send")
public ResponseEntity<String> send(@RequestBody MessageDto message) {
kafkaProducerService.send("test-topic", message);
return ResponseEntity.ok("카프카로 메세지 전송 성공");
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
private final KafkaTemplate<String, MessageDto> kafkaTemplate;
public void send(String topic, MessageDto message) {
kafkaTemplate.send(topic, message);
log.info("[Producer] 객체 전송 : " + message);
}
}
@Slf4j
@Component
public class KafkaConsumerService {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consume(MessageDto message) {
log.info("[Consumer] 객체 수신 : " + message);
}
}
/kafka/send
@KafkaListener
가 메시지를 실시간으로 수신Kafka 메시지는 기본적으로 Byte 배열로 전송되므로 객체를 전송하려면 직렬화/역직렬화 설정이 반드시 필요했다. KafkaTemplate
는 Producer 역할을 하고 @KafkaListener
는 Consumer 역할을 수행하고 각각의 동작을 위해 ProducerFactory
와 ConsumerFactory
의 설정이 중요하다는 걸 알 수 있었다.
또한 ConcurrentKafkaListenerContainerFactory
를 활용하면 멀티스레드 기반의 병렬 메시지 소비가 가능해지고 시스템의 확장성과 안정성을 모두 챙길 수 있는 구조로 발전시킬 수 있다는 점에서 매우 유용하다고 느꼈다.