Spring Boot + Kafka 연동해 메시지 송수신 구조 구현 예제

송현진·2025년 4월 16일
0

Kafka

목록 보기
2/7

✅목표

Kafka와 Spring Boot를 연동하여 JSON 형식의 메시지를 안전하게 송수신하는 구조를 구현해보자.

Kafka 환경 구성 (docker-compose.yml)

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181  # Kafka가 이 포트를 통해 Zookeeper에 접근함

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1  # 브로커 고유 ID (클러스터일 경우 각기 다르게 설정)
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  # Kafka가 내부적으로 Zookeeper와 연결
      # 외부(로컬 개발 환경 포함)에서 Kafka 브로커에 접근 가능한 주소
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Consumer의 offset 정보를 저장할 내부 토픽의 복제 수
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Kafka는 메시지를 처리하는 핵심 브로커이고 Zookeeper는 Kafka의 메타데이터(브로커, 파티션 등)를 관리해주는 관리자 역할이다.

KAFKA_ADVERTISED_LISTENERS가 중요하다. 이 설정이 Spring 앱에서 Kafka에 연결할 수 있도록 해주는 Kafka의 "공식 입구"다.

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR는 최소 1 이상이어야 하는데, 단일 브로커에서는 1로 설정해야 오류가 나지 않는다.

Spring Boot 설정 (application.yml)

spring:
  kafka:
    # Kafka 브로커의 접속 주소
    # 클러스터 환경이라면 여러 개 적을 수도 있음
    bootstrap-servers: localhost:9092
    consumer:
      # 컨슈머 그룹 ID. 같은 그룹에 속한 컨슈머는 서로 다른 파티션만 소비 (중복 없음)
      group-id: test-group
	  # 해당 컨슈머 그룹의 오프셋 정보가 없을 경우 맨 앞(offset=0) 부터 읽도록 설정
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      # Kafka는 데이터를 바이트 배열로 전송해야 하므로 직렬화가 필요
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Kafka는 메시지를 전송할 때 데이터를 내부적으로 바이트 배열(ByteArray) 로 변환하여 전송한다. 따라서 객체 데이터를 Kafka로 전송하려면 직렬화(Serialize) 과정이 반드시 필요하다.
문자열을 보낼 때는 StringSerializer를 설정하고 DTO 객체를 JSON으로 변환해 전송할 경우에는 JsonSerializer를 설정해야 한다.

Kafka 메시지 송수신 구현

Config

@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, MessageDto> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Kafka 브로커 주소
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 메시지 Key를 문자열로 직렬화
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 메시지 Value를 JSON으로 직렬화
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, MessageDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, MessageDto> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Kafka 브로커 주소
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 컨슈머 그룹 ID
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // Key 역직렬화
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Value 역직렬화
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // 모든 패키지의 클래스를 신뢰 (보안 목적 주의 필요)
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        
        // JsonDeserializer에 타입 정보를 명시하여 DTO로 매핑
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(MessageDto.class, false));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

ProducerFactory

  • Kafka로 메시지를 보내기 위한 Producer 인스턴스를 생성하는 팩토리
  • 객체를 Kafka로 전송할 수 있도록 JsonSerializer를 통해 JSON 형식으로 직렬화한다.
  • Kafka 브로커 주소 및 직렬화 방식을 설정

KafkaTemplate

  • 실제 Kafka로 메시지를 전송하는 핵심 컴포넌트
  • REST API나 Service에서 Kafka로 메시지를 보낼 때 사용
  • 내부적으로 위의 producerFactory를 사용

ConsumerFactory

  • Kafka에서 메시지를 수신하는 Consumer 인스턴스를 생성하는 팩토리
  • Kafka에서 수신된 JSON 메시지를 Java 객체로 역직렬화 하기 위해 JsonDeserializer를 사용
  • TRUSTED_PAKAGES는 역직렬화할 클래스의 패키지를 허용하는 설정이며, 보안상 "*" 대신 특정 패키지를 지정하는 것이 권장된다.

ConcurrentKafkaListenerContainerFactory

  • @kafkaListener에서 사용할 Kafka 리스너 컨테이너 팩토리를 생성
  • 내부적으로 consumerFactory를 사용해 메시지를 수신하고 멀티스레드 환경에서도 안정적으로 동작하도록 구성

Dto - 메시지 객체

@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@ToString
public class MessageDto {
    private String sender;
    private String content;
}

Controller - 메시지 전송 API

@RestController
@RequiredArgsConstructor
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaProducerService kafkaProducerService;

    @PostMapping("/send")
    public ResponseEntity<String> send(@RequestBody MessageDto message) {
        kafkaProducerService.send("test-topic", message);
        return ResponseEntity.ok("카프카로 메세지 전송 성공");
    }

}

📤Kafka Producer - 메시지 전송

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, MessageDto> kafkaTemplate;

    public void send(String topic, MessageDto message) {
        kafkaTemplate.send(topic, message);
        log.info("[Producer] 객체 전송 : " + message);
    }

}

📥Kafka Consumer – 메시지 수신

@Slf4j
@Component
public class KafkaConsumerService {

    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void consume(MessageDto message) {
        log.info("[Consumer] 객체 수신 : " + message);
    }

}

실행 화면

  • 메시지 전송 성공

  • 메시지 수신 로그

🔄️ 흐름도

  1. 사용자 → POST /kafka/send
  2. KafkaTemplate이 Kafka에 메시지 전송
  3. Kafka Broker가 메시지를 test-topic에 저장
  4. @KafkaListener가 메시지를 실시간으로 수신

📝배운점

Kafka 메시지는 기본적으로 Byte 배열로 전송되므로 객체를 전송하려면 직렬화/역직렬화 설정이 반드시 필요했다. KafkaTemplate는 Producer 역할을 하고 @KafkaListener는 Consumer 역할을 수행하고 각각의 동작을 위해 ProducerFactoryConsumerFactory의 설정이 중요하다는 걸 알 수 있었다.

또한 ConcurrentKafkaListenerContainerFactory를 활용하면 멀티스레드 기반의 병렬 메시지 소비가 가능해지고 시스템의 확장성과 안정성을 모두 챙길 수 있는 구조로 발전시킬 수 있다는 점에서 매우 유용하다고 느꼈다.

profile
개발자가 되고 싶은 취준생

0개의 댓글