[SpringBoot] 카프카와 스프링부트 연동

Taeho·2022년 6월 4일
1

SpringFramework

목록 보기
1/4
post-thumbnail

Kafka : 3.2.0
SpringBoot : 2.4.3

1. 카프카 다운로드

아파치 카프가 공식 홈페이지 에서 다운로드 (https://kafka.apache.org/downloads)

나는 리눅스에서 wget으로 다운로드, 설치는 압축을 풀면 끝

wget https://downloads.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz

2. 카프카 브로커(서버) 실행

  • 카프카는 주키퍼를 사용한다. 별도 설치된 주키퍼가 없다면 내장된 주키퍼를 사용할 수 있다. 설치된 카프카에는 단일 노드 주키퍼가 포함되어 있다.
  • 카프카 서버의 기본 포트는 9092번이다. 변경하려면 config 디렉토리에 있는 설정 파일을 수정하면 된다.

1) 주키퍼 서버 실행

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

2) 카프카 브로커(서버) 실행

./bin/kafka-server-start.sh ./config/server.properties

※ 1년 전에 처음 자료 정리하고 테스트할 때는 이상 없었는데, 오늘 다시 테스트 해보니 Consumer에서 "could not be established. Broker may not be available" 오류 메세지 출력 되었다.
→ server.properties에 advertised.listeners에 IP와 포트를 지정해서 문제 해결

3. 토픽 생성 테스트

1) 토픽생성

./bin/kafka-topics.sh -create -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2) 생성된 토픽 확인

./bin/kafka-topics.sh --list --zookeeper localhost:2181

3) 생성된 토픽 삭제

./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

4. SpringBoot with kafka 예제

4.1 Depandency 설정

build.gradle에 아래 의존성 추가

    // for apache kafka
    implementation 'org.springframework.kafka:spring-kafka'

4.2 application.peorperties에 kafka 설정 추가

spring:  
  kafka:
    bootstrap-servers:
      - 192.168.0.4:9092
    consumer:
      # consumer bootstrap servers가 따로 존재하면 설정
      # bootstrap-servers: 192.168.0.4:9092

      # 식별 가능한 Consumer Group Id
      group-id: testgroup
      # Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 존재하지 않을 경우 수행할 작업을 설정
      # latest: 가장 최근에 생산된 메시지로 offeset reset
      # earliest: 가장 오래된 메시지로 offeset reset
      # none: offset 정보가 없으면 Exception 발생
      auto-offset-reset: earliest
      # 데이터를 받아올 때, key/value를 역직렬화
      # JSON 데이터를 받아올 것이라면 JsonDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # producer bootstrap servers가 따로 존재하면 설정
      # bootstrap-servers: 3.34.97.97:9092

      # 데이터를 보낼 때, key/value를 직렬화
      # JSON 데이터를 보낼 것이라면 JsonDeserializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

1) spring.kafka.consumer

  • bootstrap-servers
    • kafka 클러스터에 대한 초기 연결에 사용할 IP 목록, 쉼표로 구분
  • group-id
    • Consumer는 Consumer Group이 존재하고, 유일하게 식별할 수 있는 Consumer Group ID
  • auto-offset-reset
    • Kafka 서버에 초기 offset이 없거나, 서버에 현재 offset이 더 이상 없는 경우 수행할 작업을 작성
    • Consumer Group의 Consumer는 메시지를 소비할 때 Topic 내에 Partition에서 다음에 소비할 offset이 어디인지 공유하고 있다. 그런데 오류 등으로 인해 offset 정보가 없어졌을 때 어떻게 offset을 reset 할 것인지를 명시
      • latest : 가장 최근에 생산된 메시지로 offset reset
      • earliest : 가장 오래된 메시지로 offset reset
      • none : offset 정보가 없으면 Exception 발생
  • key-deserializer / value-deserializer
    • Kafka에서 데이터를 받아올 때 key/value를 역직렬화 한다.
    • key, value는 KafkaTemplate의 key, value를 의미한다.
    • 메시지가 문자열 데이터이면 StringDeserializer를 사용하고, JSON 데이터면 JsonDeserializer도 가능하다.

2) spring.kafka.producer

  • bootstrap-servers
    • consumer.bootstrap-server와 동일하지만 producer 전용으로 오버라이딩 하려면 작성한다.
  • key-serializer / value-serializer
    • Kafka에 데이터를 보낼때 key/value를 직렬화 한다.

4.3 Kafka Producer 클래스

KafkaTemplate에 Topic명과 메시지를 전달한다. kafkaTemplate.send() 메서드가 실행되면 Kafka 서버로 메시지가 전송된다.

@Service
public class KafkaProducer {

    @Value(value = "${message.topic.name}")
    private String topicName;

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println(String.format("Produce message : %s", message));
        this.kafkaTemplate.send(topicName, message);
    }
}

4.4 Kafka Consumer 클래스

Kafka로 부터 메시지를 받으려면 @KafkaListener 어노테이션을 달아주면 된다.

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "${message.topic.name}", groupId = ConsumerConfig.GROUP_ID_CONFIG)
    public void consume(String message) throws IOException {
        System.out.println(String.format("Consumed message : %s", message));
    }
}

4.5 Controller 클래스

@Slf4j
@CrossOrigin("*")
@RestController
@RequestMapping(value = "/kafka/test")
public class SampleController {
    private final KafkaProducer producer;

    @Autowired
    SampleController(KafkaProducer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/message")
    public String sendMessage(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
        return "success";
    }
}

4.6 Test

카프카와 스프링부트를 실행시키고 연동상태를 확인한 후 포스트맨에서 컨트롤러의 sendMessage를 호출하면 파라미터로 받은 메시지를 Producer로 카프카에 전송하고 Consumer에서 이를 수신하여 출력되는 것을 볼 수 있다.

※ Producer와 Consumer 설정을 application.properties에 작성했지만 설정을 여러 개로 관리하고 싶다면 Bean으로 구현하는 것이 좋은 방법이다

참고자료

https://victorydntmd.tistory.com/348

kafka 에서 Connection to node -1 (/ip:port) could not be established. Broker may not be available. 해결방안을 알아보자!

profile
한걸음 더 내딛는 개발자

1개의 댓글

comment-user-thumbnail
2023년 1월 9일

안녕하세요. 덕분에 초기 설정을 쉽게 했습니다.
한 가지 말씀드릴게 있다면

"4.4 Kafka Consumer 클래스"에서
@KafkaListener(topics = "${message.topic.name}", groupId = ConsumerConfig.GROUP_ID_CONFIG)

이렇게 작성하셨는데요. 이렇게하면 Consumer group id가 application.yml에서 설정한 testgroup이 아니라 group.id로 뜨게 됩니다.
"groupId = ConsumerConfig.GROUP_ID_CONFIG"를 지우니 testgroup으로 나오네요.

"ConsumerConfig.GROUP_ID_CONFIG"는 "group.id" 값을 가진 상수로 보이는데 혹시 이렇게 설정하신 이유가 있을까요?

좋은 글 작성해주셔서 정말 감사합니다.

답글 달기