
마일스톤 3.0 목표로 마지막 작업인 알림 기능 구현 업무가 남았다..!
알림 기능을 어떻게 구현할 지 찾아보다가 sse 방식을 이용해서 구현하기로 했다.
sse방식을 택한 이유는 2가지가 있다.
firebase를 이용해서 웹 푸시를 보내려 했으나, 유저가 삭제하거나 특정 삭제 조건이 아니라면 이전 알림들을 다 저장해서 보내주기로 했기때문에 firebase를 이용한 알림 방법은 적절하지 않다고 판단했다.
다른 기능들 (게시판 글 등록같은..) 처럼 HTTP 표준 위에 구축한다면 이벤트 발생 시에 접속하지 않은 유저에게 알림을 보내줄 수 없을것이라 생각했다.
따라서 Server-Sent-Events 방식인 SSE 방법을 이용하여 특정 이벤트 발생 시 유저에게 알림을 보내주기로 결정했다.
SSE 방식에 대해 간단하게 정리하면, 클라이언트가 서버를 구독하면 (SSE Connection을 맺으면) 서버는 변동사항이 생길때마다 구독한 클라이언트들에게 데이터를 전송하는 방식이다.
물론 HTTP를 통한 SSE (HTTP/2가 아닐경우) 는 브라우저 당 6개로 연결이 제한되므로, 사용자가 웹사이트의 여러 탭을 열면 첫 6개의 탭 이후에는 SSE가 동작하지 않는다는 단점이 있다.
HTTP/2에서는 100개까지 접속을 허용한다고한다.
이런 단점이 있지만, SSE는 클라가 서버와 처음 연결만되면 크게 통신할 필요없이 그저 업데이트된 데이터만 할때 좋은 방법이라고 판단했다.
SSE방식 외에 다른 통신 방법은
reference1
reference2
를 참고해서 공부했다.
코드는 다음과 같다.
Controller
@RestController
@RequestMapping("/members/alrams")
@RequiredArgsConstructor
public class AlarmController {
private final AlarmService alarmService;
private final MemberService memberService;
private final AlarmMapper mapper;
/**
* 특정 멤버 모든 알림 불러오기
*
*/
@GetMapping
public ResponseEntity getAlarm(){
Long memberId = extractMemberId();
List<Alarm> alarms = alarmService.getAlarms(memberId);
return new ResponseEntity<>(mapper.alarmToAlarmResponseDto(alarms), HttpStatus.OK);
}
/**
* 알림 구독.
* 클라에서 구독하는 요청을 보내면,
* 컨트롤러에서 SseEmitter를 만들어주는 서비스 레이어를 통해 전달 받은 SseEmitter를 반환한다.
*/
@GetMapping(value = "/sub", produces = "text/event-stream")
public SseEmitter subscribe(
@RequestParam(value = "lastEventId", required = false, defaultValue = "") String lastEventId) {
Long memberId = extractMemberId();
return alarmService.subscribe(memberId, lastEventId);
}
//알림 모두 삭제
@DeleteMapping
public void deleteAlarmAll(){
Long memberId = extractMemberId();
alarmService.deleteAllAlarm(memberId);
}
//memberId 추출
private Long extractMemberId() {
Object memberIdObject = memberService.extractMemberInfo().get("id");
if (memberIdObject instanceof Long) {
return (Long) memberIdObject;
} else if (memberIdObject instanceof Integer) {
return ((Integer) memberIdObject).longValue();
} else {
throw new BusinessLogicException(ExceptionCode.INVALID_MEMBER_ID);
}
}
}
DTO
@Getter
@Setter
public class AlarmResponseDto {
private Long memberId;
private Long boardId;
private String content;
private Alarm.AlarmStatus alarmStatus;
}
유저가 받은 모든 알림을 Get 요청할 때 프론트에 response로 보내주기 위해 작성했다.
entity
@NoArgsConstructor
@Entity
@Getter
@Setter
public class Alarm {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
@Enumerated(value = EnumType.STRING)
private AlarmStatus alarmStatus;
private String content;
@ManyToOne
@JoinColumn(name = "MEMBER_ID")
@JsonBackReference
private Member member;
@ManyToOne
@JoinColumn(name = "Board_ID")
private Board board;
public enum AlarmStatus {
BOARD_CREATED("모임 생성"),
BOARD_UPDATE("모임 인원 증가"),
BOARD_CLOSED("모임 마감");
@Getter
private String alarmStatus;
AlarmStatus(String alarmStatus){
this.alarmStatus = alarmStatus;
}
}
@Builder
public Alarm (Member member, Board board, AlarmStatus alarmStatus, String content){
this.member = member;
this.board = board;
this.alarmStatus = alarmStatus;
this.content = content;
}
public static Alarm create (Member member, Board board, AlarmStatus alarmStatus, String content){
return Alarm.builder().member(member).board(board).alarmStatus(alarmStatus).content(content).build();
}
}
mapper
@Mapper(componentModel = "spring")
public interface AlarmMapper {
@Mapping(source = "member.id", target = "memberId")
@Mapping(source = "board.id", target = "boardId")
AlarmResponseDto mapToAlarmResponse(Alarm alarm);
List<AlarmResponseDto> alarmToAlarmResponseDto (List<Alarm> alarms);
}
repository
public interface AlarmRepository extends JpaRepository<Alarm, Long> {
void deleteAllByMember_Id(Long memberId);
List<Alarm> findAlarmByMember_Id(Long memberId);
}
public interface EmitterRepository {
//emitter 저장
SseEmitter save (String emitterId, SseEmitter sseEmitter);
//이벤트 저장
void saveEventCache(String emitterId, Object event);
//memberId와 관련된 모든 emitter를 찾음
Map<String, SseEmitter> findAllEmittersStartWithByMemberId (Long memberId);
//memberId와 관련된 모든 이벤트를 찾음
Map<String, Object> findAllEventCacheStartWithByMemberId (Long memberId);
//emitter 삭제
void deleteById(String id);
//memberId와 관련된 모든 emitter 삭제
void deleteAllEmitterStartWithByMemberId (Long memberId);
//memberId와 관련된 모든 이벤트 삭제
void deleteAllEventCacheStartWithByMemberId(Long memberId);
}
@Repository
@NoArgsConstructor
public class EmitterRepositoryImpl implements EmitterRepository{
private final Map<String, SseEmitter>emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
@Override
public SseEmitter save (String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
@Override
public void saveEventCache(String eventCacheId, Object event) {
eventCache.put(eventCacheId,event);
}
@Override
public Map<String, SseEmitter> findAllEmittersStartWithByMemberId(Long memberId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(String.valueOf(memberId)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, Object> findAllEventCacheStartWithByMemberId(Long memberId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(String.valueOf(memberId)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public void deleteById(String id) {
emitters.remove(id);
}
@Override
public void deleteAllEmitterStartWithByMemberId(Long memberId) {
emitters.forEach((key, emitter) -> {
if (key.startsWith(String.valueOf(memberId))) {
emitters.remove(key);
}
});
}
@Override
public void deleteAllEventCacheStartWithByMemberId(Long memberId) {
eventCache.forEach((key, emitter) -> {
if (key.startsWith(String.valueOf(memberId))){
eventCache.remove(key);
}
});
}
}
Service
/**
* 클라이언트에서 SSE 연결 요청을 보낸다.
* 서버에서는 클라이언트와 매핑되는 SSE 통신 객체를 만든다.
* 서버에서 이벤트가 발생하면 해당 객체를 통해 클라이언트로 데이터를 전달한다.
*/
@Service
@RequiredArgsConstructor
public class AlarmService {
//기본 타임아웃 설정
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final EmitterRepository emitterRepository;
private final AlarmRepository alarmRepository;
/**
* 클라가 구독을 위해 호출하는 메서드
*/
public SseEmitter subscribe(Long memberId, String lastEventId) {
String id = makeTimeIncludedId(memberId);
SseEmitter emitter = emitterRepository.save(id, new SseEmitter(DEFAULT_TIMEOUT));
//emitter 가 완료될 때 emitter를 삭제 (모든 데이터가 성공적으로 전송된 상태)
emitter.onCompletion(() -> emitterRepository.deleteById(id));
//emitter 가 타임아웃되었으면 emitter 삭제 (지정된 시간동안 어떤 이벤트도 전송 x)
emitter.onTimeout(() -> emitterRepository.deleteById(id));
//503 에러 방지하고자 더미 이벤트 전달
sendToClient(emitter, id, "EventStream Created. [memberId=" + memberId + "]");
//클라가 미수신한 event가 존재할 경우 전송 (유실 방지)
if (hasLostData(lastEventId)){
sendLostdata(lastEventId, memberId, emitter);
}
return emitter;
}
/**
* 알림 생성, 전송
* 사용자의 모든 알람을 읽음처리
*/
@Async
@Transactional
public void sendAlarm(Member member, Board board, Alarm.AlarmStatus alarmStatus, String content){
Alarm alarm = Alarm.create(member, board, alarmStatus,content);
alarmRepository.save(alarm);
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmittersStartWithByMemberId(member.getId());
String eventId = makeTimeIncludedId(member.getId());
System.out.println(eventId);
sseEmitters.forEach((key, emitter) -> {
//데이터 캐시 저장 (유실 데이터 처리를 위해)
emitterRepository.saveEventCache(key,alarm);
//데이터 전송
sendToClient(emitter,eventId, content);
});
}
/**
* 클라에 데이터 전송 (id -> 데이터를 전달받을 사용자의 id)
*/
private void sendToClient(SseEmitter emitter, String eventId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(eventId).data(data));
}catch (IOException exception){
emitterRepository.deleteById(eventId);
throw new RuntimeException("연결오류");
}
}
public List<Alarm> getAlarms (Long memberId){
return alarmRepository.findAlarmByMember_Id(memberId);
}
//알림 모두 삭제
@Transactional
public void deleteAllAlarm (Long memberId) {
alarmRepository.deleteAllByMember_Id(memberId);
}
private String makeTimeIncludedId(Long memberId){
return memberId+"_"+ System.currentTimeMillis();
}
private boolean hasLostData(String lastEventId){
return !lastEventId.isEmpty();
}
// 유실 데이터 확인하는 기능
private void sendLostdata(String lastEventId, Long memberId, SseEmitter emitter){
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(memberId);
eventCaches.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey())<0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
}
}
boardService
//모임글 등록
public Board createBoard(BoardDto.Post postDto) {
Member member = findMember(extractMemberId());
Board board = processCreateBoard(postDto, member);
saveApplicantForBoardCreat(board, member);
System.out.println("[모임글 생성에 들어가는 날짜]"+ postDto.getDate());
//알림 발송
alarmService.sendAlarm(member,board, Alarm.AlarmStatus.BOARD_CREATED,"["+board.getTitle()+"] 모임이 등록되었습니다!🔥");
return boardRepository.save(board);
}
모임 글 생성이 완료되었다는 알림을 보내기 위해 boardService에 모임 글 등록하는 비즈니스 로직에 추가했다.
그외에도 모임에 참여하면, 글 작성자와 참여한 사람에게 참여했다는 알림을 보내줬다.
바로 위에서 쓴거처럼 모임글 생성시와, 모임 참여시 작성자와 참여자에게 알림을 보냈다.
이제 여기서 문제가 생겼다..
모임 참여시에는 알림이 정상적으로 저장도되고, 발송도 되는데
모임글 작성시에 정상적으로 작동을 안하는 것이다.. 10번 글을 생성하면 7번은 알림이 안오고 3번만 알림이 오는 상황이 발생했다.
모임 글을 등록하면 콘솔에서
Caused by: org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException: Referential integrity constraint violation: "FKMBNCIAPLP47YG01X9DMWF2TDE: PUBLIC.ALARM FOREIGN KEY(BOARD_ID) REFERENCES PUBLIC.BOARD(ID) (CAST(2 AS BIGINT))"; SQL statement:
이런 에러 로그를 확인할 수 있었다.
gpt한테 물어보니
이 메시지는 PUBLIC.ALARM 테이블의 BOARD_ID 열에서 참조하려는 PUBLIC.BOARD 테이블의 ID 열의 값이 존재하지 않아 참조 무결성 제약 조건 위반으로 인해 오류가 발생했음을 나타냅니다.
라고 알려줬다.
여러 글을 찾아보고 멘토 님께 질문해서 나온 결론은..
알림 보내는 기능에서 에러가 나서 글 작성이나 모임 참여가 안되는걸 막기위헤 알림 보내는 기능을 비동기 처리하기위해 @Async 처리했는데
알림 보내는 로직을 글 생성하는 로직안에 넣어놔서 글 생성이 되기전에 알림을 보내려해서 생기는 문제였다..
여기서 멘토님이 예시를 들어주신게,
팔로워가 5만명인 사람이 글을 작성하고 그 팔로워들에게 알림을 다 보내준다고할 때, 5만명에게 알림을 다 보내고 나서야 글 작성이 완료된다고하면 그 유저는 팔로워가 많다는 이유로 글 생성이 무진장 늦어지게 되는 것이다.
하나의 기능을 구현하기 위해서 이런 상황을 모두 확인하고 각 경우에 따라 어떻게 처리할지를 다 생각해서 구현해야한다는걸 또 느껴버렸다.. (이래서 테스트 케이스를 잘 작성해야한다..)
그래서 결론은 비동기 처리 방식을 위해 알림을 보내는 로직을 분리시키려한다.
메서드를 분리하거나 리스너를 이용하는 방법이 있다고 설명해주셨는데, 이렇게 비동기 처리하는 과정은 프로젝트 이후에 리팩토링을 하며 정리하려한다.
또 하나의 서버가 메세지를 5만명에게도 보내고 다른 기능들도 처리한다고 하면 메모리 과부화로 서버가 다운될 수 있다고도 하셨다. 그래서 보통 메신저나 알림 보내는 기능은 서버를 따로 분리해서 사용하거나 API를 아예 따로 분리한다고 하셨다.
비동기 처리를 하면서 알림 서버를 따로 구현하는 것도 도전해볼 생각이다.
reference
https://dkswnkk.tistory.com/702
https://gilssang97.tistory.com/69
https://velog.io/@readnthink/%EC%8B%A4%EC%8B%9C%EA%B0%84-%EC%95%8C%EB%A6%BC%EA%B8%B0%EB%8A%A5-SSE%EB%A1%9C-%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0
https://taemham.github.io/posts/Implementing_Notification/