Server-Sent Events(SSE), Redis pub/sub, Kafka로 알림 기능 개선하기.

taehee kim·2023년 4월 4일
9
post-thumbnail

0. 기능 개선 후 동작 flow

  1. Client에서 Sse연결을 요청하면 Server에서는 연결 객체를 생성하여 이를 서버 인메모리내에 저장하고 Client에게 연결 정보를 제공해줍니다.
  2. 댓글 작성, 글 참여, 매칭 확정등의 이벤트(특정 사용자가 API호출 시)가 발생하면 해당 요청내용을 먼저 처리한 후 정상 처리 된다면 알림 이벤트를 produce하여 kafka에 발행합니다.
  3. kafka에 저장된 알림 이벤트는 두 Consumer Group에 의해서 consume되며 첫 번째 Consumer Group 은 알림 이벤트 데이터로 알림 엔티티를 만들어서 RDB에 저장하고 두 번째 Consumer Group은 SSE 응답을 보내기 위해 Redis pub/sub에 pub 메시지를 보냅니다.
  4. Redis channel에 pub메시지가 보내지면 해당 채널을 구독하고 있는 subscriber들에게 메시지가 push됩니다. subscriber들은 WebApplication Server들 입니다.
  5. Web Application Server가 redis pub메시지를 수신하면 다음 로직을 수행합니다. 먼저 인메모리 내의 ConcurrentHashMap에서 pub 메시지 정보에 해당하는 내용으로 SseEmitter객체를 찾을 수 있는 지 확인합니다. 이는 해당 WAS가 SSE응답을 보내야하는 클라이언트와 연결된 WAS인지 확인하는 과정이기도 합니다. SseEmitter를 하나라도 찾은 WAS는 SSE응답을 클라이언트에게 보내고 이 응답을 통해 클라이언트는 스스로 요청(polling)하지 않고 알림 내역을 비동기적으로 응답받을 수 있습니다.

1.기존 알림 기능

1-1. 문제점

  • 현재는 알림이 생성되는 시점에 클라이언트가 이를 알 수 있는 방법이 없고 단지 서버의 상태만 변경됩니다. 따라서 클라이언트에서 조회api를 호출해야만 알림이 갱신 됩니다. 이를 자동적으로 사용자 화면에 반영될 수 있도록 하려고 합니다.

1-2. 해결법

Short Polling

  • 클라이언트에서 주기적으로 서버로 요청을 보내는 방법입니다.
  • 서버에 대한 부담이 크지 않고 요청 주기를 넉넉하게 잡아도 될 정도로 실시간성이 중요하지 않다면 고려해 볼 만한 방법입니다.

Long Polling

  • 요청을 보내고 서버에서 변경이 일어날 때까지 대기하는 방법입니다.
  • Short Polling에 비해서는 서버의 부담이 줄어들지만 한번의 서버 변경마다 다시 연결을 재요청해야 합니다.

Server-Sent Events

  • 서버와 한번 연결을 맺고나면 일정 시간동안 서버에서 변경이 발생할 때마다 데이터를 전송받는 방법입니다.
  • Long Polling과 달리 서버에서 변경이 발생해도 다시 재연결 할 필요가 없습니다.
  • 최대 연결 수에 제한이 있습니다.

WebSocket

  • 서버와 연결을 유지하면서 양방향 통신을 유지할 수 있습니다.
  • SSE에 비해서 구현과 학습에 비용이 클 수 있습니다.

1-3.선택

  • 위의 해결법들의 장단점을 고려하여 SSE 방식을 선택했습니다.
  • polling방식의 경우 client에서 주기적으로 요청을 해야하기 때문에 요청량이 많아질 경우 비효율적으로 서버의 리소스 사용량이 증가합니다.
  • 알림 기능은 서버에서 클라이언트로 비동기적으로 통신을 할 수 있으면 되기 때문에 WebSocket을 선택하지 않기로 했습니다.

