저번에 알아본것 처럼 각 이벤트에 key 값을 설정을 해주지 않는다면 partition에 무작위로 들어가게 되어 내가 보장받고 싶은 이벤트 순서가 보장이 안될수도 있다. 따라서 key를 설정해주면 같은 key는 같은 partition에 들어가기 때문에 순서가 보장된다. 한번 해보자
@RestController
@RequestMapping(value = "/kafka")
@RequiredArgsConstructor
public class KafkaController {
private final Producer nonAutoProducer;
private final KafkaProducer autoProducer;
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
// 자동 consume X
@PostMapping("non-auto")
public String sendMessage(@RequestParam("key") String key, @RequestParam("value") String value) {
this.nonAutoProducer.sendMessage(key,value);
return "success";
}
// 자동 consume O
@PostMapping("/auto")
public String sendAutoMessage(@RequestParam("key") String key, @RequestParam("value") String value) {
this.autoProducer.sendMessage(key,value);
return "success";
}
//Listener 실행
@GetMapping
public String consumeMessage(@RequestParam("id") String id) {
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
listenerContainer.start();
return "success";
}
}
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "exam2";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String key, String value) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, key, value);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
logger.info(String.format("Produced event to topic %s: key = %-10s value = %s", TOPIC, key, value));
}
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
});
}
}
@Service
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Consumer.class);
@KafkaListener(id = "consume", topics = "exam2", groupId = "foo", autoStartup = "false")
public void listen(String value,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
logger.info(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value));
}
}
코드가 간단하지만 짧게 설명하면 Consumer의 리스너에 설정값을 autoStartup="false"를 주어 자동으로 시작하는 것을 막아주었다. 따라서 Controller에 consumeMessage()
를 통해 리스너를 시작할 수 있도록 해주었다.
결과를 보면 알 수 있듯 각 key에 해당하는 값의 순서가 보장된채 소비가 되는것을 알 수 있다. 또한 아래의 웹에서의 메시지들을 보면 key 1 => 0번 파티션 , key 2,3 => 2번 파티션에 배정된것을 알 수 있다.
카프카를 잘 활용하기 위해 key를 잘 설정해보는게 좋을것 같다.