[캡스톤디자인] SSE를 통한 알림 기능 구현 (2)

Dev_Sanizzang·2023년 8월 17일
0

캡스톤디자인

목록 보기
13/15

📕 서론

전 포스팅에서 알림 기능을 위해서 채택한 방법과 SSE가 무엇인지 SSE의 동작 방식, 구현 방법에 대해서 적어봤다.
이번 포스팅에서는 이를 바탕으로 현재 진행중인 캡스톤디자인에 적용해보는 시간을 가져보겠다.

🤔 어떤 알림을 구현할 것이냐

일단 이번 포스팅에서는 모임에 가입한 사람이 있다면 해당 모임원들에게 알림을 보내는 기능을 개발해보겠다.

🌐 Client SSE 연결 요청

const sse = new EventSource("http://localhost:8080/connect");
  • 클라이언트에서는 EventSource라는 인터페이스로 SSE 요청을 할 수있다.

📃 SseController

@RestController
@Slf4j
@RequiredArgsConstructor
public class SseController {

    private final NotificationService notificationService;

	// SSE 연결 요청
    @GetMapping(value="/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> connect(@PathVariable Long userId) {
        return ResponseEntity.ok(notificationService.connect(userId));
    }

    // 모임 가입 공지 이벤트
    @PostMapping("/join-club")
    public void notifyJoinClub(@RequestBody RequestJoinClub requestJoinClub) {
        notificationService.notifyJoinClub(requestJoinClub);
    }

}
  • spring framework 4.2부터 SSE 통신을 지원하는 SseEmitter API를 제공한다. 이를 이용해 SSE 구독 요청에 대한 응답을 할 수 있다.
  • 클라이언트에서 구독을 위한 connect 메서드와 모임 가입 공지 이벤트 메서드이다.

📃 SseService

@Service
@RequiredArgsConstructor
public class SseService {
    // 기본 타임아웃 설정
    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;

    private final EmitterRepository emitterRepository;
    private final ClubServiceClient clubServiceClient;
    private final UserServiceClient userServiceClient;

    /**
     * 클라이언트가 연결을 위해 호출하는 메서드.
     *
     * @param userId - 구독하는 클라이언트의 사용자 아이디.
     * @return SseEmitter - 서버에서 보낸 이벤트 Emitter
     */
    public SseEmitter connect(Long userId) {
        SseEmitter emitter = createEmitter(userId);

        sendToClient(userId, "Connected! [userId=" + userId + "]");
        return emitter;
    }

    /**
     * 서버의 이벤트를 클라이언트에게 보내는 메서드
     * 다른 서비스 로직에서 이 메서드를 사용해 데이터를 Object event에 넣고 전송하면 된다.
     *
     * @param requestJoinClub - 메세지를 전송할 사용자의 모임 아이디, 사용자 아이디
     */
    public void notifyJoinClub(RequestJoinClub requestJoinClub) {
        sendToClubMembers(requestJoinClub);
    }

    /**
     * 클라이언트에게 데이터를 전송
     *
     * @param id   - 데이터를 받을 사용자의 아이디.
     * @param data - 전송할 데이터.
     */
    private void sendToClient(Long id, Object data) {
        SseEmitter emitter = emitterRepository.get(id);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event().name("connect").data(data));
            } catch (IOException exception) {
                emitterRepository.deleteById(id);
                emitter.completeWithError(exception);
            }
        }
    }

    /**
     * 클라이언트에게 데이터를 전송
     *
     * @param requestJoinClub - 메세지를 전송할 사용자의 모임 아이디, 사용자 아이디
     */
    private void sendToClubMembers(RequestJoinClub requestJoinClub) {
        ResponseClubMemberIdsByClubId clubMemberIds = clubServiceClient.getClubMemberIds(requestJoinClub.getClubId());
        String clubName = clubServiceClient.getClubNameById(requestJoinClub.getClubId());

        clubMemberIds.getUserIdList().stream()
                .forEach(userId -> {
                    SseEmitter emitter = emitterRepository.get(userId);
                    String userName = userServiceClient.getUsernameById(userId);
                    if(emitter != null) {
                        try {
                            emitter.send(SseEmitter.event().name("notifyJoinClubMember").data(userName + "님 께서" + clubName + "에 가입했습니다."));
                        } catch (IOException exception) {
                            emitterRepository.deleteById(userId);
                            emitter.completeWithError(exception);
                        }
                    }
                });
    }

    /**
     * 사용자 아이디를 기반으로 이벤트 Emitter를 생성
     *
     * @param userId - 사용자 아이디.
     * @return SseEmitter - 생성된 이벤트 Emitter.
     */
    private SseEmitter createEmitter(Long userId) {
        SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
        emitterRepository.save(userId, emitter);

        // Emitter가 완료될 때(모든 데이터가 성공적으로 전송된 상태) Emitter를 삭제한다.
        emitter.onCompletion(() -> emitterRepository.deleteById(userId));
        // Emitter가 타임아웃 되었을 때(지정된 시간동안 어떠한 이벤트도 전송되지 않았을 때) Emitter를 삭제한다.
        emitter.onTimeout(() -> emitterRepository.deleteById(userId));

        return emitter;
    }
}

📃 EmitterRepository

@Repository
@RequiredArgsConstructor
public class EmitterRepository {
    // 모든 Emitters를 저장하는 ConcurrentHashMap
    private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();

    /**
     * 주어진 아이디와 이미터를 저장
     *
     * @param userId      - 사용자 아이디.
     * @param emitter - 이벤트 Emitter.
     */
    public void save(Long userId, SseEmitter emitter) {
        emitters.put(userId, emitter);
    }

