[kafka] kafka with docker

rejs·2024년 6월 7일

Spring Kafka를 시작하게 되었다

이 글에서는 docker-compose로 kafka 실행시키고 spring과 연결되는 지까지만 확인할 예정이다.

Kafka With Docker-compose.yml

confluent Docs QuickStart

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

apache 공식으로도 kafka이미지가 있긴 한데 일단 confluentinc/cp-kafka를 사용하기로 했다

kafka 커넥터라던가... 스키마 레지스트리라던가... 추가할 게 많다

kafka 명령어

docker exec -it kafka /bin/bash

저기 있는 kafka는 container 이름이다. 저 명령어를 사용해서 kafka 컨테이너에 접근하도록 하자

당장 알아야하는 kafka 콘솔은 4가지이다.

토픽 생성
/bin/kafka-topics --create --topic test --bootstrap-server localhost:9092 --partitions 1 
토픽 목록
/bin/kafka-topics --bootstrap-server localhost:9092 --list
토픽에 메시지 추가
/bin/kafka-console-producer --bootstrap-server localhost:9092 --topic test
토픽의 메시지 받기 (--from-beginning = 메시지를 처음부터 받기)
/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

apache의 공식 kafka이미지를 사용하는 경우 /opt/kafka/bin에 있고

confluentinc/cp-kafka 이미지를 사용하는 경우 /bin아래 다 있다.

Spring kafka

spring:
    kafka:
        bootstrap-servers:
            - localhost:9092
        producer:
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
            group-id: test

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'

이런 에러가 발생했다면 serializer가 제대로 되었는지 확인해주세요.

Caused by: java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

이 에러가 발생했다면 consumer.group-id를 추가해주세요.

kafka producer

@Slf4j
@RequiredArgsConstructor
@Component
public class TestKafkaProducer {
    private final KafkaTemplate<String , String > kafkaTemplate;

    public void sendMessage(String topic, String message){
        kafkaTemplate.send(topic, message);
        log.info("topic : {}, message : {}", topic, message);
    }
}

테스트

@ActiveProfiles("test")
@SpringBootTest
class TestKafkaProducerTest {
    @Autowired private TestKafkaProducer testKafkaProducer;

    @Test
    public void sendMessage(){
        testKafkaProducer.sendMessage("test", "delete all");
    }
}

kafka 컨테이너에 exec로 접근해서 test topic에 delete all 메시지가 있는지 확인해봅시다.

kafka consumer

@Slf4j
@Component
public class TestKafkaConsumer {
    @KafkaListener(topics="test")
    public void consume(String message){
        log.info("Received message : {}", message);
    }
}

kafka-console-producer로 메시지를 남긴 뒤 제대로 spring 서버가 제대로 읽어오는 지 확인해봅시다

0개의 댓글