kafka 를 도입하는데 exception과 하루종일 싸웠던 과정입니다.
kafka는 직렬화, 역직렬화를 하여 데이터를 송신 -> 수신 합니다.
SerializerExcetion 은 직렬화나 역직렬화 과정에서 오류를 뜻했다는거라고 생각했고
public class MemberDtoSerializer implements Serializer<MemberDto> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Serializer.super.configure(configs, isKey);
}
@Override
public byte[] serialize(String topic, MemberDto data) {
if (data == null) {
System.out.println("serialize 에서 발생");
throw new NotFoundException("데이터 없음");
}
try {
System.out.println("serialize 에서 발생2");
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializeException("Error", e);
}
}
@Override
public byte[] serialize(String topic, Headers headers, MemberDto data) {
return Serializer.super.serialize(topic, headers, data);
}
@Override
public void close() {
Serializer.super.close();
}
}
@Bean
public Map<String, Object> producerConfig(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MemberDtoSerializer.class.getName());
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
return props;
}
다음과같이 객체로 Consumer가 변환할 수 있도록 하였습니다.
분명 DB에도 있는데 데이터가 들어있고 객체 변환까지 성공했을텐데도 Exception이 발생
코드는 다음과 같습니다
@KafkaListener(topics = "${message.topic.name}", groupId = ConsumerConfig.GROUP_ID_CONFIG)
public void consume(String message) throws IOException {
log.info("#############메시지 대기중 ###############");
log.info("message : {}", message);
Map<Object, Object> map = new HashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
try {
map = objectMapper.readValue(message, new TypeReference<Map<Object, Object>>() {
});
} catch (JsonProcessingException e) {
throw new NotFoundException(e + "데이터 없음");
}
try {
Member member = service.getMember(map.get("id").toString).getData();
System.out.println("#####################"+member.getBaekJoonName()+"#####################");
} catch (NoSuchElementException e) {
throw new NotFoundException("Member 데이터 없음");
}
}
}
member 를 찾을 수 없다는 의미라고 생각하고 디버깅 해봤습니다.
message는 직렬화를 통해 byte 타입으로 변환 된걸 확인할 수 있습니다.
map은 1번에서 처리한것과같이 key:value 형태로들어가 있고
memberservice에서 에러가 있다는것을 발견했고 String 으로 변환해서 username으로 찾은 에러였습니다.
Member member = service.getMember(map.get("id").toString).getData();
를
Member member = service.getMember((Long) map.get("id")).getData();
다음과같이 수정
classException 발생 Member id 가 Integer type으로 되어있습니다.
분명 MemberDto는
public class MemberDto {
private Long id;
private String BaekJoonName;
private int bronze;
private int silver;
private int gold;
private int platinum;
private int diamond;
private int ruby;
}
다음과같이 Long 타입이고 Member 또한 Long 타입입니다.
그리고 (Long) 을통해서 형변환이 된줄 알았는데 Integer 타입에서 Long 으로 형변환이 되지않아 발생한 ClassException 이었고 값을 변경 해봤습니다.
try {
Integer memberId = (Integer) map.get("id");
Long longId = memberId.longValue();
Member member = service.getMember(longId).getData();
System.out.println("#####################"+member.getBaekJoonName()+"#####################");
} catch (NoSuchElementException e) {
throw new NotFoundException("Member 데이터 없음");
}
객체 변환 성공