[Spring Boot/Kafka] 카프카 브로커 실습

simhani1·2025년 9월 11일

Backend

목록 보기
6/6
post-thumbnail

환경

  • Spring Boot(producer)
  • Spring Boot(consumer)
  • Kafka(Docker)

Kafka docker-compose.yml

version: '3.8'

services:
  zookeeper:
    image: bitnami/zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"  # Zookeeper는 Kafka를 위한 메타데이터 저장소 역할
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: bitnami/kafka:3.5.1
    container_name: kafka
    ports:
      - "9092:9092"  # Kafka 브로커가 외부로 노출되는 포트
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181  # Zookeeper 연결 설정
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092  # 내부용 Listener
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092  # 외부용 Listener
      - ALLOW_PLAINTEXT_LISTENER=yes  # 인증 없이 허용 (개발용)
      - KAFKA_ENABLE_KRAFT=no  # KRaft 모드 비활성화 → Zookeeper 기반 Kafka 사용
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false   # 토픽 자동 생성 끄기
    depends_on:
      - zookeeper  # Kafka는 Zookeeper가 먼저 실행되어야 함

이때 KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false 값을 지정하지 않으면 기본적으로 true가 설정됩니다. true라면 프로듀서가 메시지를 특정 토픽으로 전송할 때 토픽을 자동으로 생성합니다. 자동으로 토픽이 생성되면 파티션과 레플리케이션이 모두 1로 설정되어 실제 운영 환경에서는 주의해서 사용해야 합니다.

현재 컨슈머가 한 개이므로 partitions를 1개로 지정했습니다. 또한 브로커도 한 개이므로 replication-factor도 1로 지정하여 토픽을 생성했습니다.

## 1. 터미널에서
docker exec -it kafka kafka-topics.sh \
  --create \
  --bootstrap-server localhost:9092 \
  --topic demo-topic \
  --partitions 1 \
  --replication-factor 1
  
## 2. 컨테니어 접속 후
/opt/bitnami/kafka/bin/kafka-topics.sh \
  --create \
  --bootstrap-server localhost:9092 \
  --topic demo-topic \
  --partitions 1 \
  --replication-factor 1

아래 명령어를 사용하면 토픽 정보를 조회할 수 있습니다.

## 1. 터미널에서
docker exec -it kafka kafka-topics.sh \
  --describe \
  --bootstrap-server localhost:9092 \
  --topic demo-topic  

## 2. 컨테니어 접속 후
/opt/bitnami/kafka/bin/kafka-topics.sh \
  --describe \
  --bootstrap-server localhost:9092 \
  --topic demo-topic

Producer 애플리케이션

  • config
@Configuration
public class KafkaProducerConfig {

	private final String BOOTSTRAP_ADDRESS = "localhost:9092";

	@Bean
	public ProducerFactory<String, String> producerFactory() {
		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return new DefaultKafkaProducerFactory<>(configProps);
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}
}
  • application.yml
server:
  port: 8080
spring:
  application:
    name: kafka
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      acks: all
      properties:
        enable.idempotence: true
        delivery.timeout.ms: 120000 
        retry.backoff.ms: 100
        max.in.flight.requests.per.connection: 5

akcs 값을 all로 설정하면 멱등성이 보장되어 중복 메시지 전송을 방지할 수 있습니다.

  • controller
@Slf4j
@RestController
@RequiredArgsConstructor
public class MessageController {

	private final KafkaTemplate<String, String> kafkaTemplate;
	private final DeadLetterPublisher deadLetterPublisher;

	private static final String TOPIC = "demo-topic";
	private static final String DLT_TOPIC = TOPIC + "-dlt";

	@PostMapping("/send")
	public void kafkaSendMessage(@RequestBody SendReq req) {
		String message = req.getMessage();
		try {
			CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);
			future.exceptionally(ex -> {
				log.error("Unable to send message=[ {} ] due to: {}", message, ex.getMessage());
				return null;
			});
		} catch (Exception e) {
			log.error("Unable to send message=[ {} ] due to: {}", message, e.getMessage());
		}
	}
}

간단히 엔드포인트를 만들고 메시지를 전송하도록 구현했습니다. 그리고 kafkaTemplate을 주입받아 send() 메서드로 메시지를 전송합니다.

Consumer 애플리케이션

  • config
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

	private final String BOOTSTRAP_ADDRESS = "localhost:9092";
	private final String GROUP_ID = "group1";

	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return new DefaultKafkaConsumerFactory<>(props);
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}
}

@EnableKafka 어노테이션은 @KafkaListener가 붙은 스프링 빈을 스캔할 수 있도록 합니다. 여기서 GROUP_ID는 컨슈머 그룹을 지정하는 것이며 카프카는 컨슈머 인스턴스의 그룹ID를 기준으로 컨슈머 그룹으로 묶습니다. 그리고 토픽 내부 파티션을 컨슈머 그룹에 속한 컨슈머와 맵핑합니다. 만약 파티션이 3개, 컨슈머 그룹에 컨슈머가 2개라면 하나의 컨슈머는 파티션 두 개를 담당하고 나머지 컨슈머는 파티션 하나만 연결됩니다. 따라서 컨슈머 개수가 파티션 개수 이하가 되도록 아키텍처를 구성해야 합니다.

Annotation Interface EnableKafka

  • application.yml
server:
  port: 8081
spring:
  application:
    name: kafka
  kafka:
    consumer:
      bootstrap-servers: localhost:9092

akcs 값을 all로 설정하면 멱등성이 보장되어 중복 메시지 전송을 방지할 수 있습니다.

  • consumer
@Slf4j
@Component
public class Consumer {

	private final String TOPIC = "demo-topic";
	private final String GROUP_ID = "demo-group";

	@KafkaListener(topics = TOPIC, groupId = GROUP_ID)
	public void consume(String message) {
		log.info("[{}] 브로커로부터 메시지 소비, 내용={}", this.getClass().getSimpleName(), message);
	}
}

카프카의 세부 내용

0개의 댓글