Docker Kafka 구동 및 SpringBoot와 연동

정종일·2023년 4월 11일
0

Spring

목록 보기
6/18

Kafka-docker Clone

git clone https://github.com/wurstmeister/kafka-docker

위 명령어로 kafka-docker 폴더를 clone 하면

이렇게 폴더가 생성된다.

오늘은 해당 폴더 내 docker-compose-single-broker.yml 파일을 통해 kafka 토픽을 생성하고 메시지를 produce, consume 해보자.

docker-compose 설정 및 실행

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

위와 같이 local 환경에서 kafka와 zookeeper를 구동하는 컴포즈 파일 설정을 마치고

clone 받은 kafka-docker 폴더에서

docker-compose -f docker-compose-single-broker.yml up -d

명령어를 실행해 docker 컨테이너를 구동한다.

이후 docker ps -a 명령어를 통해 컨테이너가 run중인지 확인하자.

잘 구동되고 있다.

kafka topic 생성

구동까지 했다면 이제 토픽을 만들어보자.

docker exec -it kafka bash

위 명령어로 kafka 내부로 들어간 후

kafka-topics.sh --zookeeper localhost --create --topic 0406M --replication-factor 1 --partitions 1

이렇게 0406M 이라는 topic을 생성해준다.

Created topic {{topic name}}메시지가 나오면 topic 생성 완료

SpringBoot Kafka 연동

개발 환경

  • Java 11
  • SpringBoot 2.7.10
  • gradle 7.6.1

1. Kafka 의존성 추가

dependencies {
...

	implementation 'org.springframework.boot:spring-boot-starter-web'
	// kafka
	implementation 'org.springframework.kafka:spring-kafka'

...
}

2. application.yml 설정

server:
  servlet:
    context-path: /api

spring:
  kafka:
		// 로컬 환경
    bootstrap-servers: localhost:9092
    consumer:
      # 식별 가능한 Consumer Group Id
      group-id: test0406
      # Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 존재하지 않을 경우 수행할 작업을 설정
      # latest: 가장 최근에 생산된 메시지로 offeset reset
      # earliest: 가장 오래된 메시지로 offeset reset
      # none: offset 정보가 없으면 Exception 발생

      auto-offset-reset: earliest
      # 데이터를 받아올 때, key/value를 역직렬화
      # JSON 데이터를 받아올 것이라면 JsonDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    producer:
      # producer bootstrap servers가 따로 존재하면 설정
      # 데이터를 보낼 때, key/value를 직렬화
      # JSON 데이터를 보낼 것이라면 JsonDeserializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

logging:
  level:
    org.hibernate.sql: debug
    root: info

3. Controller, Producer, Consumer 설정

KafkaController

@RestController
@RequiredArgsConstructor
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaProducer producer;

    @PostMapping("send")
    public String sendMessage(@RequestParam("message") String message) {
        producer.sendMessage(message);

        return "success";
    }
}

KafkaProducer

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {

    private static final String TOPIC = "0406M";
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        log.info(String.format("Produce message: %s", message));
        kafkaTemplate.send(TOPIC, message);
    }
}

KafkaConsumer

@Service
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "0406M", groupId = "test0406")
    public void consume(String message) throws IOException {
        log.info(String.format("Consumed message : %s", message));
    }
}

4. Server Run

0406M 이라는 topic을 미리 생성해놓았기 때문에 정상적으로 작동되는 것을 볼 수 있다.

만약 에러가 발생한다면 해당 topic이 존재하지 않아 발생하는 에러일 가능성이 크다.

5. Postman을 통한 test

postman을 통해 메시지를 produce 해보았다.

success가 정상적으로 나온다.

springBoot로 돌아와보면

정상적으로 produce 되고

consume도 정상적으로 되는 것을 확인할 수 있다.

마치며

예전 프로젝트를 진행할 때 STOMP와 함께 kafka로 채팅 시스템을 개발한 적이 있었는데, 당시에 메시지 produce는 정상적으로 되지만 consume이 정상적으로 되지 않을 때 해당 부분에 대한 처리를 어떻게 해야할 지 몰라 많이 헤맸었던 기억이 난다. 이 부분에 대한 자료도 찾아보아야겠다.

profile
제어할 수 없는 것에 의지하지 말자

0개의 댓글