Spring + Kafka 사용법

김형준·2025년 3월 12일
0

사용법

의존성 추가

implementation 'org.springframework.kafka:spring-kafka'

Kafka + Zookeeper Docker Compose

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Producer

@Configuration
public class KafkaProducerConfig {
 
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
        return new DefaultKafkaProducerFactory<>(config);
    }
 
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.easytable.reservation.dto.request.ReservationCreateReqDto;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Repository;

import java.util.concurrent.TimeoutException;

@Repository("kafka")
@RequiredArgsConstructor
@Slf4j
public class KafkaMessagePublisherImpl implements MessagePublisher {
    private static final int MAX_ATTEMPTS = 5;
    private static final long RETRY_DELAY_MS = 500;

    private final KafkaTemplate<String, ReservationCreateReqDto> kafkaTemplate;

    @Value("${kafka.topic.reservation:create-reservation}")
    private String topic;

    @Override
    public void publish(ReservationCreateReqDto dto) throws TimeoutException {
        int attempt = 0;
        while (attempt < MAX_ATTEMPTS) {
            try {
                kafkaTemplate.send(topic, dto.getRequestId(), dto).get();
                return;
            } catch (Exception e) {
                attempt++;
                sleepThread();
            }
        }
        throw new TimeoutException("Kafka 메시지 전송 실패");
    }

    private void sleepThread() {
        try {
            Thread.sleep(RETRY_DELAY_MS);
        } catch (InterruptedException e) {
            System.out.println("thread interrupted");
        }
    }
}

Consumer

@Configuration
public class KafkaConsumerConfig {
 
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
        return new DefaultKafkaConsumerFactory<>(config);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
 
        return factory;
    }
}

@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaReservationListener {

    private final SinksRegistry sinkRegistry;
    // 실제 예약 생성 로직을 담당하는 서비스 (DB 저장 등)
    private final ReservationService reservationProcessingService;

    @KafkaListener(topics = "${kafka.topic.reservation:create-reservation}", groupId = "reservation-group")
    public void listen(ReservationCreateReqDto request) {
        try {
            ReservationCreateResDto response = reservationProcessingService.createReservation(request);
            // SinkRegistry를 통해 요청을 보낸 스레드에 결과 전송
            sinkRegistry.completeSink(request.getRequestId(), response);
        } catch (Exception e) {
            log.error(e.getMessage());
            sinkRegistry.completeSinkExceptionally(request.getRequestId(), e);
        }
    }
}

의문점 정리

  1. Producer 측면
    1. Kafka에서는 add 시 maxlen을 신경쓰지 않아도 되는가?

      Kafka에서는 add 시 maxlen을 지정하지 않는다.

      전체 로그 크기, 보존 기간 등은 Topic 단위의 보존 정책에 따라 관리된다.

      결론적으로 Producer는 maxlen에 신경 쓸 필요 없다.

    2. 그렇다면 Topic을 어떻게 관리할 수 있는가?

      KafkaAdmin 빈을 통해 애플리케이션 시작 시 Topic을 자동으로 생성할 수 있다.

      @Bean
      public NewTopic topic() {
          Map<String, String> configs = new HashMap<>();
          configs.put("retention.ms", "604800000"); // 예: 7일 보존
          return TopicBuilder.name("create-reservation")
                  .partitions(3)
                  .replicas(1)
                  .configs(configs)
                  .build();
      }

      다만 이미 존재하는 토픽의 설정을 변경하는 것은 Spring이 아닌 Kafka CLI와 같은 도구에서 관리하는 것이 일반적이다.

  1. Consumer 측면
    1. Spring에서 Kafka 사용 시 Message에 ACK를 보내 관리하지 않아도 되는가?

      Spring Kafka는 기본적으로 컨테이너가 ACK 전송 등 오프셋을 자동으로 관리하는 auto-commit 기능을 제공한다.

      필요 시에는 수동으로도 commit 가능하다.

    2. 설정 코드를 통해 consumer 클래스들을 각 group에 subscribe되도록 명시하지 않아도 되는가?

      @KafkaListener, groupId 설정을 통해 consumer들이 자동으로 해당 group에 등록된다.

      또한 같은 groupId를 갖는 소비자들을 하나의 그룹으로 자동 인식하기에 별도의 group 생성 명령이 필요하지 않다.

    3. 여러 Kafka consumer 스레드들을 어떻게 동시에 작동시킬 수 있는가?

      ConcurrentKafkaListenerContainerFactory.setConcurrency() 메서드를 사용해 동시에 실행될 consumer thread 수를 조절할 수 있다.

      @Bean
      public ConcurrentKafkaListenerContainerFactory<String, ReservationCreateReqDto> kafkaListenerContainerFactory() {
          ConcurrentKafkaListenerContainerFactory<String, ReservationCreateReqDto> factory =
              new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(consumerFactory());
          factory.setConcurrency(3); // 예: 3개의 스레드로 병렬 처리
          return factory;
      }

0개의 댓글

관련 채용 정보