전 포스팅에서 알림 기능을 위해서 채택한 방법과 SSE가 무엇인지 SSE의 동작 방식, 구현 방법에 대해서 적어봤다.
이번 포스팅에서는 이를 바탕으로 현재 진행중인 캡스톤디자인에 적용해보는 시간을 가져보겠다.
일단 이번 포스팅에서는 모임에 가입한 사람이 있다면 해당 모임원들에게 알림을 보내는 기능을 개발해보겠다.
const sse = new EventSource("http://localhost:8080/connect");
@RestController
@Slf4j
@RequiredArgsConstructor
public class SseController {
private final NotificationService notificationService;
// SSE 연결 요청
@GetMapping(value="/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> connect(@PathVariable Long userId) {
return ResponseEntity.ok(notificationService.connect(userId));
}
// 모임 가입 공지 이벤트
@PostMapping("/join-club")
public void notifyJoinClub(@RequestBody RequestJoinClub requestJoinClub) {
notificationService.notifyJoinClub(requestJoinClub);
}
}
@Service
@RequiredArgsConstructor
public class SseService {
// 기본 타임아웃 설정
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final EmitterRepository emitterRepository;
private final ClubServiceClient clubServiceClient;
private final UserServiceClient userServiceClient;
/**
* 클라이언트가 연결을 위해 호출하는 메서드.
*
* @param userId - 구독하는 클라이언트의 사용자 아이디.
* @return SseEmitter - 서버에서 보낸 이벤트 Emitter
*/
public SseEmitter connect(Long userId) {
SseEmitter emitter = createEmitter(userId);
sendToClient(userId, "Connected! [userId=" + userId + "]");
return emitter;
}
/**
* 서버의 이벤트를 클라이언트에게 보내는 메서드
* 다른 서비스 로직에서 이 메서드를 사용해 데이터를 Object event에 넣고 전송하면 된다.
*
* @param requestJoinClub - 메세지를 전송할 사용자의 모임 아이디, 사용자 아이디
*/
public void notifyJoinClub(RequestJoinClub requestJoinClub) {
sendToClubMembers(requestJoinClub);
}
/**
* 클라이언트에게 데이터를 전송
*
* @param id - 데이터를 받을 사용자의 아이디.
* @param data - 전송할 데이터.
*/
private void sendToClient(Long id, Object data) {
SseEmitter emitter = emitterRepository.get(id);
if (emitter != null) {
try {
emitter.send(SseEmitter.event().name("connect").data(data));
} catch (IOException exception) {
emitterRepository.deleteById(id);
emitter.completeWithError(exception);
}
}
}
/**
* 클라이언트에게 데이터를 전송
*
* @param requestJoinClub - 메세지를 전송할 사용자의 모임 아이디, 사용자 아이디
*/
private void sendToClubMembers(RequestJoinClub requestJoinClub) {
ResponseClubMemberIdsByClubId clubMemberIds = clubServiceClient.getClubMemberIds(requestJoinClub.getClubId());
String clubName = clubServiceClient.getClubNameById(requestJoinClub.getClubId());
clubMemberIds.getUserIdList().stream()
.forEach(userId -> {
SseEmitter emitter = emitterRepository.get(userId);
String userName = userServiceClient.getUsernameById(userId);
if(emitter != null) {
try {
emitter.send(SseEmitter.event().name("notifyJoinClubMember").data(userName + "님 께서" + clubName + "에 가입했습니다."));
} catch (IOException exception) {
emitterRepository.deleteById(userId);
emitter.completeWithError(exception);
}
}
});
}
/**
* 사용자 아이디를 기반으로 이벤트 Emitter를 생성
*
* @param userId - 사용자 아이디.
* @return SseEmitter - 생성된 이벤트 Emitter.
*/
private SseEmitter createEmitter(Long userId) {
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitterRepository.save(userId, emitter);
// Emitter가 완료될 때(모든 데이터가 성공적으로 전송된 상태) Emitter를 삭제한다.
emitter.onCompletion(() -> emitterRepository.deleteById(userId));
// Emitter가 타임아웃 되었을 때(지정된 시간동안 어떠한 이벤트도 전송되지 않았을 때) Emitter를 삭제한다.
emitter.onTimeout(() -> emitterRepository.deleteById(userId));
return emitter;
}
}
@Repository
@RequiredArgsConstructor
public class EmitterRepository {
// 모든 Emitters를 저장하는 ConcurrentHashMap
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
/**
* 주어진 아이디와 이미터를 저장
*
* @param userId - 사용자 아이디.
* @param emitter - 이벤트 Emitter.
*/
public void save(Long userId, SseEmitter emitter) {
emitters.put(userId, emitter);
}
/**
* 주어진 아이디의 Emitter를 제거
*
* @param userId - 사용자 아이디.
*/
public void deleteById(Long userId) {
emitters.remove(userId);
}
/**
* 주어진 아이디의 Emitter를 가져옴.
*
* @param userId - 사용자 아이디.
* @return SseEmitter - 이벤트 Emitter.
*/
public SseEmitter get(Long userId) {
return emitters.get(userId);
}
}
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitterRepository.save(userId, emitter);
SseEmitter
객체는 향후 이벤트가 발생했을 때 해당 클라이언트로 이벤트를 전송하기 위해 사용되므로 서버에서 저장하고 있어야 한다.sendToClient(userId, "Connected! [userId=" + userId + "]");
server 코드
emitter.send(SseEmitter.event().name("connect").data(data));
client 코드
const sse = new EventSource("http://localhost:8080/connect");
sse.addEventListener('connect', (e) => {
const { data: receivedConnectData } = e;
console.log('connect event data: ',receivedConnectData);
});
// Emitter가 완료될 때(모든 데이터가 성공적으로 전송된 상태) Emitter를 삭제한다.
emitter.onCompletion(() -> emitterRepository.deleteById(userId));
SseEmitter
를 생성할 때는 비동기 요청이 완료되거나 타임아웃 발생 시 실행할 콜백을 등록할 수 있다. 타임아웃이 발생하면 브라우저에서 재연결 요청을 보내는데, 이때 새로운 Emitter 객체를 다시 생성하기 때문에 기존의 Emitter
를 제거해주어야 한다. 따라서 onCompletion 콜백에서 자기 자신을 지우도록 등록해야한다.
// 모든 Emitters를 저장하는 ConcurrentHashMap
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
주의할 점은 이 콜백이 SseEmitter
를 관리하는 다른 스레드에서 실행된다는 것이다. 따라서 thread-safe
한 자료구조를 사용하지 않으면 ConcurrnetModificationException
이 발생할 수 있다. 여기서는 thread-safe
한 자료구조인 ConcurrentHashMap
를 사용했다.
마지막으로 모임원 가입 알림 기능을 설명해보겠다.
club-service의 ClubMemberServiceImpl
@Transactional
@Override
public Long createClubMember(RequestJoinClubMember requestJoinClubMember) {
Club club = clubRepository.findById(requestJoinClubMember.getClubId())
.orElseThrow(() -> new NoSuchElementException("Club not found with id" + requestJoinClubMember.getClubId()));
alarmServiceClient.notifyJoinClub(RequestJoinClub.builder()
.clubId(club.getId())
.userId(requestJoinClubMember.getUserId())
.build());
return clubMemberRepository.save(requestJoinClubMember.joinClubMember(club)).getId();
}
위 코드는 club-service
의 ClubMemberServiceImpl
클래스이다. createClubMember 메서드를 통해 모임원 가입이 이루어지면 FeignClient를 통해 alarm-service의 notifyJoinClub() 메서드를 실행시켜준다.
alarm-service의 SseController
@PostMapping("/join-club")
public void notifyJoinClub(@RequestBody RequestJoinClub requestJoinClub) {
notificationService.notifyJoinClub(requestJoinClub);
}
alram-service의 SseService
public void notifyJoinClub(RequestJoinClub requestJoinClub) {
sendToClubMembers(requestJoinClub);
}
private void sendToClubMembers(RequestJoinClub requestJoinClub) {
ResponseClubMemberIdsByClubId clubMemberIds = clubServiceClient.getClubMemberIds(requestJoinClub.getClubId());
String clubName = clubServiceClient.getClubNameById(requestJoinClub.getClubId());
clubMemberIds.getUserIdList().stream()
.forEach(userId -> {
SseEmitter emitter = emitterRepository.get(userId);
String userName = userServiceClient.getUsernameById(userId);
if(emitter != null) {
try {
emitter.send(SseEmitter.event().name("notifyJoinClubMember").data(userName + "님 께서" + clubName + "에 가입했습니다."));
} catch (IOException exception) {
emitterRepository.deleteById(userId);
emitter.completeWithError(exception);
}
}
});
}
club-service
에서 해당 모임의 모임원들의 Id 리스트와 해당 모임의 이름을 가져온다.notifyJoinClubMember
이벤트를 통해 user-service
를 통해 가져온 해당 모임원의 이름으로 해당 모임원이 모임에 가입했다는 알림을 Client로 전달한다.이렇게 SSE를 통해 모임가입 알림을 만들어봤다. 이를 통해 SSE에 대해서 알게 됐고 앞으로도 잘 활용할 수 있을 것 같다. 이제 남은 알림 기능들을 구현해봐야겠다.