Kafka producer/consumer실습

김성환·2022년 9월 12일
0

실습

목록 보기
4/6
post-thumbnail

의존성 추가

<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
</dependency>

messagequeue 패키지 생성

해당 패키지는 producer/consumer의 관리를 위해 만든 것으로 안만들어도 상관 없음
※ message를 주고 받는 경우 직렬화/역직렬화가 필요하다.
이때, 메시지를 생산하는 producer가 메시지를 직렬화하여 보내게 되며, 메시지를 받는 consumer가 직렬화된 메시지를 역직렬화하여 받게 된다.

해당 패키지에 들어갈 내용

  1. kafka producer/consumer 설정파일
  2. kafka producer/consumer 파일

kafka producer 클래스 생성

이 설정파일(클래스)에는 producer를 사용할때 필요한 설정들을 등록한 것으로 빈을 등록해 사용하게 된다.

  • ProducerFactory : producer를 생성하는 전략을 설정하는 객체이다.
  • DefaultKafkaProducerFactory : ProducerFactory라는 인터페이스의 구현체라고 보면된다.(싱글톤으로 생성됨)
  • KafkaTemplate : DefaultKafkaProducerFactory와 함께사용되는 것으로 메시지가 담기는 객체라고 보면 된다.
    (메시지를 담을때 어떠한 설정으로 담을 것인가를 설정하는 것)
    이때, KafkaTemplate의 send메서드를 실행할때, 만약 보내는 토픽이 없는 경우 자동으로 생성되는 것을 확인할 수 있다.
  • ProducerConfig : 카프카 producer의 설정에 대한 정보를 가진 클래스이다.
@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String,String> producerFactory(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }
    @Bean
    public KafkaTemplate<String,String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

kafka producer 클래스 생성

KafkaTemplate 의존성을 주입받아서 해당 빈이 갖고 있는 메서드인 send를 이용해 메시지를 보낸다.
메시지를 보내기 전, 메시지를 가공하는 데, 이때, ObjectMapper사용되기도 한다.

@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String,String> kafkaTemplate;
    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    public OrderDto send(String topic, OrderDto orderDto){
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try{
            jsonInString = mapper.writeValueAsString(orderDto);
        }catch(JsonProcessingException ex){
            ex.printStackTrace();
        }
        kafkaTemplate.send(topic,jsonInString);
        log.info("kafka Producer sent data " + orderDto);
        return orderDto;
    }
}

ObjectMapper란

ObjectMapper는 Object를 JSON(직렬화) 혹은 JSON을 Object로 변환 시켜주는 라이브러리이다.
위에서는 OrderDto 객체를 Json문자열로 변환시켜 메시지를 만드는 역활을 하였다.

kafka consumer 설정클래스 생성

  • @EnableKafka : 리스너(ConcurrentKafkaListenerContainerFactory)를 사용할 수 있도록 등록해주는 어노테이션이다.
  • ConsumerFactory : consumer를 생성하는 전략을 설정하는 객체이다.
  • DefaultKafkaConsumerFactory : 새로운 consumer를 생성하는 객체 ConsumerFactory의 구현체라고 보면 된다.
    이러한 설정은 앞서했던 .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 이러한 설정을 코드로 옮긴 것이다.
  • ConcurrentKafkaListenerContainerFactory : @KafkaListener가 붙은 메서드의 컨테이너를 빌드(생성)해준다.
    이때, 설정정보가 담긴 ConsumerFactory를 setting할 수 있다.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String,String> consumerFactory(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(properties);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();       
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        return kafkaListenerContainerFactory;
    }
}

kafka consumer 클래스 생성

카프카 topic으로 부터 받은 메시지를 역직렬화(ObjectMapper)를 통해 Map으로 변환한 뒤 적절하게 데이터를 db에 저장하면 된다.

@Service
@Slf4j
public class KafkaConsumer {
    CatalogRepository repository;
    @Autowired
    public KafkaConsumer(CatalogRepository repository) {
        this.repository = repository;
    }
    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage){
        log.info("kafka Message : ",kafkaMessage);
        Map<Object,Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try{
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        }catch (JsonProcessingException ex){
            ex.printStackTrace();
        }
        CatalogEntity entity = repository.findByProductId((String)map.get("productId"));
        if(entity != null){
            entity.setStock(entity.getStock() - (Integer)map.get("qty"));
            repository.save(entity);
        }
    }
}

이러한 작업을 줄여주는 것이 바로 kafka connector 이다.

profile
개발자가 되고 싶다

0개의 댓글