    /**
     * 주어진 아이디의 Emitter를 제거
     *
     * @param userId - 사용자 아이디.
     */
    public void deleteById(Long userId) {
        emitters.remove(userId);
    }

    /**
     * 주어진 아이디의 Emitter를 가져옴.
     *
     * @param userId - 사용자 아이디.
     * @return SseEmitter - 이벤트 Emitter.
     */
    public SseEmitter get(Long userId) {
        return emitters.get(userId);
    }
}

🔍 세부 설명

SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
  • 생성자를 통해 만료시간을 설정할 수 있다.
  • 스프링 부트의 내장 톰캣을 사용하면 30초로 설정된다.
  • 만료 시간이 되면 브라우저에서 자동으로 서버에 재연결 요청을 보낸다.
emitterRepository.save(userId, emitter);
  • 이때 생성된 SseEmitter 객체는 향후 이벤트가 발생했을 때 해당 클라이언트로 이벤트를 전송하기 위해 사용되므로 서버에서 저장하고 있어야 한다.
sendToClient(userId, "Connected! [userId=" + userId + "]");
  • Emitter를 생성하고 나서 만료시간까지 아무런 데이터도 보내지 않으면 재연결 요청시 503 Service Unavailable 에러가 발생할 수 있다. 따라서 처음 SSE 연결 시 더미 데이터를 전달해주는 것이 안전하다.

server 코드

emitter.send(SseEmitter.event().name("connect").data(data));

client 코드

const sse = new EventSource("http://localhost:8080/connect");

sse.addEventListener('connect', (e) => {
	const { data: receivedConnectData } = e;
	console.log('connect event data: ',receivedConnectData);
});
  • 이벤트 이름을 설정해주면 클라이언트에서 해당 이름으로 이벤트를 받을 수 있다.
// Emitter가 완료될 때(모든 데이터가 성공적으로 전송된 상태) Emitter를 삭제한다.
emitter.onCompletion(() -> emitterRepository.deleteById(userId));

SseEmitter를 생성할 때는 비동기 요청이 완료되거나 타임아웃 발생 시 실행할 콜백을 등록할 수 있다. 타임아웃이 발생하면 브라우저에서 재연결 요청을 보내는데, 이때 새로운 Emitter 객체를 다시 생성하기 때문에 기존의 Emitter를 제거해주어야 한다. 따라서 onCompletion 콜백에서 자기 자신을 지우도록 등록해야한다.

// 모든 Emitters를 저장하는 ConcurrentHashMap
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();

주의할 점은 이 콜백이 SseEmitter를 관리하는 다른 스레드에서 실행된다는 것이다. 따라서 thread-safe한 자료구조를 사용하지 않으면 ConcurrnetModificationException이 발생할 수 있다. 여기서는 thread-safe한 자료구조인 ConcurrentHashMap를 사용했다.

💡 모임원 가입 알림 설명

마지막으로 모임원 가입 알림 기능을 설명해보겠다.

club-service의 ClubMemberServiceImpl

	@Transactional
    @Override
    public Long createClubMember(RequestJoinClubMember requestJoinClubMember) {
        Club club = clubRepository.findById(requestJoinClubMember.getClubId())
                .orElseThrow(() -> new NoSuchElementException("Club not found with id" + requestJoinClubMember.getClubId()));

        alarmServiceClient.notifyJoinClub(RequestJoinClub.builder()
                                            .clubId(club.getId())
                                            .userId(requestJoinClubMember.getUserId())
                                            .build());
        
        return clubMemberRepository.save(requestJoinClubMember.joinClubMember(club)).getId();
    }

위 코드는 club-serviceClubMemberServiceImpl 클래스이다. createClubMember 메서드를 통해 모임원 가입이 이루어지면 FeignClient를 통해 alarm-service의 notifyJoinClub() 메서드를 실행시켜준다.

alarm-service의 SseController

    @PostMapping("/join-club")
    public void notifyJoinClub(@RequestBody RequestJoinClub requestJoinClub) {
        notificationService.notifyJoinClub(requestJoinClub);
    }

alram-service의 SseService

    public void notifyJoinClub(RequestJoinClub requestJoinClub) {
        sendToClubMembers(requestJoinClub);
    }
    
        private void sendToClubMembers(RequestJoinClub requestJoinClub) {
        ResponseClubMemberIdsByClubId clubMemberIds = clubServiceClient.getClubMemberIds(requestJoinClub.getClubId());
        String clubName = clubServiceClient.getClubNameById(requestJoinClub.getClubId());

        clubMemberIds.getUserIdList().stream()
                .forEach(userId -> {
                    SseEmitter emitter = emitterRepository.get(userId);
                    String userName = userServiceClient.getUsernameById(userId);
                    if(emitter != null) {
                        try {
                            emitter.send(SseEmitter.event().name("notifyJoinClubMember").data(userName + "님 께서" + clubName + "에 가입했습니다."));
                        } catch (IOException exception) {
                            emitterRepository.deleteById(userId);
                            emitter.completeWithError(exception);
                        }
                    }
                });
    }
  • FeignClient를 통해 club-service에서 해당 모임의 모임원들의 Id 리스트와 해당 모임의 이름을 가져온다.
  • 해당 모임원의 id에 해당하는 사람들에게 notifyJoinClubMember 이벤트를 통해 user-service를 통해 가져온 해당 모임원의 이름으로 해당 모임원이 모임에 가입했다는 알림을 Client로 전달한다.

🚪 마무리

이렇게 SSE를 통해 모임가입 알림을 만들어봤다. 이를 통해 SSE에 대해서 알게 됐고 앞으로도 잘 활용할 수 있을 것 같다. 이제 남은 알림 기능들을 구현해봐야겠다.

profile
기록을 통해 성장합니다.

0개의 댓글