Kafka 이벤트 순서 보장 시키기

방진혁·2022년 4월 22일
0
post-thumbnail
post-custom-banner

Kafka key를 이용해서 순서 보장 시켜주기

저번에 알아본것 처럼 각 이벤트에 key 값을 설정을 해주지 않는다면 partition에 무작위로 들어가게 되어 내가 보장받고 싶은 이벤트 순서가 보장이 안될수도 있다. 따라서 key를 설정해주면 같은 key는 같은 partition에 들어가기 때문에 순서가 보장된다. 한번 해보자

Spring code

Controller

@RestController
@RequestMapping(value = "/kafka")
@RequiredArgsConstructor
public class KafkaController {
    private final Producer nonAutoProducer;
    private final KafkaProducer autoProducer;
    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    // 자동 consume X
    @PostMapping("non-auto")
    public String sendMessage(@RequestParam("key") String key, @RequestParam("value") String value) {
        this.nonAutoProducer.sendMessage(key,value);

        return "success";
    }
    // 자동 consume O
    @PostMapping("/auto")
    public String sendAutoMessage(@RequestParam("key") String key, @RequestParam("value") String value) {
        this.autoProducer.sendMessage(key,value);

        return "success";
    }

    //Listener 실행
    @GetMapping
    public String consumeMessage(@RequestParam("id") String id) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);
        listenerContainer.start();

        return "success";
    }
}

Producer

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "exam2";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String key, String value) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, key, value);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info(String.format("Produced event to topic %s: key = %-10s value = %s", TOPIC, key, value));
            }
            @Override
            public void onFailure(Throwable ex) {
                ex.printStackTrace();
            }
        });
    }

}

Consumer

@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(id = "consume", topics = "exam2", groupId = "foo", autoStartup = "false")
    public void listen(String value,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        logger.info(String.format("Consumed event from topic %s: key = %-10s value = %s", topic, key, value));
    }
}

코드가 간단하지만 짧게 설명하면 Consumer의 리스너에 설정값을 autoStartup="false"를 주어 자동으로 시작하는 것을 막아주었다. 따라서 Controller에 consumeMessage()를 통해 리스너를 시작할 수 있도록 해주었다.

event 넣어주고 소비시켜 순서 확인해보기

  1. key 1 = (11,12,13) key 2 = (21,22,23) key 3 = (31,32,33)으로 각각의 key에 해당하는 값들은 순서를 지켜주며 다음과 같이 넣어주었다.
  1. 이제 리스너를 시작을 시키면 다음과 같이 소비된다.

결과를 보면 알 수 있듯 각 key에 해당하는 값의 순서가 보장된채 소비가 되는것을 알 수 있다. 또한 아래의 웹에서의 메시지들을 보면 key 1 => 0번 파티션 , key 2,3 => 2번 파티션에 배정된것을 알 수 있다.

카프카를 잘 활용하기 위해 key를 잘 설정해보는게 좋을것 같다.

profile
꾸준히 성장하고픈 개발자입니다.
post-custom-banner

0개의 댓글