2. Server-Sent Events(SSE)

  • Server-Sent Events(이하 SSE)는 HTTP 스트리밍을 통해 서버에서 클라이언트로 단방향의 Push Notification을 전송할 수 있는 HTML5 표준 기술입니다.
  • HTTP/1.1 프로토콜 사용시 브라우저에서 1개 도메인에 대해 생성할 수 있는 EventSteam의 최대 개수는 6개로 제한됩니다. (HTTP/2 프로토콜 사용시에는 브라우저와 서버간의 조율로 최대 100개까지 유지가 가능합니다.)
  • 이벤트 데이터는 UTF-8 인코딩된 문자열만 지원합니다. 서버 사이드에서 이벤트 데이터를 담은 객체를 JSON으로 마샬링하여 전송하는 것이 가장 일반적입니다.
  • 현재 Internet Explorer을 제외한 모든 브라우저에서 지원합니다. JavaScript에서는 EventSource를 이용하여 연결 생성 및 전송된 이벤트에 대한 제어가 가능합니다.
  • Spring Framework는 4.2(2015년)부터 SseEmitter 클래스를 제공하여 서버 사이드에서의 SSE 통신 구현이 가능해졌습니다.

2-1. 기본적인 동작 흐름

1. 클라이언트에서 SSE 연결 요청을 보낸다.

GET /connect HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache

이벤트의 미디어 타입은 text/event-stream이 표준으로 정해져있습니다. 이벤트는 캐싱하지 않으며 지속적 연결을 사용해야합니다(HTTP 1.1에서는 기본적으로 지속 연결을 사용합니다).

2. 서버에서는 클라이언트와 매핑되는 SSE 통신 객체를 만들고 연결 확정 응답을 보낸다.

HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
  • 응답의 미디어 타입은 text/event-stream 입니다. 이때 Transfer-Encoding 헤더의 값을 chuncked로 설정합니다. 서버는 동적으로 생성된 컨텐츠를 스트리밍하기 때문에 본문의 크기를 미리 알 수 없기 때문입니다.

3. 서버에서 이벤트가 발생하면 해당 객체를 통해 클라이언트로 데이터를 전달한다.

  • 서버에서 SSE통신 객체를 유저마다 저장해놓고 해당 유저에게 응답을 보내야 할 경우 해당 객체를 활용하여 데이터를 전달합니다.

2-2. 만료 시간

  • SSE연결 시 서버에서 만료시간을 설정합니다.
  • EventStream의 만료 시간을 너무 길게 설정하는 것은 서버 입장에서 좋은 방법이 아닙니다. 운영 레벨에서 긴 수명을 가진 EventStream이 차지하는 커넥션과 쓰레드는 잠재적인 퍼포먼스 저하 요소가 될 수 있기 때문입니다. 또한, 서버 앞단의 로드 밸런서도 최대 연결 시간 설정에 제한이 있기 때문에 비지니스 로직을 고려하여 적절한 만료 시간을 정해야 합니다.
  • 만료가 도래하면 EventStream의 새로운 생성을 요청하기 때문에 큰 흐름에서 EventStream은 유지됩니다.

2-3. 구현 과정

2-3-1. SSE 구독

  • SSE 구독 api를 호출하면 서버에서 만료시간을 지정하여 SSE연결을 수립하고 연결 객체를 반환합니다.
  • 최초의 SSE연결 이후에 바로 한번 SSEEvent를 생성해서 클라이언트에 보내주어야합니다.
  • 만료 시간까지 아무런 데이터도 보내지 않으면 재연결 요청시 503 Service Unavailable 에러가 발생할 수 있습니다. 따라서 처음 SSE 연결 시 더미 데이터를 전달해주는 것이 안전합니다.
@Slf4j
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class AlarmController {

    private final AlarmService alarmService;

    @PreAuthorize("hasAuthority('alarm.read')")
    @Operation(summary = "알람 sse 구독", description = "알람 sse 구독, sseEmitter객체 반환.")
    @GetMapping(value = "/alarm/subscribe", produces = "text/event-stream")
    public SseEmitter alarmSubscribe(
        @ApiParam(hidden = true) @AuthenticationPrincipal UserDetails user,
        @RequestHeader(value = "Last-Event-ID", required = false) String lastEventId,
        HttpServletResponse response) {
        //nginx리버스 프록시에서 버퍼링 기능으로 인한 오동작 방지
        response.setHeader("X-Accel-Buffering", "no");
        LocalDateTime now = CustomTimeUtils.nowWithoutNano();
        return alarmService.subscribe(user.getUsername(), lastEventId, now);
    }
}
@Slf4j
@RequiredArgsConstructor
@Transactional(readOnly = true)
@Service
public class AlarmService {
    @Value("${sse.timeout}")
    private String sseTimeout;
    private static final String UNDER_SCORE = "_";
    private static final String CONNECTED = "CONNECTED";
    private final AlarmRepository alarmRepository;
    private final UserRepository userRepository;
    private final SSEInMemoryRepository sseRepository;
    private final RedisTemplate<String, String> redisTemplate;

