Spring Kafka를 사용하여 메시지를 송수신하는 과정에서 org.springframework.kafka.KafkaException: Seek to current after exception
오류가 발생하였습니다.
이 오류는 Kafka 컨슈머가 메시지를 정상적으로 처리하지 못했을 때 발생합니다.
오류의 원인은 프로듀서와 컨슈머 간의 데이터 직렬화 및 역직렬화 설정이 서로 불일치하였기 때문입니다.
프로듀서는 JsonSerializer
를 사용하여 객체를 JSON 문자열로 변환하여 메시지를 전송하였으나, 컨슈머는 StringDeserializer
를 사용하여 메시지를 문자열로만 변환하려고 시도하였습니다.
이로 인해 데이터 타입이 불일치하게 되어 메시지 처리 중 예외가 발생하였습니다.
직렬화 설정 일치
프로듀서와 컨슈머 모두 StringSerializer
및 StringDeserializer
를 사용하도록 설정을 변경하여, 데이터의 직렬화 및 역직렬화 방식을 일치시켰습니다.
예외 처리
@KafkaListener
어노테이션이 적용된 메서드 내에서 예외가 발생할 경우 이를 적절히 처리하여, 오류 발생 시에도 시스템이 안정적으로 동작하도록 하였습니다.
아래의 코드에서는 메시지를 수신한 후 JSON 문자열을 AdminUserManagemetKafkaDto
객체로 변환하여 처리하고 있습니다.
오류가 발생하지 않도록 직렬화 및 역직렬화 설정을 일치시켰습니다.
@KafkaListener(topics = "bookDonationEventApplyInput", groupId = "event-apply-Input-consumer-group")
public void AdminUserManagementConsume(String message) throws JsonProcessingException {
System.out.println("Received Message in group 'test-consumer-group1': " + message);
ObjectMapper objectMapper = new ObjectMapper();
AdminUserManagemetKafkaDto kafkaDto = objectMapper.readValue(message, AdminUserManagemetKafkaDto.class);
Page<User> users = userService.findUsersByUsernameAndRoleV1(kafkaDto.getUserName(), kafkaDto.getUserRole()
, PageRequest.of(kafkaDto.getPage(), kafkaDto.getPageSize()));
List<UserResponseDto> userResponseDtos = users.stream().map(UserResponseDto::new).toList();
userResponseDtos.forEach(System.out::println);
String jsonString = objectMapper.writeValueAsString(userResponseDtos);
producer.sendMessage("bookDonationEventApplyOutput", jsonString);
}
설정을 일치시킨 후 Kafka를 통한 메시지 송수신이 정상적으로 이루어졌으며, Seek to current after exception
오류가 발생하지 않았습니다.
또한, 프론트쪽 서버의 컨슈머에서도 메시지를 정상적으로 전달 받았다는 것을 확인할 수 있었습니다.
Kafka를 사용할 때 설정 불일치는 메시지 처리 실패로 이어질 수 있으므로 주의 깊게 확인하고 일치시켜야 한다는 것을 알 수 있었습니다.