[Apache Kafka] Docker 환경에서 Spring boot + Kafka 연동하기

목포·2022년 7월 4일
4

Apache Kafka

목록 보기
3/3

Kafka를 스터디하는 김에 Spring Application과 이를 연동하여 메세지를 주고 받는 애플리케이션을 간단하게 작성해보려 한다.

세팅 환경

  • ubuntu 18.04 LTS
  • docker
  • docker-compose
  • kafka (docker img : wurstmeister/kafka:2.12-2.5.0)

Docker로 Kafka 설치하기

docker-compose.yml

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

KAFKA_ADVERTISED_HOST_NAME은 카프카 클라이언트와 브로커가 통신을 하기 위해 처음으로 client가 부여받는 ip로 kafka client가 broker에게 통신 요청을 하고 이를 부여받으면 해당 ip 연결하여 사용 여부를 검사할 수 있다.

근데 만약 하나의 서버에 kafka client와 broker가 함께 설치 되어있다면 굳이 adv_host를 private ip로 부여할 필요 없이 localhost로 지정하면 그만이다.
나는 가상머신에 client와 broker를 1대 씩 설치하여 테스트할 것이기 때문에 일단 이 옵션을 127.0.0.1로 설정할 것이다.

container 실행

$ docker-compose up -d
.
.
.
.
.
Creating kafka		 	... done
Creating zookeeper 	... done

다음 명령어로 확인해보면 컨테이너가 잘 실행된 것을 볼 수 있다.

$ docker ps
CONTAINER ID   IMAGE                           COMMAND                  CREATED       STATUS       PORTS                                                                   NAMES
7c829867de43   wurstmeister/kafka:2.12-2.5.0   "start-kafka.sh"         2 hours ago   Up 2 hours   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                               kafka
a03d6d6df04f   wurstmeister/zookeeper          "/bin/sh -c '/usr/sb…"   2 hours ago   Up 2 hours   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp   zookeeper

kafka server.properties 변경

다음은 kafka 서버의 설정을 약간 수정해줄 것이다. console로 테스트할 때는 딱히 문제가 없지만 여러 테스트를 용이하게 하기 위해서이다.
먼저, kafka 컨테이너로 접속한다.

$ docker exec -it kafka /bin/bash
$ bash# cd /opt/kafka/config
$ bash# vi server.properties

server.properteis
먼저 Socket Server Setting 부분에 listeners와 advertised.listeners의 주석 처리를 해제하고 후자에는 공인 ip를 작성한다.

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://공인ip:9092


delete.topic.enable=true
auto.create.topics.enable=true

delete.topic.enable 설정은 토픽을 삭제 할 수 있게 하기 위함이고 auto.create.topics.enable 설정은 토픽이 존재하지 않은 상태에서 토픽을 찾으려 할 때 자동으로 이를 생성해주는 옵션이다.
자세한 것은 여기에 기술해두었다.

그리고 이제 exit로 컨테이너를 나와 다시 재기동 시켜주면 된다. restart를 쓰면 kafka 버그 때문에 바로 안 올라가는 경우가 있으니 당황하지말고 stop 후 좀 대기하여 start로 재기동해보자.

$ docker stop kafka 
$ docker start kafka

Kafka 테스트

먼저 세팅한 kafka 시스템이 잘 동작하는지 테스트 해보자.
일단 메세지를 보낼 토픽을 생성해보자.

$ bash# cd /opt/kafka/bin
$ bash# kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic exam-topic

위 명령어는 토픽의 각 파티션을 복제하는 replication-factor를 1개 파티션 수를 1개로 지정하여 토픽을 생성하겠다는 의미이다.

또, 카프카 분산 시스템은 커맨드라인으로 메세지를 전송할 수 있는 유틸을 제공한다. kafka-console-producer와 consumer로 메세지가 오고가는 것을 확인할 수 있다.
먼저 Terminal1에서 producer를 실행하여 메세지를 입력하고

Test Terminal1

$ bash# kafka-console-producer.sh --topic exam-topic --broker-list localhost:9092
> This is a msg;

다른 터미널을 하나 켜서 consumer를 실행하면 전송한 메세지가 들어오는 것을 확인할 수 있다.

Test Terminal2

$ bash# kafka-console-consumer.sh --topic exam-topic --bootstrap-server localhost:9092 --from-beginning
This is a msg;

Spring Application 연동

이제 Kafka가 제대로 동작하는 것을 확인했으니 메세지를 전송하고 받는 application을 만들어보자.

개발 환경

gradle 6.8.3
spring boot 2.7.1

dependency