    public SseEmitter subscribe(String username, String lastEventId, LocalDateTime now) {
        Long userId = getUserByUsernameOrException(username).getId();
        SseEmitter sse = new SseEmitter(Long.parseLong(sseTimeout));
        String key = new SseRepositoryKeyRule(userId, SseEventName.ALARM_LIST,
            now).toCompleteKeyWhichSpecifyOnlyOneValue();

        sse.onCompletion(() -> {
            log.info("onCompletion callback");
            sseRepository.remove(key);
        });
        sse.onTimeout(() -> {
            log.info("onTimeout callback");
            //만료 시 Repository에서 삭제 되어야함.
            sse.complete();
        });

        sseRepository.put(key, sse);
        try {
            sse.send(SseEmitter.event()
                .name(CONNECTED)
                .id(getEventId(userId, now, SseEventName.ALARM_LIST))
                .data("subscribe"));
        } catch (IOException exception) {
            sseRepository.remove(key);
            log.info("SSE Exception: {}", exception.getMessage());
            throw new SseException(ErrorCode.SSE_SEND_ERROR);
        }

        // 중간에 연결이 끊겨서 다시 연결할 때, lastEventId를 통해 기존의 받지못한 이벤트를 받을 수 있도록 할 수 있음.
        // 한번의 알림이나 새로고침을 받으면 알림을 slice로 가져오기 때문에
        // 수신 못한 응답을 다시 보내는 로직을 구현하지 않음.
        return sse;
    }

    /**
     *  특정 유저의 특정 sse 이벤트에 대한 id를 생성한다.
     *  위 조건으로 여러개 정의 될 경우 now 로 구분한다.
     * @param userId
     * @param now
     * @param eventName
     * @return
     */
    private String getEventId(Long userId, LocalDateTime now, SseEventName eventName) {
        return userId + UNDER_SCORE + eventName.getValue() + UNDER_SCORE + now;
    }

    /**
     * redis pub시 userId와 sseEventName을 합쳐서 보낸다.
     * @param userId
     * @param sseEventName
     * @return
     */
    private String getRedisPubMessage(Long userId, SseEventName sseEventName) {
        return userId + UNDER_SCORE + sseEventName.getValue();
    }
    private User getUserByUsernameOrException(String username) {
        return userRepository.findByUsername(username)
            .orElseThrow(() -> new NoEntityException(
                ErrorCode.ENTITY_NOT_FOUND));
    }

}

2-3-2. SseEmitter저장

Inmemory ConcurrentHashMap.

  • SSE연결 시 생성된 객체를 통해 클라이언트에 응답을 보낼 수 있으므로 유저마다 이 객체를 조회 할 수 있도록 저장해야합니다.
  • 이 때 동시성 문제가 발생할 수 있기 때문에 일반 HashMap이 아니라 ConcurrentHashMap을 사용합니다.
@Slf4j
@Component
public class SSEInMemoryRepository implements SSERepository{
    private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    @Override
    public void put(String key, SseEmitter sseEmitter) {
        sseEmitterMap.put(key, sseEmitter);
    }
    @Override
    public Optional<SseEmitter> get(String key) {
        return Optional.ofNullable(sseEmitterMap.get(key));
    }
    @Override
    public List<SseEmitter> getListByKeyPrefix(String keyPrefix){
        return sseEmitterMap.keySet().stream()
            .filter(key -> key.startsWith(keyPrefix))
            .map(sseEmitterMap::get)
            .collect(Collectors.toList());
    }
    @Override
    public List<String> getKeyListByKeyPrefix(String keyPrefix){
        return sseEmitterMap.keySet().stream()
            .filter(key -> key.startsWith(keyPrefix))
            .collect(Collectors.toList());
    }
    @Override
    public void remove(String key) {
        sseEmitterMap.remove(key);
    }
}

