- 코드를 작성하기 전 아까 보았던 Kafka의 동작 원리 그림을 확인해보면 우리가 필요한게 어떤건지 알수 있다.
- Publisher(=producer)
- Subscriber(=consumer)
- 위 두개가 코드상에서 필요한 부분들이다.
- 나머지 Topic ,Broker, ZooKeeper 는 서버적인 설정이 필요하다.
- Publisher(=producer) → 데이터를 카프카에게 보내는 쪽이다. 지금 상황에서는 Mate 서비스에서 Title을 보내야하기에 Mate 서비스가 Publisher가 된다.
- KafkaProducerConfig.java
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
→ 카프카 클러스터에 연결하기 위한 브로커(서버)의 호스트와 포트 정보를 지정 (도커 컨테이너명 : 포트번호)
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
,ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
→ Kafka 프로듀서의 키 직렬화 클래스로 StringSerializer.class 사용하고 있다.
- KafkaProducer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, CreateMatePostRequest createMatePostRequest) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(createMatePostRequest);
} catch (JsonProcessingException exception) {
exception.printStackTrace();
}
log.info("Kafka Producer sent: " + jsonInString);
kafkaTemplate.send(topic, jsonInString);
}
}
mapper.writeValueAsString(createMatePostRequest)
→ createMatePostRequest를 JSON 형식의 문자열로 만들어준다.
- 직렬화를 해주는 이유 : Kafka에서 통신을 하기 위해서는 Json에 형태로 보내야하기에 직렬화를 해주었다.
kafkaTemplate.send(topic, jsonInString)
→ send 메소드를 이용하여 Kafka 서버로 토픽과 데이터를 보내주었다.
- 이렇게 되면 Kafka에서 해당 토픽을 구독하고 있는 Subscriber(=consumer) 서버를 찾아준다.
- Subscriber(=consumer) → 특정 Topic에서 메시지를 읽고 처리합니다. 지금 상황에서는 Title을 User 서비스에서 읽어야하므로 User 서비스가 Subscriber(=consumer)가 된다.
- KafkaConsumerConfig.java
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
ConsumerConfig.GROUP_ID_CONFIG
→ Consumer Group을 식별하는 값이다.
- KafkaConsumer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final UserJpaRepo userJpaRepo;
@KafkaListener(topics = "user-topic")
public void getTitle(String kafkaMessage) {
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
log.info("this is " + (String) map.get("title"));
}
}
@KafkaListener(topics = "user-topic")
→ Kafka에서 구독할 토익명을 적어준다.
mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {})
→ 첫 번째 매개변수인 kafkaMessage는 변환할 JSON 문자열입니다. 두 번째 매개변수인 TypeReference는 변환할 객체의 타입을 지정합니다.