알람 SSE (Server-Sent-Events)

Jung In Lee·2024년 4월 4일
0

Novelit

목록 보기
4/7

Novelit 복구

알람

(1) SSE

  • 알림 구현방식에는 웹소켓 방식을 고려하기도하는데, 사실 서버에서 발생하는 이벤트를 일방적으로 전송만하면되서, SSE 구현방식이 더 적합하다고 보았다.

  • AlarmController

@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/api")
public class AlarmController {

    private final AlarmService alarmService;

    @GetMapping(value = "/alarm/subscribe", produces = "text/event-stream")
    public SseEmitter alarmSubscribe(
            @RequestParam("username") String username,
            @RequestHeader(value = "Last-Event-ID", required = false) String lastEventId,
            HttpServletResponse response){
        response.setHeader("X-Accel-Buffering", "no");
        LocalDateTime now = LocalDateTime.now();
        return alarmService.subscribe(username, lastEventId, now);
    }
}
  • 우선적으로 DB에 있는 회원을 상대로 알람을 구독하고, 자신이 받는 로직으로 테스트를 해볼것이다. 시큐리티는 적용하지않는다.

  • AlarmService

package com.example.practice.service.alarm;

import com.example.practice.domain.sse.SseEventName;
import com.example.practice.domain.member.Member;
import com.example.practice.exception.ErrorCode;
import com.example.practice.exception.NoEntityException;
import com.example.practice.exception.SseException;
import com.example.practice.repository.alarm.AlarmRepository;
import com.example.practice.repository.alarm.SseInMemoryRepository;
import com.example.practice.repository.alarm.SseRepositoryKeyRule;
import com.example.practice.repository.member.MemberRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Map;

@Slf4j
@RequiredArgsConstructor
@Transactional(readOnly = true)
@Service
public class AlarmService {

    private Long sseTimeout = 60L * 60 * 1000;
    private static final String UNDER_SCORE = "_";
    private static final String CONNECTED = "CONNECTED";
    private final AlarmRepository alarmRepository;
    private final MemberRepository memberRepository;
    private final SseInMemoryRepository sseRepository;
    //private final RedisTemplate<String, String> redisTemplate;


    public SseEmitter subscribe(String username, String lastEventId, LocalDateTime now) {
        //Long userId = getMemberByUsernameOrException(username).getId();
        Long userId = memberRepository.findByUsername(username).get().getId();
        System.out.println(userId);
        SseEmitter sse = new SseEmitter(sseTimeout);
        String key = new SseRepositoryKeyRule(userId, SseEventName.ALARM_LIST,
                now).toCompleteKeyWhichSpecifyOnlyOneValue();

        sse.onCompletion(()->{
            log.info("onCompletion callback");
            sseRepository.remove(key);
        });
        sse.onTimeout(()->{
            log.info("onTimeout callback");
            sse.complete();
        });

        sseRepository.put(key, sse);
        try {
            sse.send(SseEmitter.event()
                    .name(CONNECTED)
                    .id(getEventId(userId, now, SseEventName.ALARM_LIST))
                    .data("subscribe"));
        } catch (IOException exception) {
            sseRepository.remove(key);
            log.info("SSE Excetpion: {}", exception.getMessage());
            throw new SseException(ErrorCode.SSE_SEND_ERROR);
        }

        return sse;
    }

    public void send(String username) {
        Long userId = memberRepository.findByUsername(username).get().getId();
        System.out.println(userId);
        LocalDateTime now = LocalDateTime.now();

        String keyPrefix = new SseRepositoryKeyRule(userId, SseEventName.ALARM_LIST,
                null).toCompleteKeyWhichSpecifyOnlyOneValue();

        sseRepository.getKeyListByKeyPrefix(keyPrefix).forEach(key ->{
            SseEmitter emitter = sseRepository.get(key).get();
            try{
                emitter.send(SseEmitter.event()
                        .id(getEventId(userId,now,SseEventName.ALARM_LIST))
                        .name(CONNECTED)
                        .data("알림성공"));
            }
            catch (IOException e){
                sseRepository.remove(key);
                log.error("SSE send error", e);
                throw new SseException(ErrorCode.SSE_SEND_ERROR);
            }
        });
    }

    private String getEventId(Long userId, LocalDateTime now, SseEventName eventName) {
        return userId + UNDER_SCORE + eventName.getValue() + UNDER_SCORE + now;
    }