Key값 구조

@RequiredArgsConstructor
@EqualsAndHashCode
public class SseRepositoryKeyRule {
    private static final String UNDER_SCORE = "_";

    private final Long userId;
    private final SseEventName sseEventName;
    private final LocalDateTime createdAt;

    /**
     * SSEInMemoryRepository에서 사용될
     * 특정 user에 대한 특정 브라우저,특정 SSEEventName에
     * 대한 SSEEmitter를 찾기 위한 key를 생성한다.
     * @return
     */
    public String toCompleteKeyWhichSpecifyOnlyOneValue() {

        String createdAtString = createdAt == null ? "" : createdAt.toString();
        return userId + UNDER_SCORE + sseEventName.getValue() + UNDER_SCORE + createdAtString;
    }


}
  • 또한 HTTP 1.1의 경우에 하나의 브라우저에서 6개까지의 연결을 할 수 있기 때문에 한명의 유저가 여러개의 SSE연결을 할 수 있습니다.
  • 따라서 Key값은 유저 정보와 생성정보를 포함하여 한명의 유저에 대해서 구분할 수 있도록 합니다.

2-4. 응답 보내기

  • 응답을 보낼 경우 sseRepository에서 SseEmitter를 찾고 id, name, data를 정하여 보냅니다.
  • id의 경우 보통 마지막으로 보내진 응답을 구분하여 비처 발송되지 못한 응답을 재응답해주는데에 활용합니다.
  • name은 SseEvent를 구분합니다.
  • data는 전송 내용을 의미하며 주의할 점은 객체가 아니라 String형태로 보내야합니다.
			SseEmitter emitter = sseRepository.get(key).get();
            try {
                emitter.send(SseEmitter.event()
                    .id(getEventId(userId, now, eventName))
                    .name(eventName.getValue())
                    .data(eventName.getValue()));
            } catch (IOException e) {
                sseRepository.remove(key);
                log.error("SSE send error", e);
                throw new SseException(ErrorCode.SSE_SEND_ERROR);
            }			

2-5. 클라이언트

  • EventSource객체를 생성하면 자동적으로 Connection이 수립됨.
  • 'connect' 라는 name이 SseEvent name과 같을 경우 event수신.
const sse = new EventSource("https://{domain}/connect");

sse.addEventListener('connect', (e) => {
	const { data: receivedConnectData } = e;
	console.log('connect event data: ',receivedConnectData);  // "connected!"
});

2-6. 트러블 슈팅

WAS Scale out시 문제

  • In-memory로 Emitter Repository를 구현할 경우 알림 응답을 보내는 것으로 선택된 WAS가 SseEmitter객체를
  • Redis Clustered Session처럼 SseEmitter객체를 분산 저장소에 저장하면 되지 않을까라는 생각을 했습니다.
  • 하지만 SSE의 경우 연결을 수립한 객체에서 응답을 보내야 하기 때문에 In memory에 저장하고 Redis pub/sub등을 활용하여 해당 WAS가 응답을 보내도록 하는 것이 일반적입니다.
  • 해당 내용은 뒤에서 설명하겠습니다.

SseEvent의 data전송시 String으로 보내야만함.

  • 원본객체를 그대로 보낼 경우 연결이 즉시 종료됩니다.

503 Service Unavailable

위에서 언급했듯이 처음에 SSE 응답을 할 때 아무런 이벤트도 보내지 않으면 재연결 요청을 보낼때나, 아니면 연결 요청 자체에서 오류가 발생합니다.

따라서 첫 SSE 응답을 보낼 시에는 반드시 더미 데이터라도 넣어서 데이터를 전달해야합니다.

헤더에 토큰 전달

SSE 연결 요청을 할 때 헤더에 JWT를 담아서 보내줘야했는데, EventSource 인터페이스는 기본적으로 헤더 전달을 지원하지 않는 문제가 있었습니다. event-source-polyfill 을 사용하면 헤더를 함께 보낼 수 있습니다.

JPA 사용시 Connection 고갈 문제

  • SSE 통신을 하는 동안은 HTTP Connection이 계속 열려있습니다. 만약 SSE 연결 응답 API에서 JPA를 사용하고 open-in-view 속성을 true로 설정했다면, HTTP Connection이 열려있는 동안 DB Connection도 유지됩니다.
  • 따라서 이 경우 open-in-view 설정을 반드시 false로 설정해야 합니다.

