카프카 With Spring

김파란·2024년 8월 30일

Spring-Library

목록 보기
7/7

참고) https://dkswnkk.tistory.com/705
https://velog.io/@raddaslul/Stomp%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%98%EC%97%AC-%EC%B1%84%ED%8C%85-%EB%B0%8F-item-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0#8-redissubscriber
https://velog.io/@ha0kim/%EC%8A%A4%ED%94%84%EB%A7%81-%EC%9D%B8-%EC%95%A1%EC%85%98-8.%EB%B9%84%EB%8F%99%EA%B8%B0-%EB%A9%94%EC%8B%9C%EC%A7%80-%EC%A0%84%EC%86%A1%ED%95%98%EA%B8%B0

1. 환경설정

# Kafka producer settings supplied through Spring configuration properties.
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092 # host:port of the Kafka broker the producer connects to
#      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: org.apache.kafka.common.serialization.StringSerializer

# The consumer can also be configured in yml (example below is left commented out)
#  kafka:
#    consumer:
#      bootstrap-servers: localhost:9092 # host:port list used for the initial connection to the Kafka cluster
#      group-id: consumer_group01 # consumer group id
#      auto-offset-reset: earliest # strategy used when there is no offset or the offset no longer exists
       ## Deserialization is configured in KafkaConsumerConfig instead
#      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2. 설정정보

  • Producer 설정은 yml 또는 Bean(Java Config) 두 가지 방법으로 작성할 수 있다.

1). 정보

/**
 * Shared Kafka settings: topic names, the consumer group id and the broker address,
 * plus a holder for the partition numbers reported by the rebalance listener.
 */
public class KafkaConstants {
    /** Name of the topic used for AI chat messages. */
    public static final String KAFKA_AI_TOPIC = "chat-ai-topic";
    /** Name of the topic used for regular chat messages. */
    public static final String KAFKA_TOPIC = "chat-topic";
    /** Consumer group id shared by the chat listeners. */
    public static final String GROUP_ID = "chat-group";
    /** Bootstrap address (host:port) of the Kafka broker. */
    public static final String KAFKA_BROKER = "localhost:9092";
    /**
     * Partition numbers from the most recent assignment callback.
     *
     * <p>Writers replace the whole reference rather than mutating the list in place.
     * The field is {@code volatile} so that a replacement made on one thread is visible
     * to readers on other threads, and it starts out as an empty list so readers never
     * observe {@code null} before the first assignment.
     */
    public static volatile List<Integer> partitionList = List.of();
}

2). 소비자 설정

@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ChatMessageDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ChatMessageDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        ContainerProperties prop = factory.getContainerProperties();
        prop.setConsumerRebalanceListener(rebalanceListener());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, ChatMessageDto> consumerFactory() {
        final Map<String, Object> config = new HashMap<>();
        JsonDeserializer<ChatMessageDto> deserializer = new JsonDeserializer<>(ChatMessageDto.class);
        deserializer.addTrustedPackages("*");

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConsumerAwareRebalanceListener rebalanceListener() {
        return new ConsumerAwareRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                // here partitions
                List<Integer> partList = new ArrayList<>();
                for (TopicPartition partition : partitions) {
                    int partition1 = partition.partition();
                    partList.add(partition1);
                    log.info("사용중인파티션:{}", partition1);
                }
                KafkaConstants.partitionList = partList;
            }
        };
    }

3). 생산자 설정

/**
 * Producer-side Kafka configuration: exposes the raw producer settings, the producer
 * factory built from them, and the {@link KafkaTemplate} used to publish
 * {@link ChatMessageDto} values.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /**
     * Builds the producer factory from {@link #kafkaProducerConfiguration()}.
     *
     * @return the producer factory
     */
    @Bean
    public ProducerFactory<String, ChatMessageDto> producerFactory() {
        final Map<String, Object> settings = kafkaProducerConfiguration();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Assembles the producer settings: broker address from {@link KafkaConstants},
     * String keys and JSON-serialized values.
     *
     * @return a mutable map of producer settings
     */
    @Bean
    public Map<String, Object> kafkaProducerConfiguration() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKER);
        return settings;
    }

    /**
     * Creates the template used to send messages, backed by {@link #producerFactory()}.
     *
     * @return the Kafka template
     */
    @Bean
    public KafkaTemplate<String, ChatMessageDto> kafkaTemplate() {
        final ProducerFactory<String, ChatMessageDto> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}

3. 사용

1). sub

/**
 * Kafka listeners that forward each consumed {@link ChatMessageDto} to a messaging
 * destination derived from the message's room id.
 */
@Service
@RequiredArgsConstructor
public class MessageReceiverService {

    private final SimpMessageSendingOperations template;

    /**
     * Consumes a message from {@link KafkaConstants#KAFKA_TOPIC} and forwards it to
     * {@code /sub/chat/room/{roomId}}.
     *
     * @param message the consumed chat message
     * @throws RuntimeException wrapping any exception raised while forwarding
     */
    @KafkaListener(topics = KafkaConstants.KAFKA_TOPIC, groupId = KafkaConstants.GROUP_ID)
    public void receiveMessage(ChatMessageDto message) {
        forwardToRoom("/sub/chat/room/", message);
    }

    /**
     * Consumes a message from {@link KafkaConstants#KAFKA_AI_TOPIC} and forwards it to
     * {@code /sub/chat/ai/room/{roomId}}.
     *
     * @param message the consumed chat message
     * @throws RuntimeException wrapping any exception raised while forwarding
     */
    @KafkaListener(topics = KafkaConstants.KAFKA_AI_TOPIC, groupId = KafkaConstants.GROUP_ID)
    public void receiveAIMessage(ChatMessageDto message) {
        forwardToRoom("/sub/chat/ai/room/", message);
    }

    /** Sends the message to {@code destinationPrefix + roomId}, wrapping any failure in a RuntimeException. */
    private void forwardToRoom(String destinationPrefix, ChatMessageDto message) {
        try {
            // The room id carried by the message selects the destination.
            final String destination = destinationPrefix + message.getRoomId();
            template.convertAndSend(destination, message);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2). pub

/**
 * Publishes chat messages to Kafka after applying the side effects that belong to the
 * message type (persisting the message, updating room membership).
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageSenderService {

    private final KafkaTemplate<String, ChatMessageDto> kafkaTemplate;
    private final ChatMessageService chatService;
    private final ChatRoomService chatRoomService;
    private final ChatRoomUserService chatRoomUserService;
    private final ChatClient chatClient;

    /**
     * Stamps the message with the current time and processes it by type.
     *
     * <ul>
     *   <li>TALK: save, publish to {@link KafkaConstants#KAFKA_TOPIC}, then call
     *       {@code getUserInactiveAndNotNotified}.</li>
     *   <li>ENTER: if the user is already in the room, only {@code activateUser} is
     *       called and nothing is published. Otherwise the join text is set, the user is
     *       added to the room, and the message is saved and published.</li>
     *   <li>LEAVE: the leave text is set, the user is removed from the room, and the
     *       message is saved and published.</li>
     * </ul>
     * Any other type is ignored.
     *
     * <p>NOTE(review): the value returned by {@code kafkaTemplate.send} is not
     * inspected, so a failed send is not reported by this method.
     *
     * @param message the chat message to process; its create date and, for ENTER and
     *                LEAVE, its text are modified
     * @throws RuntimeException wrapping any exception raised during processing,
     *                          including a {@code null} message type
     */
    public void send(ChatMessageDto message) {

        message.setCreateDate(LocalDateTime.now());

        try {
            if (message.getType().equals(MessageType.TALK)) {
                chatService.save(message);
                kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, message);
                chatRoomUserService.getUserInactiveAndNotNotified(message);
            } else if (message.getType().equals(MessageType.ENTER)) {
                // Look up membership once. Two separate lookups can disagree if
                // membership changes in between, leaving the ENTER message unhandled.
                if (chatRoomUserService.isUserAlreadyInRoom(message.getRoomId(), message.getLoginId())) {
                    chatRoomUserService.activateUser(message);
                } else {
                    message.setMessage(message.getSender() + "님 입장!!");
                    chatRoomService.addUserInChat(message.getRoomId(), message.getLoginId());
                    chatService.save(message);
                    kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, message);
                }
            } else if (message.getType().equals(MessageType.LEAVE)) {
                message.setMessage(message.getSender() + "님 퇴장!!");
                chatRoomService.deleteUserInChat(message.getRoomId(), message.getLoginId());
                chatService.save(message);
                kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC, message);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }
}

0개의 댓글