Kafka 기본 이론

깃헙에 있는 정리본은 참고해주세요!

https://github.com/namusik/TIL-SampleProject/tree/main/Kafka

소스코드

https://github.com/namusik/TIL-SampleProject/tree/main/Kafka/Sample%20Project

작업환경

IntelliJ
Spring-boot
java 11
gradle

dependency

spring web
spring for Apache Kafka

application.yml

!!Producer/Consumer 설정을 bean을 통해서도 가능. 설정을 여러개로 하고 싶다면 bean 사용

  spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: treesick
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
  • consumer
    • bootstrap-servers : 카프카 서버의 IP:포트번호
    • group-id : 컨슈머그룹 이름 만들어주기. 원하는거 아무거나!
    • auto-offset-reset : 컨슈머 그룹에서 소비할 offset 정보가 없을 때, 어떻게 offset을 리셋할 것인지에 대한 설정.
      - latest : 가장 최근 메시지로
      - earliest : 가장 오래된 메시지로
      - none : offset 정보가 없으면 Exception 발생시
    • key, value-deserializer : 카프카에서 메세지를 받아올 때, 역직렬화 설정. String이면 StringDeserializer, JSON형식을 받아오면 JsonDeserializer.
      kafkaTemplate의 key-value를 말함.
  • producer
    • bootstrap-servers : 상동
    • key,value-deserializer : 반대로 카프카에 데이터를 보낼 때 하는 직렬화 설정.

KafkaProducerService

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafakaProducerService {
    private static final String TOPIC = "treesick";
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        System.out.println("produce message = " + message);
        kafkaTemplate.send(TOPIC, message);
    }
}

프로듀서 서비스클래스
kafkaTemplate.send를 통해 kafka 서버로 토픽과 메시지를 보냄.

KafkaConsumerService

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "treesick", groupId = "namu")
    public void consume(String message) throws IOException {
        System.out.println("consume message = " + message);
    }

}

Consumer 서비스클래스
@KafkaListener 어노테이션을 달아서 토픽과 컨슈머그룹아이디 정보를 써줌
메시지가 도달하면 출력을 하도록 코딩

KafkaController

import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.sample.kafka.service.KafkaProducerService;

@RestController
@RequiredArgsConstructor
public class KafkaController {
    private final KafkaProducerService producerService;

    @PostMapping("/kafka")
    public String sendMessage(@RequestParam("message") String message) {
        producerService.sendMessage(message);

        return "success";
    }
}

kafka에 메시지를 전달할 수 있도록 restApi를 만들어 주기

실행결과

Postman을 사용해서 api를 실행시키면

성공시 "success"를 반환하고

인텔리제이에는 컨슈머가 받은 메시지인 "hello"를 출력해주면 성공

이번 연습예제에서는 String 형태의 메시지를 주고 받는 연습을 했는데 보통의 프로젝트에서는 JSON형태의 객체를 주고 받을테니 다음에는 JSON메시지를 주고받는 예제를 만들어 보겠습니다~!

참고

https://victorydntmd.tistory.com/348
https://oingdaddy.tistory.com/308

profile
깃헙에 올린 예제 코드의 설명을 적어놓는 블로그

0개의 댓글

관련 채용 정보

Powered by GraphCDN, the GraphQL CDN