<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
해당 패키지는 producer/consumer의 관리를 위해 만든 것으로 안만들어도 상관 없음
※ message를 주고 받는 경우 직렬화/역직렬화가 필요하다.
이때, 메시지를 생산하는 producer가 메시지를 직렬화하여 보내게 되며, 메시지를 받는 consumer가 직렬화된 메시지를 역직렬화하여 받게 된다.해당 패키지에 들어갈 내용
- kafka producer/consumer 설정파일
- kafka producer/consumer 파일
kafka producer 클래스 생성
이 설정파일(클래스)에는 producer를 사용할때 필요한 설정들을 등록한 것으로 빈을 등록해 사용하게 된다.
- ProducerFactory : producer를 생성하는 전략을 설정하는 객체이다.
- DefaultKafkaProducerFactory : ProducerFactory라는 인터페이스의 구현체라고 보면된다.(싱글톤으로 생성됨)
- KafkaTemplate : DefaultKafkaProducerFactory와 함께사용되는 것으로 메시지가 담기는 객체라고 보면 된다.
(메시지를 담을때 어떠한 설정으로 담을 것인가를 설정하는 것)
이때, KafkaTemplate의 send메서드를 실행할때, 만약 보내는 토픽이 없는 경우 자동으로 생성되는 것을 확인할 수 있다.- ProducerConfig : 카프카 producer의 설정에 대한 정보를 가진 클래스이다.
@EnableKafka @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String,String> producerFactory(){ Map<String,Object> properties = new HashMap<>(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(properties); } @Bean public KafkaTemplate<String,String> kafkaTemplate(){ return new KafkaTemplate<>(producerFactory()); } }
kafka producer 클래스 생성
KafkaTemplate 의존성을 주입받아서 해당 빈이 갖고 있는 메서드인 send를 이용해 메시지를 보낸다.
메시지를 보내기 전, 메시지를 가공하는 데, 이때, ObjectMapper사용되기도 한다.@Service @Slf4j public class KafkaProducer { private KafkaTemplate<String,String> kafkaTemplate; @Autowired public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public OrderDto send(String topic, OrderDto orderDto){ ObjectMapper mapper = new ObjectMapper(); String jsonInString = ""; try{ jsonInString = mapper.writeValueAsString(orderDto); }catch(JsonProcessingException ex){ ex.printStackTrace(); } kafkaTemplate.send(topic,jsonInString); log.info("kafka Producer sent data " + orderDto); return orderDto; } }
ObjectMapper란
ObjectMapper는 Object를 JSON(직렬화) 혹은 JSON을 Object로 변환 시켜주는 라이브러리이다.
위에서는 OrderDto 객체를 Json문자열로 변환시켜 메시지를 만드는 역활을 하였다.kafka consumer 설정클래스 생성
- @EnableKafka : 리스너(ConcurrentKafkaListenerContainerFactory)를 사용할 수 있도록 등록해주는 어노테이션이다.
- ConsumerFactory : consumer를 생성하는 전략을 설정하는 객체이다.
- DefaultKafkaConsumerFactory : 새로운 consumer를 생성하는 객체 ConsumerFactory의 구현체라고 보면 된다.
이러한 설정은 앞서했던.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
이러한 설정을 코드로 옮긴 것이다.- ConcurrentKafkaListenerContainerFactory : @KafkaListener가 붙은 메서드의 컨테이너를 빌드(생성)해준다.
이때, 설정정보가 담긴 ConsumerFactory를 setting할 수 있다.@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String,String> consumerFactory(){ Map<String,Object> properties = new HashMap<>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupId"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(properties); } @Bean public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){ ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>(); kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); return kafkaListenerContainerFactory; } }
kafka consumer 클래스 생성
카프카 topic으로 부터 받은 메시지를 역직렬화(ObjectMapper)를 통해 Map으로 변환한 뒤 적절하게 데이터를 db에 저장하면 된다.
@Service @Slf4j public class KafkaConsumer { CatalogRepository repository; @Autowired public KafkaConsumer(CatalogRepository repository) { this.repository = repository; } @KafkaListener(topics = "example-catalog-topic") public void updateQty(String kafkaMessage){ log.info("kafka Message : ",kafkaMessage); Map<Object,Object> map = new HashMap<>(); ObjectMapper mapper = new ObjectMapper(); try{ map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {}); }catch (JsonProcessingException ex){ ex.printStackTrace(); } CatalogEntity entity = repository.findByProductId((String)map.get("productId")); if(entity != null){ entity.setStock(entity.getStock() - (Integer)map.get("qty")); repository.save(entity); } } }
이러한 작업을 줄여주는 것이 바로 kafka connector 이다.