Kafka로 Java 객체 전송하기

이정빈·2024년 6월 2일
1
post-thumbnail

Notify 프로젝트를 진행하던 중 카프카 메시지로 자바 객체를 보내야하는 상황이 왔다.
따라서 직렬화를 통해 자바 객체를 보내고 직렬화를 통해 자바 객체로 복구를 하는 과정을 거쳤야했고, 카프카 관련 설정들도 추가해야했다.

먼저 직렬화와 직렬화와 역직렬화에 대해 알아보았다.

✅ 직렬화 : 객체들의 데이터를 연속적인 데이터(스트림)로 변형하여 전송 가능한 형태로 만드는 것

✅ 역직렬화 : 직렬화된 데이터를 다시 객체의 형태로 만드는 것

결국 전송할 때는 스트림으로 전송하기 때문에 스트림으로 만드는 과정과 그 스트림을 다시 객체로 바꾸는 과정이 각각 직렬화와 역직렬화였다.

Producer 부분(직렬화)

1. build.gradle에 직렬화를 위한 라이브러리 추가

[build.gradle에 추가]

implementation 'com.fasterxml.jackson.core:jackson-databind'

2. 직렬화 메서드 구현

나는 List<Notice> 객체를 직렬화해야했다. 따라서 먼저 Notice 클래스에 implements Serializable를 추가하였다.
[Notice.java의 일부]

public class Notice extends BaseTimeEntity implements Serializable {

이후 직렬화를 실행하는 NoticeSerializer클래스를 생성하였다.

[NoticeSerializer.java]

package com.example.notify_crawler.producer;

import com.example.notify_crawler.notice.domain.Notice;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;

public class NoticeSerializer implements org.apache.kafka.common.serialization.Serializer<List<Notice>> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, List<Notice> data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("List<Notice> 직렬화 오류", e);
        }
    }
}

3. application.yml 수정

value-serializer: com.example.notify_crawler.producer.NoticeSerializer를 추가하여 직렬화할 메서드를 설정한다.

[application.yml 일부]

  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.example.notify_crawler.producer.NoticeSerializer

4. Producer 클래스 수정

Producer 클래스에서 아래 코드를 통해 직렬화를 실행하도록 수정하고 파라미터도 List<Notice>를 받도록 수정하였다.

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NoticeSerializer.class.getName());

[기존의 KafkaProducer.java]

package com.example.notify_crawler.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final String TOPIC = "notify-crawler-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

[수정 완료한 KafkaProducer.java]

package com.example.notify_crawler.producer;

import com.example.notify_crawler.notice.domain.Notice;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Properties;

@Service
public class KafkaProducer {
    private static final String TOPIC = "notify-crawler-topic";

    public static void produce(List<Notice> newNotices) {
        // Kafka Producer 설정
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NoticeSerializer.class.getName());

        // Kafka Producer 생성
        org.apache.kafka.clients.producer.KafkaProducer<String, List<Notice>> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

        // Kafka에 메시지 보내기
        ProducerRecord<String, List<Notice>> record = new ProducerRecord<>(TOPIC, newNotices);
        producer.send(record);

        // Producer 종료
        producer.flush();
        producer.close();
    }
}

Consumer 부분(직렬화)

1. build.gradle에 역직렬화를 위한 라이브러리 추가

[build.gradle에 추가]

implementation 'com.fasterxml.jackson.core:jackson-databind'

2. 역직렬화 메서드 구현

List<Notice> 객체로 역직렬화해야했다. 따라서 역직렬화 메서드를 구현하였다.
[NoticeListDeserializer.java]

package com.example.notifyserver.kafka;

import com.example.notifyserver.notice.domain.Notice;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.List;

public class NoticeListDeserializer implements Deserializer<List<Notice>> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public List<Notice> deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(data, new TypeReference<List<Notice>>() {});
        } catch (Exception e) {
            throw new RuntimeException(" List<Noitce> 역직렬화 오류 ", e);
        }
    }
}

3. application.yml 수정

value-deserializer: com.example.notifyserver.kafka.NoticeListDeserializer 를 추가하여 역직렬화할 메서드를 설정한다.

[application.yml 일부]

    kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: notify
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.example.notifyserver.kafka.NoticeListDeserializer

4. KafkaConsumerConfig 클래스 추가

역직렬화 메서드를 사용하고, 데이터를 카프카로부터 제대로 받아오기 위해 아래와 같이 설정하였다.

   properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoticeListDeserializer.class);

위 코드를 통해 역직렬화 메서드를 사용하도록 설정했다.

[KafkaConsumerConfig.java]

package com.example.notifyserver.kafka;

import com.fasterxml.jackson.databind.deser.std.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String,String> consumerFactory(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"notify");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoticeListDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(properties);
    }
}

5. Consumer 클래스 수정

해당 클래스에서는 메시지 consume 시 List<Notice>로 받도록 수정하였고 비어있는 경우도 처리해주었다.

[기존의 KafkaConsumer.java]

package com.example.notifyserver.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "notify-crawler-topic", groupId = "notify")
    public void listen(String message) {
        log.info(message + "from Kafka");
        log.warn("===================="+message + "from Kafka ====================");

[수정 완료한 KafkaConsumer.java]

package com.example.notifyserver.kafka;

import com.example.notifyserver.notice.domain.Notice;
import com.example.notifyserver.user.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@Slf4j
public class KafkaConsumer {
    @Autowired
    UserService userService;

    @KafkaListener(topics = "notify-crawler-topic", groupId = "notify")
    public void consume(List<Notice> notices) {
        if (notices == null || notices.isEmpty()) {
            // 비어있는 noticeList가 오는 경우에 대한 처리
            log.info("================ 새롭게 크롤링 된 게시물이 없습니다. ================");
            return;
        }

        // Kafka 메시지를 역직렬화한 List<Notice> 객체를 사용하여 원하는 작업을 수행
        for (Notice notice : notices) {
            log.info("================ 새롭게 크롤링 된 게시물 제목: ================" + notice.getNoticeTitle());
        }

    }
}

참고:

profile
사용자의 입장에서 생각하며 문제를 해결하는 백엔드 개발자입니다✍

0개의 댓글