Nginx를 Reverse Proxy로 사용할 경우 주의할 점

HTTP 1.0

  • Nginx는 기본적으로 Upstream으로 요청을 보낼때 HTTP/1.0 버전을 사용합니다. 하지만 SSE는 HTTP/1.1 지속연결이 기본이기 때문에 이를 그대로 둘 경우 연결이 종료됩니다.
  • 따라서 다음과 같은 설정을 nginx설정에 추가해야합니다.
proxy_set_header Connection '';
proxy_http_version 1.1;

proxy buffering

  • nginx에는 버퍼링 기능이 있기 때문에 SSE의 실시간성이 떨어질 수 있습니다. 따라서 이 기능을 SSE응답에 대해서만 비활성화 해야합니다.
  • 백엔드의 응답 헤더에 X-accel로 시작하는 헤더가 있으면 Nginx가 이 정보를 이용해 내부적인 처리를 따로 하도록 만들 수 있습니다. 따라서 SSE 응답을 반환하는 API의 헤더에 X-Accel-Buffering: no를 붙여주면 SSE 응답만 버퍼링을 하지 않도록 설정할 수 있습니다.

3. Redis pub/sub을 활용하여 SSE Scale out시 문제 해결

  • 제가 구현한 어플리케이션은 WAS 두대 이상 존재하도록 Scale out되어 있기 때문에 SSE Event응답을 보낼 때 Redis pub/sub 활용하였습니다.

3-1.Redis pub/sub이란

기본적인 특징

redis 링크

  • subscribe시 특정 채널에 참여할 수 있습니다.
  • Redis pub/sub은 특정 채널에 publish하게 되면 구독한 Subscriber참여자들 모두에게 메시지가 보내집니다.
  • subscriber가 메시지 응답을 결정하는 것이 아니라 Redis 자체적으로 subscriber에게 응답하는 push모델 입니다.
  • 메시지를 보관하지 않습니다.

Kafka와의 차이점

  • Kafka의 경우 Consumer가 언제 어디부터 Consume할지를 능동적으로 정하여 이벤트를 수신받습니다.
  • 이벤트를 저장할 수 있고 유실되지 않도록 보장하는 시스템이 갖추어져 있습니다.
  • 해당 이벤트가 produce되었을 때 모든 Consumer Group에서 Consume하지 않을 수 있습니다.

Pub/Sub과 클러스터

클러스터에서 Publish하면 클론을 포함한 모든 노드에게 보냅니다. 따라서 마스터에서 publish한 메시지를 클론(슬레이브)에서 subscribe할 수 있습니다. 마스터 뿐만 아니라 클론에서도 publish할 수 있습니다. 클론에서 publish한 메시지를 다른 클론에서 subscribe할 수 있습니다.

자료구조

  • Redis pub/sub은 분산 메모리에 위치한 자료구조로 이해할 수도 있습니다.
  • 대략적으로 Dictionary자료구조에서 key값을 Channel이름, value에 Channel에 subscribe한 Client들의 정보를 LinkedList형태로 담고 있습니다.
  • subscribe시 value에 위치한 linkedlist에 Client정보가 추가되고 publish시 채널을 key값으로 Dictionary에서 Client정보가 담긴 LinkedList를 찾은 후 모든 Client들에게 메시지를 전달합니다.

3-2. 구현

Redis Connection 설정

  • Jedis보다는 Lettuce Client를 사용하는 것이 좋습니다.
  • RedisStaticMasterReplicaConfiguration에서는 pub/sub사용이 불가능하기 때문에 이를 활용하지 않아야합니다.
  • RedisClusterConfiguration Cluster연결 정보와 관련된 Bean이며 RedisConnectionFactory RedisClusterConfiguration를 바탕으로 실제 Redis 연결을 생성할 수 있는 Factory입니다.
  • MessageListenerAdapter는 RedisMessageSubscriber를 등록하고 이를 RedisMessageListenerContainer에 Topic과 함께 추가하면 해당 서버가 해당 Topic(Channel)에 구독하게 됩니다.
  • MessageListener인터페이스를 구현한 class에서 publish된 메시지를 읽습니다.
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisMessageSubscriber implements MessageListener {
  @Override
    public void onMessage(Message message, byte[] pattern) {}
    }
