주고받는 객체가 Maven repository에 등록되어 있어야 합니다.
import org.springframework.kafka.support.serializer.JsonSerializer;
/**
 * Producer-side Kafka configuration.
 *
 * <p>Declares a {@link ProducerFactory} and a {@link KafkaTemplate} built on top of it.
 * Keys are serialized with {@link StringSerializer} and values with {@link JsonSerializer}.
 */
@Configuration
public class config {

    /**
     * Creates the producer factory from the producer properties.
     *
     * @return a {@link DefaultKafkaProducerFactory} configured with the properties below
     */
    @Bean
    public ProducerFactory<String,Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProperties());
    }

    /**
     * Creates the template used to publish messages.
     *
     * @return a {@link KafkaTemplate} backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String,Object> kafkaTemplate() {
        ProducerFactory<String, Object> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /** Assembles the producer properties: broker address plus key/value serializers. */
    private Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }
}
/**
 * Thin generic wrapper around a {@link KafkaTemplate} for sending payloads of type {@code T}.
 *
 * @param <T> type of the message value handed to the template
 */
@Component
public class KafkaProcessor<T> {

    private final KafkaTemplate<String,T> kafkaTemplate;

    /**
     * Creates a processor that delegates to the given template.
     *
     * @param kafkaTemplate template used for every {@link #send(String, Object)} call
     */
    public KafkaProcessor(KafkaTemplate<String,T> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Hands {@code data} to the template for the given topic.
     * The value returned by the template's {@code send} call is not inspected.
     *
     * @param topicName name of the topic to send to
     * @param data      payload to send
     */
    public void send(String topicName, T data) {
        this.kafkaTemplate.send(topicName, data);
    }
}
import bloomingblooms.domain.flower.FlowerDto;
import bloomingblooms.domain.flower.StockChangeDto;
/**
 * Example service that publishes sample DTOs through typed {@link KafkaProcessor} instances.
 */
@Service
@RequiredArgsConstructor
public class ProducerService {

    /** Topic that receives {@link FlowerDto} messages. */
    private static final String FLOWER_TOPIC = "dto";

    /** Topic that receives {@link StockChangeDto} messages. */
    private static final String STOCK_CHANGE_TOPIC = "dto2";

    // Put the type of the object you want to send in the processor's T type parameter.
    private final KafkaProcessor<FlowerDto> flowerDtoKafkaProcessor;
    private final KafkaProcessor<StockChangeDto> stockChangeDtoKafkaProcessor;

    /** Sends a sample {@link FlowerDto} to the {@code "dto"} topic. */
    public void pub1() {
        FlowerDto payload = new FlowerDto(1L, "name");
        flowerDtoKafkaProcessor.send(FLOWER_TOPIC, payload);
    }

    /** Sends a sample {@link StockChangeDto} to the {@code "dto2"} topic. */
    public void pub2() {
        StockChangeDto payload = new StockChangeDto(1L, 1L, 10L);
        stockChangeDtoKafkaProcessor.send(STOCK_CHANGE_TOPIC, payload);
    }
}
/**
 * Consumer-side Kafka configuration.
 *
 * <p>Declares a {@link ConsumerFactory} and the listener container factory that uses it.
 * Keys are deserialized with {@link StringDeserializer}; values go through
 * {@link ErrorHandlingDeserializer}, which is configured with {@link JsonDeserializer}
 * as its value deserializer class.
 */
@Configuration
public class config {

    /**
     * Creates the consumer factory from the consumer properties.
     *
     * @return a {@link DefaultKafkaConsumerFactory} configured with the properties below
     */
    @Bean
    public ConsumerFactory<String,Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

    /**
     * Creates the listener container factory and wires the consumer factory into it.
     *
     * @return a container factory backed by {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }

    /** Assembles the consumer properties: broker address, deserializers and trusted packages. */
    private Map<String,Object> consumerProperties() {
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "bloomingblooms.*");
        return props;
    }
}
/**
 * Example Kafka listeners that print the flower id of each received DTO to standard output.
 */
@Component
public class Consumer {

    /**
     * Handles messages from the {@code "dto"} topic (group {@code "g1"}).
     *
     * @param flowerDto the received payload
     */
    @KafkaListener(topics = "dto", groupId = "g1")
    public void sub1(FlowerDto flowerDto) {
        String flowerId = String.valueOf(flowerDto.getFlowerId());
        System.out.println(flowerId);
    }

    /**
     * Handles messages from the {@code "dto2"} topic (group {@code "g2"}).
     *
     * @param stockChangeDto the received payload
     */
    @KafkaListener(topics = "dto2", groupId = "g2")
    public void sub2(StockChangeDto stockChangeDto) {
        String flowerId = String.valueOf(stockChangeDto.getFlowerId());
        System.out.println(flowerId);
    }
}