당연히 먼저 kafka 연동을 위한 의존성을 추가해야한다. 간단한 로그를 위해 lombok도 추가했다.

implementation 'org.springframework.boot:spring-boot-starter-web'

implementation 'org.springframework.kafka:spring-kafka'

compileOnly 'org.projectlombok:lombok'

application.yml

공식문서

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

그 다음은 application과 kafka의 연동 설정을 위해 yml 파일을 작성한다.

  • spring.kafka

    • bootstrap-servers

      • kafka 서버와 연결할 호스트와 포트 정보이다.
      • 만약 producer와 consumer가 다른 서버에 있다면 spring.kafka.consumer(또는 producer).bootstrap-servers으로 설정하면된다.)
        - 이 때, 글로벌 설정 정보가 있어도(spring.kafka.bootstrap-servers) consumer 전용으로 오버라이딩한다.
    • consumer

      • group-id

        • consumer group을 구별하기 위한 유니크한 id 작성

          • [참고] Consumer Group란?

            - 컨슈머 인스턴스들을 대표하는 그룹
          • Consumer Instance란?
            - 하나의 프로세스 또는 하나의 서버라고 할 수 있다.
          • Offset이란?
            - 파티션안에 데이터 위치를 유니크한 숫자로 표시한 것을 offset이라 부르고, 컨슈머는 자신이 어디까지 데이터를 가져갔는지 offset을 이용해서 관리한다.
        • auto-offset-reset

          • Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 없는 경우 수행할 작업을 작성한다.

            - earliest : 가장 오래된 메세지로 offset reset
            - latest : 가장 최근에 생산된 메세지로 offset reset
            - none : offset 정보가 없으면 Exception을 발생
        • key-deserializer/value-desrializer

          • Kafka에서 consumer가 데이터를 받아올 때 key/value를 역직렬화한다.
          • 우리가 주고 받는 데이터의 형태는 String이므로 StringDeserializer를 사용했다.
    • producer

      • key-deserializer/value-desrializer

KafkaConfiguration.java

데이터 전송을 위한 KafkaProducerBean을 만든다. 그리고 이 ProducerFactory를 이용하는 KafkaTemlate을 통해 데이터를 send하면 된다.
만약 여러 데이터 타입을 전송하고 싶을 때는 Producer Bean을 여러 개 정의하면 될 듯 하다.

참고로 환경변수 파일을 통해 Kafka를 연동할 때 직렬화 방식을 지정하지 않으면 SpringSerializer가 default로 설정된다.
하지만, java bean으로 설정 시 key, value의 직렬화 정보를 지정하지 않으면 default 값이 자동으로 구성되지 않으니 에러가 발생하지 않도록 조심하자.

@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {

        Map<String,Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory(configs);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {

        return new KafkaTemplate<>(producerFactory());
    }
}

KafkaProducer.java
Configuration 파일에서 정의한 kafkaTemplate 빈을 이용해 “exam-topic”이라는 이름의 토픽을 전송할 수 있도록 메서드를 정의한다.

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
    private static final String TOPIC = "exam-topic";

    @Autowired
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        log.info("Produce message : {}", message);
        this.kafkaTemplate.send(TOPIC, message);
    }
}

KafkaConsumer.java
다음은 exam-topic이라는 이름의 토픽이 전달됐다면 해당 메세지들을 가져올 수 있게끔 하는 KafkaListener를 달아 정의해주면 된다.

@Service
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics = "exam-topic", groupId = "foo")
    public void consume(String message) throws IOException {
        log.info("Consumed message : {}", message);
    }
}

KafkaController.java

@RestController
@RequestMapping(value = "/kafka")
@Slf4j
public class KafkaController {

    private final KafkaProducer producer;

    @Autowired
    KafkaController(KafkaProducer producer){
        this.producer = producer;
    }


    @PostMapping
    @ResponseBody
    public String sendMessage(@RequestParam String message) {
        log.info("message : {}", message);
        this.producer.sendMessage(message);

        return "success";
    }
}

테스트

이제 애플리케이션을 구동시켜 작성한 코드가 잘 돌아가는지 확인해보자.
먼저 terminal로 kafka-console-consumer.sh 을 동작시키고

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic exam-topic --from-beginning

api를 호출해본다.

그러면 terminal에 consumer가 끌어온 메세지 내용들이 쭉 찍히고

애플리케이션에서도 찍은 로그에 주고 받은 메세지 내용이 찍힌 것을 확인할 수 있다.


참고

More Kafka3-1. 카프카 프로듀서 · 앞으로 점프
SpringBoot Kafka 연동하기 :: victolee
#03 Spring boot & Kafka 연동하기

profile
mokpo devlog

0개의 댓글