@Slf4j
@Configuration
@RequiredArgsConstructor
public class LettuceConnectionConfig {

    @Value("${spring.redis.cluster.nodes}")
    private String clusterNodes;
    @Value("${spring.redis.cluster.max-redirects}")
    private int maxRedirects;
    @Value("${spring.redis.password}")
    private String password;
    private final SSERepository sseRepository;


//    private final EntityManagerFactory entityManagerFactory;
//    private final DataSource dataSource;

    /*
     * Class <=> Json간 변환을 담당한다.
     *
     * json => object 변환시 readValue(File file, T.class) => json File을 읽어 T 클래스로 변환 readValue(Url url,
     * T.class) => url로 접속하여 데이터를 읽어와 T 클래스로 변환 readValue(String string, T.class) => string형식의
     * json데이터를 T 클래스로 변환
     *
     * object => json 변환시 writeValue(File file, T object) => object를 json file로 변환하여 저장
     * writeValueAsBytes(T object) => byte[] 형태로 object를 저장 writeValueAsString(T object) => string 형태로
     * object를 json형태로 저장
     *
     * json을 포매팅(개행 및 정렬) writerWithDefaultPrettyPrint().writeValueAs... 를 사용하면 json파일이 포맷팅하여 저장된다.
     * object mapper로 date값 변환시 timestamp형식이 아니라 JavaTimeModule() 로 변환하여 저장한다.
     */

    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        mapper.registerModules(new JavaTimeModule(), new Jdk8Module());
        return mapper;
    }


    /**
     * RedisStaticMasterReplicaConfiguration를 사용할 경우 pub/sub사용 불가
     * @return
     */
    @Bean
    public RedisClusterConfiguration redisClusterConfiguration() {
        List<String> clusterNodeList = Arrays.stream(StringUtils.split(clusterNodes, ','))
            .map(String::trim)
            .collect(Collectors.toList());
        RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(clusterNodeList);
        redisClusterConfiguration.setMaxRedirects(maxRedirects);
        redisClusterConfiguration.setPassword(password);
        return redisClusterConfiguration;
    }

    /*
     * Redis Connection Factory library별 특징
     * 1. Jedis - 멀티쓰레드환경에서 쓰레드 안전을 보장하지 않는다.
     *          - Connection pool을 사용하여 성능, 안정성 개선이 가능하지만 Lettuce보다 상대적으로 하드웨어적인 자원이 많이 필요하다.
     *          - 비동기 기능을 제공하지 않는다.
     *
     * 2. Lettuce - Netty 기반 redis client library
     *            - 비동기로 요청하기 때문에 Jedis에 비해 높은 성능을 가지고 있다.
     *            - TPS, 자원사용량 모두 Jedis에 비해 우수한 성능을 보인다는 테스트 사례가 있다.
     *
     * Jedis와 Lettuce의 성능 비교  https://jojoldu.tistory.com/418
     */
    @Bean
    public RedisConnectionFactory redisConnectionFactory(
        final RedisClusterConfiguration redisClusterConfiguration) {
        final SocketOptions socketOptions =
            SocketOptions.builder().connectTimeout(Duration.of(10, ChronoUnit.MINUTES)).build();

        final var clientOptions =
            ClientOptions.builder().socketOptions(socketOptions).autoReconnect(true).build();

        var clientConfig =
            LettuceClientConfiguration.builder()
                .clientOptions(clientOptions)
                .readFrom(REPLICA_PREFERRED);
//        if (useSSL) {
//            // aws elasticcache uses in-transit encryption therefore no need for providing certificates
//            clientConfig = clientConfig.useSsl().disablePeerVerification().and();
//        }

        return new LettuceConnectionFactory(
            redisClusterConfiguration, clientConfig.build());
    }


