Notify 프로젝트를 진행하던 중 카프카 메시지로 자바 객체를 보내야하는 상황이 왔다.
따라서 직렬화를 통해 자바 객체를 보내고 직렬화를 통해 자바 객체로 복구를 하는 과정을 거쳤야했고, 카프카 관련 설정들도 추가해야했다.
먼저 직렬화와 직렬화와 역직렬화에 대해 알아보았다.
결국 전송할 때는 스트림으로 전송하기 때문에 스트림으로 만드는 과정과 그 스트림을 다시 객체로 바꾸는 과정이 각각 직렬화와 역직렬화였다.
[build.gradle에 추가]
implementation 'com.fasterxml.jackson.core:jackson-databind'
나는 List<Notice>
객체를 직렬화해야했다. 따라서 먼저 Notice
클래스에 implements Serializable
를 추가하였다.
[Notice.java의 일부]
public class Notice extends BaseTimeEntity implements Serializable {
이후 직렬화를 실행하는 NoticeSerializer
클래스를 생성하였다.
[NoticeSerializer.java]
package com.example.notify_crawler.producer;
import com.example.notify_crawler.notice.domain.Notice;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
public class NoticeSerializer implements org.apache.kafka.common.serialization.Serializer<List<Notice>> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(String topic, List<Notice> data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new RuntimeException("List<Notice> 직렬화 오류", e);
}
}
}
value-serializer: com.example.notify_crawler.producer.NoticeSerializer
를 추가하여 직렬화할 메서드를 설정한다.
[application.yml 일부]
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.example.notify_crawler.producer.NoticeSerializer
Producer 클래스에서 아래 코드를 통해 직렬화를 실행하도록 수정하고 파라미터도 List<Notice>
를 받도록 수정하였다.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NoticeSerializer.class.getName());
[기존의 KafkaProducer.java]
package com.example.notify_crawler.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final String TOPIC = "notify-crawler-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send(TOPIC, message);
}
}
[수정 완료한 KafkaProducer.java]
package com.example.notify_crawler.producer;
import com.example.notify_crawler.notice.domain.Notice;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Properties;
@Service
public class KafkaProducer {
private static final String TOPIC = "notify-crawler-topic";
public static void produce(List<Notice> newNotices) {
// Kafka Producer 설정
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NoticeSerializer.class.getName());
// Kafka Producer 생성
org.apache.kafka.clients.producer.KafkaProducer<String, List<Notice>> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
// Kafka에 메시지 보내기
ProducerRecord<String, List<Notice>> record = new ProducerRecord<>(TOPIC, newNotices);
producer.send(record);
// Producer 종료
producer.flush();
producer.close();
}
}
[build.gradle에 추가]
implementation 'com.fasterxml.jackson.core:jackson-databind'
List<Notice>
객체로 역직렬화해야했다. 따라서 역직렬화 메서드를 구현하였다.
[NoticeListDeserializer.java]
package com.example.notifyserver.kafka;
import com.example.notifyserver.notice.domain.Notice;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.List;
public class NoticeListDeserializer implements Deserializer<List<Notice>> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public List<Notice> deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, new TypeReference<List<Notice>>() {});
} catch (Exception e) {
throw new RuntimeException(" List<Noitce> 역직렬화 오류 ", e);
}
}
}
value-deserializer: com.example.notifyserver.kafka.NoticeListDeserializer
를 추가하여 역직렬화할 메서드를 설정한다.
[application.yml 일부]
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: notify
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.example.notifyserver.kafka.NoticeListDeserializer
역직렬화 메서드를 사용하고, 데이터를 카프카로부터 제대로 받아오기 위해 아래와 같이 설정하였다.
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoticeListDeserializer.class);
위 코드를 통해 역직렬화 메서드를 사용하도록 설정했다.
[KafkaConsumerConfig.java]
package com.example.notifyserver.kafka;
import com.fasterxml.jackson.databind.deser.std.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String,String> consumerFactory(){
Map<String,Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"notify");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoticeListDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
}
해당 클래스에서는 메시지 consume 시 List<Notice>
로 받도록 수정하였고 비어있는 경우도 처리해주었다.
[기존의 KafkaConsumer.java]
package com.example.notifyserver.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "notify-crawler-topic", groupId = "notify")
public void listen(String message) {
log.info(message + "from Kafka");
log.warn("===================="+message + "from Kafka ====================");
[수정 완료한 KafkaConsumer.java]
package com.example.notifyserver.kafka;
import com.example.notifyserver.notice.domain.Notice;
import com.example.notifyserver.user.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@Slf4j
public class KafkaConsumer {
@Autowired
UserService userService;
@KafkaListener(topics = "notify-crawler-topic", groupId = "notify")
public void consume(List<Notice> notices) {
if (notices == null || notices.isEmpty()) {
// 비어있는 noticeList가 오는 경우에 대한 처리
log.info("================ 새롭게 크롤링 된 게시물이 없습니다. ================");
return;
}
// Kafka 메시지를 역직렬화한 List<Notice> 객체를 사용하여 원하는 작업을 수행
for (Notice notice : notices) {
log.info("================ 새롭게 크롤링 된 게시물 제목: ================" + notice.getNoticeTitle());
}
}
}
참고: