redis pub/sub로 알림구현하기

Jung In Lee·2024년 4월 6일
0

Novelit

목록 보기
2/5

Novelit 복구, 고도화

(1) redis pub/sub

  • 기존에 SSE방식으로 전달하는 알림방식은, username에 해당하는 사람에게만 전송하는 방식으로 처리를 한다.

  • 댓글을 작성하면 해당 구독자에게만 알림을 보낸다.

  • 다만 둘이 양방향 소통은 아니고, publisher(Editor) -> subscriber(Writer), 혹은 그 반대로 삼아 전달한다.

  • 뭔가 진행방식은 양방향인데, 양방향으로 계속 소켓을 열어두기에는 리소스 소모가 조금 크다고 보았다.

  • 처음엔 리소스 절감과 일방향 소통이라고 생각해 sse 하나만으로 상관이없겠다고 생각했다.

  • 하지만 Editor가 댓글을쓰면 Writer에게 알림이 가야하고, Writer가 답글을 쓰면 Editor에게 알림이 가야한다.

  • 양방향 소통 방식의 그림은 띄지만, 실시간 양방향은 아니다.

  • 따라서 둘이 소통할수있는 하나의 channel을 둬서, 아키텍처를 생성하려고한다.

spring boot에서 redis pub/sub 구현

  • Redis 설정은 이전장에서 설명을 했으므로, 이번엔 spring 쪽을 다루려고한다.

  • RedisConfig

package com.example.practice.redis.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Slf4j
@Configuration
@EnableRedisRepositories
public class RedisConfig { // spring <-> redis 연결설정

    @Value("${spring.data.redis.host}")
    private String redisHost;
    @Value("${spring.data.redis.port}")
    private int redisPort;
    @Value("${spring.data.redis.password}")
    private String redisPassword;

    //lettuce
    @Bean
    public RedisConnectionFactory redisConnectionFactory(){
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration(redisHost,redisPort);
        config.setPassword(redisPassword);
        return new LettuceConnectionFactory(config);
    }

    // redisTemplate: default Value Type = JSON
    /* If you want to use String Type, you can change ValueSerializer
    / to StringRedisSerializer or Use StringRedisTemplate*/
    @Bean
    public RedisTemplate<?, ?> redisTemplate(){
        RedisTemplate<?, ?> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory()); // connection
        redisTemplate.setKeySerializer(new StringRedisSerializer()); // key
        // Java Object <-> JSON <-> String value
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));

        return redisTemplate;
    }

    /*
    * Redis pub/sub 메시지 처리 Listener
    * */
    @Bean
    public RedisMessageListenerContainer redisMessageListener(){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory());
        return container;
    }
}
  • RedisConfig에는 연결관련 설정을 해준다.

  • RedisTemplate가 거의 핵심역할이라고 생각한다. 여기서 Redis의 <key, value> 형태를 java Object로 변환할수있게하고, 반대도 가능하게 해주는 설정을 가진다.

  • RedisController

package com.example.practice.redis.controller;

import com.example.practice.redis.dto.MessageDto;
import com.example.practice.redis.service.RedisPubService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

@RestController
@RequiredArgsConstructor
@Slf4j
@RequestMapping("/api/v1/redis/pubsub")
public class RedisPubSubController {
    private final RedisPubService redisSubscribeService;

    @PostMapping("/send")
    public void sendMessage(@RequestParam(required = true) String channel,
                            @RequestBody MessageDto message) {
        log.info("Redis Pub MSG Channel = {}", channel);
        redisSubscribeService.pubMsgChannel(channel, message);
    }

    @PostMapping("/cancel")
    public void cancelSubChannel(@RequestParam String channel) {
        redisSubscribeService.cancelSubChannel(channel);
    }
}
  • 실험할 api다. /send api를 사용하면 해당 채널에 메시지를 전송할수있다.

  • MessageDto

package com.example.practice.redis.dto;

import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Getter
@NoArgsConstructor
public class MessageDto implements Serializable {
    private static final long serialVersionUID = 1L;

    private String message;
    private String sender;
    private String roomId; // 타겟 channel

    @Builder
    public MessageDto(String message, String sender, String roomId) {
        this.message = message;
        this.sender = sender;
        this.roomId = roomId;
    }
}
  • 이건 MessageDto의 형태.

Publisher

  • RedisPublisher
package com.example.practice.redis.service;

import com.example.practice.redis.dto.MessageDto;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Service;

@Service
public class RedisPublisher {
    private final RedisTemplate<String, Object> template;

    public RedisPublisher(RedisTemplate<String, Object> template) {
        this.template = template;
    }

    /*
    *  특정 채널에 메시지를 전송한다.
    * */

    /*
     * Object Publish
     * */
    public void publish(ChannelTopic topic, MessageDto messageDto) {
        template.convertAndSend(topic.getTopic(), messageDto);
    }

    /*
     * String Publish
     * */
    public void publish(ChannelTopic topic, String data) {
        template.convertAndSend(topic.getTopic(), data);
    }
}
  • RedisTemplate를 사용해 메시지를 전송한다. 파라미터는 채널과 메세지이다.

  • RedisPubService

package com.example.practice.redis.service;

import com.example.practice.redis.dto.MessageDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
public class RedisPubService {
    private final RedisMessageListenerContainer redisMessageListenerContainer;
    private final RedisPublisher redisPublisher;

    // 각 Channel 별 Listener
    private final RedisSubscribeListener redisSubscribeListener;

    /*
     * Channel 별 Message 전송
     * @Param
     * */
    public void pubMsgChannel(String channel, MessageDto messageDto) {
        // 1. 요청한 Channel을 구독
        redisMessageListenerContainer.addMessageListener(redisSubscribeListener,
                new ChannelTopic(channel));

        // 2. Message 전송
        redisPublisher.publish(new ChannelTopic(channel), messageDto);
    }

    /*
     * Channel 구독 취소
     * @Param channel
     * */
    public void cancelSubChannel(String channel) {
        redisMessageListenerContainer.removeMessageListener(redisSubscribeListener);
    }
}
  • 서비스 단이다. 메세지 구독자들에게 모든 알림을 전송한다.

Subscibe

  • RedisSubscribeListener
package com.example.practice.redis.service;

import com.example.practice.redis.dto.MessageDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
public class RedisSubscribeListener implements MessageListener {

    private final RedisTemplate<String, Object> template;
    private final ObjectMapper objectMapper;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String publishMessage = template
                    .getStringSerializer().deserialize(message.getBody());

            MessageDto messageDto = objectMapper.readValue(publishMessage, MessageDto.class);

            log.info("Redis Subscribe Channel : ", messageDto.getRoomId());
            log.info("Redis SUB Message : {}", publishMessage);

            // Return || Another Method Call (etc.sae to DB)
            // TODO
            /*
            * 여기 알림이 들어갈 예정이다.
            * 
            * */
        } catch (JsonProcessingException e) {
            log.error(e.getMessage());
        }
    }
}
  • publisher로 부터 받은 메세지를 String 형태로 deserialize 시킨다.
  • 이후, DTO 형태에 맡게 넣어주고, log를 찍어 이를 확인한다.
  • 추후에 여기는 알림을 send해줘야하기때문에 알림 메소드가 대신들어간다.

결과

  • postman으로 api를 쏘면
  • spring에서는 submessage가 정상적으로 받아짐을 볼수있고,
  • movaxterm에서 redis 채널 모니터링도 가능하다.

이후

  • redis를 일단 하나만 사용했는데, message가 많이 발생하는걸 대비해 clustering을 먼저 할 생각이다.

  • redis pub/sub는 데이터를 저장하지않는다. 하지만, 전체 알림확인을 위해 db에 데이터를 저장하긴해야하는데,

  • kafka를 사용해서 두개의 consumer를 만들생각이다. 하나는 db에 연결하고, 하나는 위와같은 알림 consumer.

참고 및 출처

좋은 코드 알려주시는 분들 항상 감사합니다.

profile
Spring Backend Developer

0개의 댓글