//    @Bean // 만약 PlatformTransactionManager 등록이 안되어 있다면 해야함, 되어있다면 할 필요 없음
//    public PlatformTransactionManager transactionManager() throws SQLException {
//        // 사용하고 있는 datasource 관련 내용, 아래는 JDBC
////        return new DataSourceTransactionManager(dataSource);
//
//        // JPA 사용하고 있다면 아래처럼 사용하고 있음
//        return new JpaTransactionManager(entityManagerFactory);
//    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(ObjectMapper objectMapper) {
        GenericJackson2JsonRedisSerializer serializer =
            new GenericJackson2JsonRedisSerializer(objectMapper);

        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory(redisClusterConfiguration()));
        // json 형식으로 데이터를 받을 때
        // 값이 깨지지 않도록 직렬화한다.
        // 저장할 클래스가 여러개일 경우 범용 JacksonSerializer인 GenericJackson2JsonRedisSerializer를 이용한다
        // 참고 https://somoly.tistory.com/134
        // setKeySerializer, setValueSerializer 설정해주는 이유는 RedisTemplate를 사용할 때 Spring - Redis 간 데이터 직렬화, 역직렬화 시 사용하는 방식이 Jdk 직렬화 방식이기 때문입니다.
        // 동작에는 문제가 없지만 redis-cli을 통해 직접 데이터를 보려고 할 때 알아볼 수 없는 형태로 출력되기 때문에 적용한 설정입니다.
        // 참고 https://wildeveloperetrain.tistory.com/32
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(serializer);
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(serializer);
        redisTemplate.setEnableTransactionSupport(true); // transaction 허용

        return redisTemplate;
    }
    @Bean
    ChannelTopic topic() {
        return new ChannelTopic(SseEventName.ALARM_LIST.getValue());
    }

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(redisMessageSubscriber);
    }
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory(redisClusterConfiguration()));
        container.addMessageListener(messageListener(), topic());
        log.info("PubSubConfig init");
        return container;
    }

}

메시지 publish

    @Transactional
    public void send(Long alarmReceiverId, AlarmType alarmType, AlarmArgs alarmArgs, SseEventName sseEventName) {
   		redisTemplate.convertAndSend(sseEventName.getValue(),
            getRedisPubMessage(alarmReceiverId, sseEventName));
    }
    
    private String getRedisPubMessage(Long userId, SseEventName sseEventName) {
        return userId + UNDER_SCORE + sseEventName.getValue();
    }

메시지 subscribe(SSE응답 발송)

  • 모든 WAS가 메시지를 수신받고 알림을 보내려는 User와 SSE연결을 수립하고 SseEmitter객체를 저장하고 있는 WAS한대가 SSE 응답을 하게 됩니다.

@Slf4j
@RequiredArgsConstructor
@Service
public class RedisMessageSubscriber implements MessageListener {
    private static final String UNDER_SCORE = "_";
    private final SSERepository sseRepository;
    /**
     * 여러 서버에서 SSE를 구현하기 위한 Redis Pub/Sub
     * subscribe해두었던 topic에 publish가 일어나면 메서드가 호출된다.
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.info("Redis Pub/Sub message received: {}", message.toString());
        String[] strings = message.toString().split(UNDER_SCORE);
        Long userId = Long.parseLong(strings[0]);
        SseEventName eventName = SseEventName.getEnumFromValue(strings[1]);
        String keyPrefix = new SseRepositoryKeyRule(userId, eventName,
            null).toCompleteKeyWhichSpecifyOnlyOneValue();

        LocalDateTime now = CustomTimeUtils.nowWithoutNano();

        sseRepository.getKeyListByKeyPrefix(keyPrefix).forEach(key -> {
            SseEmitter emitter = sseRepository.get(key).get();
            try {
                emitter.send(SseEmitter.event()
                    .id(getEventId(userId, now, eventName))
                    .name(eventName.getValue())
                    .data(eventName.getValue()));
            } catch (IOException e) {
                sseRepository.remove(key);
                log.error("SSE send error", e);
                throw new SseException(ErrorCode.SSE_SEND_ERROR);
            }
        });
    }

    /**
     *  특정 유저의 특정 sse 이벤트에 대한 id를 생성한다.
     *  위 조건으로 여러개 정의 될 경우 now 로 구분한다.
     * @param userId
     * @param now
     * @param eventName
     * @return
     */
    private String getEventId(Long userId, LocalDateTime now, SseEventName eventName) {
        return userId + UNDER_SCORE + eventName.getValue() + UNDER_SCORE + now;
    }
}

트러블 슈팅

RedisStaticMasterReplicaConfiguration는 Redis pub/sub을 지원하지 않음.

