카프카로 객체 주고받기

최창효·2023년 12월 18일
0

치팅시트

목록 보기
3/3
post-thumbnail

주고받는 객체가 maven repository에 등록되어 있어야 합니다.

Producer

Config

import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class config {
    @Bean
    public ProducerFactory<String,Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String,Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
  • jsonSerializer로 값을 직렬화합니다.

Processor

@Component
@RequiredArgsConstructor
public class KafkaProcessor<T> {
    private final KafkaTemplate<String,T> kafkaTemplate;

    public void send(String topicName, T data) {
        kafkaTemplate.send(topicName, data);
    }
}
  • 제네릭을 활용한 Processor를 만들어줌으로써 다양한 타입을 손쉽게 Produce할 수 있게 합니다.

Service

import bloomingblooms.domain.flower.FlowerDto;
import bloomingblooms.domain.flower.StockChangeDto;

@Service
@RequiredArgsConstructor
public class ProducerService {

	// Processor의 T타입 위치에 전송하고 싶은 객체를 선언합니다. 
    private final KafkaProcessor<FlowerDto> flowerDtoKafkaProcessor;
    private final KafkaProcessor<StockChangeDto> stockChangeDtoKafkaProcessor;

    public void pub1() {
        flowerDtoKafkaProcessor.send("dto",new FlowerDto(1L, "name"));
    }

    public void pub2() {
        stockChangeDtoKafkaProcessor.send("dto2",new StockChangeDto(1L,1L,10L));
    }

}
  • FlowerDto와 StockChangeDto 모두 maven repository에 올라가있는 클래스입니다.
  • 각각 다른 타입의 KafkaProcessor를 만들어 send를 실행합니다.

Consumer

Config

@Configuration
public class config {
    @Bean
    public ConsumerFactory<String,Object> consumerFactory() {
        Map<String,Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "bloomingblooms.*");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
  • Deserializer로 ErrorHandlingDesirializer를 선택합니다. 카프카의 consumer는 기본적으로 무한반복하는 poll()을 통해 데이터를 가져옵니다. 그렇기 때문에 ErrorHandlingDesirializer를 사용하지 않은 일반적인 상황에서는 계속해서 데이터를 가져오려는 시도를 하고 계속해서 실패하게 되면서 문제가 해결될때까지 엄청난 양의 로그가 쌓이게 됩니다.
    ErrorHandlingDesirializer는 역직렬화 작업을 설정으로 정의한 역직렬화 클래스에게
    위임하고, 해당 객체가 역직렬화에 실패했을 때 MessageConversionException이 발생합니다.
  • ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS로 ErrorHandlingDesirializer가 역직렬화 작업을 위임할 클래스를 선언합니다.
  • JsonDeserializer.TRUSTED_PACKAGES로 우리의 객체가 올라가있는 패키지를 허용해 줍니다.

Service

@Component
public class Consumer {

    @KafkaListener(topics = "dto", groupId = "g1")
    public void sub1(FlowerDto flowerDto) {
        System.out.println(flowerDto.getFlowerId());
    }

    @KafkaListener(topics = "dto2", groupId = "g2")
    public void sub2(StockChangeDto stockChangeDto) {
        System.out.println(stockChangeDto.getFlowerId());
    }

}
profile
기록하고 정리하는 걸 좋아하는 개발자.

0개의 댓글