Kafka Pub/Sub

hoyong.eom·2024년 10월 22일
0

스프링

목록 보기
59/59
post-thumbnail

Kafka

이전 포스팅에서 간단한 Redis를 이용한 Pub/Sub 예제를 구현해보았는데, 오늘은 Kafka를 이용한 간단한 Pub/Sub을 구현해보려고 한다.

이 포스팅에서는 Kafka에 대한 자세한 설명은 하지 않고, 간단한 설명만 적어놓으려고 한다.

Kafka는 기본적으로 프로듀서, 브로커, 토픽, 컨슈머로 구성되어 사용된다.

프로듀서(Producer)
프로듀서는 Kafka에 메시지를 발생하는 역할을 수행하는 컴포넌트이다.
프로듀서는 다양한 데이터소스(DB 혹은 외부로부터의 요청)로부터 데이터를 가져와 kafka의 특정 토픽에 메시지를 발행한다.

브로커(Broker)
브로커는 Kafka의 핵심 서버 컴포넌트로 프로듀서로부터 메시지를 받아서 저장하고, 컨슈머에게 메시지를 전달하는 역할을 한다.
Kafka 클러스터는 여러 브로커들로 구성되며, 각 브로커는 하나 이상의 토픽의 메시지를 저장하고 관리한다.
(브로커 안에 여러개의 토픽이 관리된다.)

토픽(Topic)
토픽은 Kafka에서 데이터를 분류하는 단위이다.
프로듀서는 메시지를 특정 토픽에 발생하고, 컨슈머는 토픽을 구독하여 메시지를 소비한다.(Redis의 채널과 동일하다.) 토픽은 여러 파티션으로 나뉘어질 수 있고, 이를 통해 데이터를 병렬로 처리할 수 있다.
각 파티션은 순서가 보장된 메시지 스트림을 제공하며, 브로커가 클러스터 내에서 파티션을 분산하여 저장한다.

컨슈머(Consmer)
컨슈머는 Kafka의 특정 토픽을 구독하고, 해당 토픽의 메시지를 소비하는 역할을 하는 컴포넌트이다.
컨슈머는 하나 이상의 토픽을 구독할 수 있고, 토픽의 파티션을 동시에 소비할 수 있다.

📌 Kafka Pub/Sub

전체 소스는 아래의 Git Repository에 등록되어 있다.

github
https://github.com/hoyo1744/kafka-pub-sub

Kafka Pub/Sub을 구현하기 위한 소스들을 분석해보자.

Consumer Config

KafkaConsumerConfig

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /**
     * Kafka Consumer 생성 팩토리
     * @return
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100000);

        return new DefaultKafkaConsumerFactory<>(config);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
  • @EnableKafka : Spring Kafka를 사용하는 데에 필요한 Kafka Consumer 관련 기능들을 활성화하는데 사용한다.
    Consumer 기능을 활성화하는데 사용되므로 일반적으로 ConsumerConfig에 붙여준다.

  • ConsumerFactory : ConsumerFactory는 Spring Kafka에서 Kafka Consumer 인스턴스를 생성하기 위한 팩토리 인터페이스다.
    즉, Kafka Consumer 를 생성하고 구성하는데 있어서 ConsumerFactory를 사용한다.

  • Spring Kafka(이전에는 Kafka Client를 사용했었다.)에서는 Consumer 생성 및 프로퍼티를 설정할 수 있도록 도와준다.
    즉, ConsumerFactory를 이용해서 Kafka 클러스터의 브로커 정보, 그룹 ID, 키 및 값의 직렬화/역직렬화 방법을 구성할 수 있다.
    이렇게 구성된 ConsumerFactory는 Spring Kafka에 의해 관리되어 애플리케이션이 필요할 때 마다 KafkaConsumer를 만들어 제공한다.
    ConsumerFactory는 Spring Kafka에서 org.springframework.kafka.core.ConsumerFactory을 구현하고 있으며, 기본적으로 DefaultKafkaConsumer를 사용한다.

  • ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG : Consumer가 연결한 브로커의 위치를 설정한다.

  • ConsumerConfig.GROUP_ID_CONFIG : Consumer 그룹을 지정한다.
    Consumer 그룹을 지정하는 이유는 (https://jhleed.tistory.com/180) 참고하자.

  • ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class : 메시지 키를 바이트배열에서 문자열로 역직렬화하도록 지정한다.(원래는 바이트 배열이다.)

  • ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class : 메시지 값을 바이트배열에서 문자열로 역직렬화하도록 지정한다.

  • ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG : Consumer가 반환만 메시지의 오프셋을 Kafka에 주기적으로 커밋합니다. 커밋된 오프셋은 Consuming의 시작 위치로 사용됩니다.

  • ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG : Kafka에 초기 오프셋이 없거나

  • KafkaListenerCotainerFactory : Spring Kafka에서는 메시지 수신을 위해서 제공하는 방식중 하나인 @KafkaLisnter는 기본설정으로 KafkaLisnterContainerFactory으로 등록된 Bean으로 ConcurrentMessageListener를 생성하여 메시지를 수신한다.

Message Listener Containers
MessageListenerContainer의 구현체로는 2가지가 제공된다.

  • KafkaMessageListenerContainer: 싱글 스레드에서 설정 토픽 또는 파티션의 모든 메시지를 수신한다.
  • ConcurrentMessageListenerContainer : 하나 이상의 KafkaMessageListenerContainer 인스턴스를 제공하여 멀티스레드로를 제공한다.

Producer Config

@Configuration
public class KafkaProductConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());

    }
}
  • ProducerFactory : Producer 객체 생성 팩토리
  • KafkaTemplate : 메시지 전달 개체

    Message 전송 방식

  • 동기 전송 방식(Sync) : kafakTemplate.send().get();
  • 비동기 전송 방식(ASync) : CompletableFuture future = kafkaTemplate.send();

Send

@Service
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessage(String topicName, String message) {
        kafkaTemplate.send(topicName, message);
    }
}
  • kafkaTemplate.send() : kafkaTemplate를 이용해 메시지를 토픽에 전송한다. 기본적으로 KafKa Producer는 비동기 메시지 전송 방식을 택한다.

send() 내부 구현

	@Override
	public CompletableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
		ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
		return observeSend(producerRecord);
	}

Listenr

@Service
@RequiredArgsConstructor
public class KafkaConsumerService {

    @KafkaListener(topics = "topic", groupId = "group_1")
    public void listen(String message) {
        System.out.println("Received Message in group group_1 : " + message);
    }
}
  • KafkaListenr : KafKaListenrContainerFactory를 통해서 지정한 COnsumer가 KafkaListenr 애노테이션을 통해서 지정한 그룹과토픽에 맞춰 호출된다.

📌 참고

https://medium.com/@underwater2/spring-boot%EC%97%90%EC%84%9C-kafka-cluster-%EC%82%AC%EC%9A%A9%ED%95%98%EC%97%AC-pub-sub-%EC%98%88%EC%A0%9C-%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0-7c701f8ca0c4
https://min9yu98.tistory.com/24
https://dnl1029.tistory.com/44
https://dkswnkk.tistory.com/705
https://hoehen-flug.tistory.com/42
https://velog.io/@youmakemesmile/Spring-KafkaKafka-Consumer-%EC%A0%95%EB%A6%AC

0개의 댓글