RedisMessageListenerContainer에 MessageListenerAdapter를 등록할 때 CommandLineRunner에서 등록하면 통합테스트시 무한 로딩.

  • 빈 생성 메서드에서 등록되도록 변경하였습니다.
@Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory(redisClusterConfiguration()));
        container.addMessageListener(messageListener(), topic());
        log.info("PubSubConfig init");
        return container;
    }

4. Kafka로 알림

  • Redis pub/sub과 SSE를 활용하여 알림기능을 많은 부분에서 개선 시켰지만 몇가지 아쉬운 점이 있습니다.

4-1. 현재까지 구현의 문제점

주가 되는 비지니스 로직과 알림 생성 로직이 동기적으로 이루어 지는 것은 비효율적이며 의존성이 증가함.

@Transactional
public void comment(){
	// comment 생성
    commentRepository.save(comment);
    // 알림 생성
    alarmRepository.save(alarm);
    // 알림 SSE 응답 발생
    redisTemplate.convertAndSend()
}
  • 알림을 썼을 때 하나의 트랜잭션 내에서 동기적으로 댓글, 알림 생성, 알림 응답 발생 동안 DB커넥션과 트랜잭션을 유지한다면 서버의 자원 낭비가 발생할 수 있습니다.
  • 알림생성 및 통보와 관련된 기능에 댓글 기능 또한 의존성을 가지게 되어 알림영역에서의 문제가 발생 시 댓글이 정상적으로 작성되지 않을 수 있습니다.
  • 또한 알림 때문에 사용자 관점에서 Latency가 비교적 증가할 것입니다.

DB 상태 변경과 redis의 상태 변경이 동시에 성공하거나 동시에 실패함이 보장되지 않음.

  • 트랜잭션이 실패하여 DB는 롤백되더라도 redis에 pub/sub메시지가 전송되기 때문에 알림은 발송되는 이상한 현상이 발생할 수 있습니다.
  • 물론 현재 기능에서는 이와 같은 상황이 발생해도 기능상 큰 문제를 발생 시키지는 않지만 기술적으로 의도하지 않은 방향으로 동작하는 것은 분명합니다.

4-2 kafka를 도입하여 개선

Spring Boot 설정

@KafkaListener

  • Event Consume시 실행할 로직을 구현합니다.
  • @KafkaListener는 ConcurrentKafkaListenerContainerFactory를 특정하여 실행되는 방식이기 때문에 이를 고려하여 작성하면 됩니다.
  • 똑같은 produce된 event에 대해서 두가지 방식으로 (RDB, Redis)에서 Consumer되어야 하기 때문에 두가지 Consumer Group을 정의하고 각각 KafkaListener를 등록합니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {

    private final AlarmService alarmService;

    /**
     * offset을 최신으로 설정.
     * https://stackoverflow.com/questions/57163953/kafkalistener-consumerconfig-auto-offset-reset-doc-earliest-for-multiple-listene
     * @param alarmEvent
     * @param ack
     */
    @KafkaListener(topics = "${kafka.topic.alarm.name}", groupId = "${kafka.consumer.alarm.rdb-group-id}",
        properties = {AUTO_OFFSET_RESET_CONFIG + ":earliest"}, containerFactory = "kafkaListenerContainerFactoryRDB")
    public void createAlarmInRDBConsumerGroup(AlarmEvent alarmEvent, Acknowledgment ack) {
        log.info("createAlarmInRDBConsumerGroup");
        alarmService.createAlarm(alarmEvent.getUserId(), alarmEvent.getType(), alarmEvent.getArgs());
        ack.acknowledge();
    }

    @KafkaListener(topics = "${kafka.topic.alarm.name}", groupId = "${kafka.consumer.alarm.redis-group-id}",
        properties = {AUTO_OFFSET_RESET_CONFIG + ":earliest"}, containerFactory = "kafkaListenerContainerFactoryRedis")
    public void redisPublishConsumerGroup(AlarmEvent alarmEvent, Acknowledgment ack) {
        log.info("redisPublishConsumerGroup");
        alarmService.send(alarmEvent.getUserId(),
            alarmEvent.getEventName());
        ack.acknowledge();
    }
}
profile
Fail Fast

0개의 댓글