Kafka 에서 발생한 Exception

박우영·2023년 5월 20일
0

트러블 슈팅

목록 보기
12/19

kafka 를 도입하는데 exception과 하루종일 싸웠던 과정입니다.

1. SerializerExcetion


kafka는 직렬화, 역직렬화를 하여 데이터를 송신 -> 수신 합니다.
SerializerExcetion 은 직렬화나 역직렬화 과정에서 오류를 뜻했다는거라고 생각했고

형변환 클래스 작성

public class MemberDtoSerializer implements Serializer<MemberDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, MemberDto data) {
        if (data == null) {
            System.out.println("serialize 에서 발생");
            throw new NotFoundException("데이터 없음");
        }
        try {
            System.out.println("serialize 에서 발생2");
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializeException("Error", e);
        }
    }

    @Override
    public byte[] serialize(String topic, Headers headers, MemberDto data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

KafkaConfig

    @Bean
    public Map<String, Object> producerConfig(){
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MemberDtoSerializer.class.getName());
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);

        return props;
    }

다음과같이 객체로 Consumer가 변환할 수 있도록 하였습니다.

2. NosuchException


분명 DB에도 있는데 데이터가 들어있고 객체 변환까지 성공했을텐데도 Exception이 발생

코드는 다음과 같습니다

    @KafkaListener(topics = "${message.topic.name}", groupId = ConsumerConfig.GROUP_ID_CONFIG)
    public void consume(String message) throws IOException {
        log.info("#############메시지 대기중 ###############");
        log.info("message : {}", message);
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            map = objectMapper.readValue(message, new TypeReference<Map<Object, Object>>() {
            });
        } catch (JsonProcessingException e) {
            throw new NotFoundException(e + "데이터 없음");
        }
        try {
            Member member = service.getMember(map.get("id").toString).getData();
            System.out.println("#####################"+member.getBaekJoonName()+"#####################");
        } catch (NoSuchElementException e) {
            throw new NotFoundException("Member 데이터 없음");
        }
    }
}

member 를 찾을 수 없다는 의미라고 생각하고 디버깅 해봤습니다.
message는 직렬화를 통해 byte 타입으로 변환 된걸 확인할 수 있습니다.
map은 1번에서 처리한것과같이 key:value 형태로들어가 있고
memberservice에서 에러가 있다는것을 발견했고 String 으로 변환해서 username으로 찾은 에러였습니다.

Member member = service.getMember(map.get("id").toString).getData();

Member member = service.getMember((Long) map.get("id")).getData();

다음과같이 수정

classException 발생 Member id 가 Integer type으로 되어있습니다.

분명 MemberDto는

public class MemberDto {
    private Long id;
    private String BaekJoonName;
    private int bronze;
    private int silver;
    private int gold;
    private int platinum;
    private int diamond;
    private int ruby;

}

다음과같이 Long 타입이고 Member 또한 Long 타입입니다.
그리고 (Long) 을통해서 형변환이 된줄 알았는데 Integer 타입에서 Long 으로 형변환이 되지않아 발생한 ClassException 이었고 값을 변경 해봤습니다.

        try {
            Integer memberId = (Integer) map.get("id");
            Long longId = memberId.longValue();
            Member member = service.getMember(longId).getData();
            System.out.println("#####################"+member.getBaekJoonName()+"#####################");
        } catch (NoSuchElementException e) {
            throw new NotFoundException("Member 데이터 없음");
        }


객체 변환 성공

0개의 댓글