    /*private String getRedisPubMessage(Long userId, SseEventName sseEventName) {
        return userId + UNDER_SCORE + sseEventName.getValue();
    }

    private Member getMemberByUsernameOrException(String username) {
        return memberRepository.findByUsername(username)
                .orElseThrow(() -> new NoEntityException(
                        ErrorCode.ENTITY_NOT_FOUND
                ));
    }*/
}
  • 향후 Redis pub/sub -> kafka를 사용하려 고도화를 하려고한다. redis관련 코드는 주석처리하였다.

  • SSE Timeout 시간은 1시간이다. (60분 x 60초 x 1000ms)

  • SSE Eventname은 "CONNECT"이다. 이에 대해서 통신을 주고받는다.

  • subscribe()
    - 우선 username을 바탕으로 기본키를 받아온다.
    기본키+ENUM+시간 을 묶어 key로 만들어준다. 이것은 SseEmitter의 key에 해당한다.
    - SseEmitter는 Map형태로 메모리에 저장되는데, <key,sseEmitter> 형태를갖는다.
    - subscribe 함수를 통해 sseemitter를 생성해, 메모리에 저장한다.

  • send()
    send함수에서 메모리에 저장되어있는 key+Enum으로 시작하는 모든 emitter을 찾아 이벤트 발생시, 알림을 보낸다.

  • 알림을 보내고싶은 api에 넣는 방식으로 이렇게 사용하면된다.

	@GetMapping("/test") // react 연동
    public String hello(){
        alarmService.send("hasung");
        return "테스트입니다";
    }
  • SseInMemoryRepository
package com.example.practice.repository.alarm;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Slf4j
@Component
public class SseInMemoryRepository implements SseRepository{
    private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    @Override
    public void put(String key, SseEmitter sseEmitter) {
        sseEmitterMap.put(key, sseEmitter);
    }

    @Override
    public Optional<SseEmitter> get(String key) {
        return Optional.ofNullable(sseEmitterMap.get(key));
    }

    @Override
    public List<SseEmitter> getListByKeyPrefix(String keyPrefix) {
        return sseEmitterMap.keySet().stream()
                .filter(key -> key.startsWith(keyPrefix))
                .map(sseEmitterMap::get)
                .collect(Collectors.toList());
    }

    @Override
    public List<String> getKeyListByKeyPrefix(String keyPrefix) {
        return sseEmitterMap.keySet().stream()
                .filter(key -> key.startsWith(keyPrefix))
                .collect(Collectors.toList());
    }

    @Override
    public void remove(String key) {
        sseEmitterMap.remove(key);
    }
}
  • SseInMemoryRepository다. 여기서 기존과 다른 것은 Prefix로 가져오는 방식이 List로 되어있다는 점인데, 기존의 Novelit에서 구현했던 코드와는 다르게 이쪽이 더 직관적이다.
  • Repository로부터 가져온 Emitter를 저장하고 forEach로 돌리는 것보다는, 아예 처음부터 List로 가져오고 forEach로 돌리는 방식이 뭔가 보기 편하다.
Map<String,SseEmitter> sseEmitters = emitterRepository.findAllEmittersStartWithId(subscriberUUID);


        sseEmitters.forEach(
            (key, emitter) -> {
                // 데이터 캐시 저장 (유실된 데이터 처리 위함)
                emitterRepository.saveEventCache(key, notificationResponseDto);

                sendToClient(emitter, key, "alertComment", notificationResponseDto);

                // 알림 레디스에 저장
                alarmRedisService.save(AlarmRedisRequestDto.builder()
                    //                    .pubUUID(publisherUUID)
                    .pubName(commentNickname)
                    .subUUID(subscriberUUID)
                    .directoryName(directoryName)
                    .build());
            }
        );
sseRepository.getKeyListByKeyPrefix(keyPrefix).forEach(key ->{
            SseEmitter emitter = sseRepository.get(key).get();
            try{
                emitter.send(SseEmitter.event()
                        .id(getEventId(userId,now,SseEventName.ALARM_LIST))
                        .name(CONNECTED)
                        .data("알림성공"));
            }
            catch (IOException e){
                sseRepository.remove(key);
                log.error("SSE send error", e);
                throw new SseException(ErrorCode.SSE_SEND_ERROR);
            }
        });

알림 테스트 확인

  • 코드 분석은 끝났고, 알림 테스트를 해보자. 먼저, 포스트맨으로 회원가입을 해주고,

    통신을 subscribe시킨다.

    sse응답을 위한 api를 실행시키고,

    이전 포스트맨에 들어가서 정상적으로 메세지가 출력되면, sse를 통한 알림방식이 정상적으로 구현된것이다.
profile
Spring Backend Developer

0개의 댓글