
์ง๋ ํฌ์คํ ์ ์ด์ด ์ฑํ ๋ง์ดํฌ๋ก ์๋น์ค ์๋ฆผ ๊ธฐ๋ฅ ๊ณผ์ ์ ํฌ์คํ ํ๋๋ก ํ๊ฒ ์ต๋๋ค. ์ฃผ์ ์ฝ๋๋ง ๋ค๋ฃฐ ์์ ์ด๋ผ ๋ค๋ฃจ์ง ์๋ ์ฝ๋๋ค์ ์๋ github๋ฅผ ํตํด ์ฐธ๊ณ ํ์๊ธธ ๋ฐ๋๋๋ค๐๐(msa-master ๋ธ๋์น)
๐ ์ฐธ๊ณ ์ฝ๋ : https://github.com/LminWoo99/PlantBackend/tree/msa-master
์๊ตฌํ์ ํ๋ก์ ํธ์ ์๋ฆผ ๊ธฐ๋ฅ์ 1:1 ์ฑํ ์์ ์ฝ์ง ์์ ๋ฉ์์ง์ ๋ํด ์๋ฆผ์ ์ ๊ณตํ๋ ๊ฒ์ด ํต์ฌ์ ๋๋ค.
1. Stomp์ ChannelInterceptor ํ์ฉ
2. Redis ์ฌ์ฉ ๊ฒฐ์  ์ด์
3. SSE(Server-Sent Events) ๋์
@RestController
@RequestMapping("/notification")
@Slf4j
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationService notificationService;
    /**
     * @title ๋ก๊ทธ์ธ ํ ์ ์  SSE ์ฐ๊ฒฐ
     * @param : String lastEventId,String jwtToken
     **/
    @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> connect(
            @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId,
            @RequestHeader("Authorization") String jwtToken) {
        SseEmitter emitter = notificationService.subscribe(lastEventId, jwtToken);
        return ResponseEntity.ok(emitter);
    }
    /**
     * @title ๋ก๊ทธ์ธ ํ ์ ์ ์ ๋ชจ๋  ์๋ฆผ ์กฐํ
     * @param : Long memberNo(ํ์ฌ ์ ์ํ ๋ฉค๋ฒ)
     **/
    @GetMapping("/all")
    public ResponseEntity<List<NotificationResponse>> notifications(Long memberNo) {
        return ResponseEntity.ok(notificationService.findAllById(memberNo));
    }
    /**
     * @title ์๋ฆผ ์ฝ์ ์ํ๋ง ๋ณ๊ฒฝ
     * @param : Long id(์๋ฆผ id)
     **/
    @PatchMapping("/checked/{id}")
    public ResponseEntity<StatusResponseDto> readNotification(@PathVariable("id") Long id) {
        notificationService.readNotification(id);
        return ResponseEntity.ok(StatusResponseDto.success());
    }
    /**
     * @title ์๋ฆผ ์ญ์ 
     * @param : NotificationDeleteRequest request
     **/
    @DeleteMapping
    public ResponseEntity<StatusResponseDto> deleteNotification(@RequestBody NotificationDeleteRequest request) {
        notificationService.deleteNotification(request.getIdList());
        return ResponseEntity.ok(StatusResponseDto.success());
    }
}

@RequiredArgsConstructor
@Service
@Slf4j
public class NotificationService {
    private static final Long DEFAULT_TIMEOUT = 1000L * 60 * 29 ;// 29๋ถ
    public static final String PREFIX_URL = "๋๋ฉ์ธ url";
    private final NotificationRepository notificationRepository;
    private final EmitterRepository emitterRepository;
    private final PlantServiceClient plantServiceClient;
    private final CircuitBreakerFactory circuitBreakerFactory;
    /**
     * SSE ์ฐ๊ฒฐ ๋ฉ์๋
     * @param : MemberDto memberDto, String lastEnventId
     */
    @Transactional
    public SseEmitter subscribe(String lastEventId, String jwtToken) {
        ResponseEntity<MemberDto> joinMember = plantServiceClient.getJoinMember(jwtToken);
        Integer memberNo = joinMember.getBody().getId().intValue();
        //Emitter map์ ์ ์ฅํ๊ธฐ ์ํ key ์์ฑ
        String id = memberNo + "_" + System.currentTimeMillis();
        // SseEmitter map์ ์ ์ฅ
        SseEmitter emitter = emitterRepository.save(id, new SseEmitter(DEFAULT_TIMEOUT));
        log.info("emitter add: {}", emitter);
        // emitter์ ์๋ฃ ๋๋ ํ์์์ Event๊ฐ ๋ฐ์ํ  ๊ฒฝ์ฐ, ํด๋น emitter๋ฅผ ์ญ์ 
        emitter.onCompletion(() -> emitterRepository.deleteById(id));
        emitter.onTimeout(() -> emitterRepository.deleteById(id));
        // 503 ์๋ฌ๋ฅผ ๋ฐฉ์งํ๊ธฐ ์ํ ๋๋ฏธ Event ์ ์ก
        sendToClient(emitter, id, "EventStream Created. [userId=" + memberNo + "]");
        // ํด๋ผ์ด์ธํธ๊ฐ ๋ฏธ์์ ํ Event ๋ชฉ๋ก์ด ์กด์ฌํ  ๊ฒฝ์ฐ ์ ์กํ์ฌ Event ์ ์ค์ ์๋ฐฉ
        if (!lastEventId.isEmpty()) {
            // id์ ํด๋นํ๋ cache ์ฐพ์
            Map<String, Object> events = emitterRepository.findAllCacheStartWithId(String.valueOf(memberNo));
            //๋ฏธ์์ ํ Event ๋ชฉ๋ก ์ ์ก
            events.entrySet().stream()
                    .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                    .forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
        }
        return emitter;
    }
    /**
     * ์๋ฆผ ์ ์ก ๋ฉ์๋
     * ์๋ฆผ ๋ฐ์ ์ ์  SseEmitter ๊ฐ์ ธ์์ ์๋ฆผ ์ ์ก
     * @param : MemberDto sender, MemberDto receiver, NotifiTypeEnum type, String resource, String content
     */
    @Transactional
    public void send(MemberDto sender, MemberDto receiver, NotifiTypeEnum type, String resource, String content) {
        // ์๋ฆผ ์์ฑ
        Notification notification = Notification.builder()
                .senderNo(sender.getId().intValue())
                .receiverNo(receiver.getId().intValue())
                .typeEnum(type)
                .url(PREFIX_URL + type.getPath() + resource)
                .content(content)
                .isRead(false)
                .isDel(false)
                .build();
        CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
        //๋ณด๋ธ ์ฌ๋ ์ด๋ฆ ์ฐพ๊ธฐ ์ํด feignClient๋ฅผ ํตํด ํธ์ถ
        ResponseEntity<MemberDto> findMember = circuitBreaker.run(() -> plantServiceClient.findById(sender.getId()),
                throwable -> ResponseEntity.ok(null));
        notification.setSenderName(findMember.getBody().getNickname());
        // SseEmitter ์บ์ ์กฐํ๋ฅผ ์ํด key์ prefix ์์ฑ
        String id = String.valueOf(notification.getReceiverNo());
        //์๋ฆผ ์ ์ฅ
        notificationRepository.save(notification);
        // ๋ก๊ทธ์ธ ํ ์ ์ ์ SseEmitter ๋ชจ๋ ๊ฐ์ ธ์ค๊ธฐ
        Map<String, SseEmitter> sseEmitterMap = emitterRepository.findAllStartWithById(id);
        sseEmitterMap.forEach(
                (key, emitter) -> {
                    //์บ์ ์ ์ฅ(์ ์คํ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌ)
                    emitterRepository.saveCache(key, notification);
                    //๋ฐ์ํฐ ์ ์ก
                    sendToClient(emitter, key, NotificationResponse.toDto(notification));
                }
        );
    }
    /**
     * ํด๋ผ์ด์ธํธ์ SSE + ์๋ฆผ ๋ฐ์ดํฐ ์ ์ก ๋ฉ์๋
     * @param : SseEmitter emitter, String id, Object data
     */
    private void sendToClient(SseEmitter emitter, String id, Object data) {
        try {
            log.info("event : " + data);
            emitter.send(SseEmitter.event()
                    .id(id)
                    .name("sse")
                    .data(data));
            log.info("event call: " + emitter);
        } catch (IOException ex) {
            emitterRepository.deleteById(id);
            log.error("--- SSE ์ฐ๊ฒฐ ์ค๋ฅ ----", ex);
        }
    }
    /**
     * ๋ก๊ทธ์ธํ ๋ฉค๋ฒ ์๋ฆผ ์ ์ฒด ์กฐํ
     * ์กฐํ์ฉ ๋ฉ์๋ => (readOnly = true)
     * @param : MemberDto memberDto
     */
    public List<NotificationResponse> findAllById(Long memberNo) {
        // ์ฑํ
์ ๋ง์ง๋ง ์๋ฆผ ์กฐํ
        List<Notification> chat
                = notificationRepository.findChatByReceiver(memberNo.intValue());
        return chat.stream()
                .map(NotificationResponse::toDto)
                .sorted(Comparator.comparing(NotificationResponse::getId).reversed())
                .collect(Collectors.toList());
    }
    /**
     * ์ผ๋ฆผ ์ฝ์ ์ฒ๋ฆฌ ๋ฉ์๋
     * @param : Long id
     */
    @Transactional
    public void readNotification(Long id) {
        notificationRepository.updateIsReadById(id);
    }
    /**
     * ์ผ๋ฆผ ์ญ์  ์ฒ๋ฆฌ ๋ฉ์๋
     * @param : Long id
     */
    public void deleteNotification(Long[] idList) {
        for (Long id : idList) {
            Notification notification = getNotification(id);
            notificationRepository.delete(notification);
        }
    }
    //== ๊ฐ๋ณ ์๋ฆผ ์กฐํ ==//
    private Notification getNotification(Long id) {
        return notificationRepository.findById(id)
                .orElseThrow(ErrorCode::throwNotificationNotFound);
    }
}

subscribe()๋ฅผ ๋ณด๋ฉด id๊ฐ์ {user_id}_{System.currentTimeMillis()} ํํ๋ก ์ฌ์ฉํ๋ ๊ฒ์ ๋ณผ ์ ์์ต๋๋ค. ์ด๋ ๊ฒ ์ฌ์ฉํ๋ ์ด์ ๊ฐ Last-Event-ID ํค๋์ ์๊ด์ด ์์ต๋๋ค.
Last-Event-IDํค๋๋ ํด๋ผ์ด์ธํธ๊ฐ ๋ง์ง๋ง์ผ๋ก ์์ ํ ๋ฐ์ดํฐ์ id๊ฐ์ ์๋ฏธํ๋ค๊ณ ํ์ต๋๋ค. id๊ฐ๊ณผ ์ ์ก ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๊ณ ์์ผ๋ฉด ์ด ๊ฐ์ ์ด์ฉํ์ฌ ์ ์ค๋ ๋ฐ์ดํฐ ์ ์ก์ ๋ค์ ํด์ค ์ ์์ต๋๋ค. ํ์ง๋ง ๋ง์ฝ id๊ฐ์ ๊ทธ๋๋ก ์ฌ์ฉํ๋ค๋ฉด ์ด๋ค ๋ฌธ์ ๊ฐ ์์๊น?
id๊ฐ์ ๊ทธ๋๋ก ์ฌ์ฉํ๋ค๋ฉด Last-Event-Id๊ฐ์ด ์๋ฏธ๊ฐ ์์ด์ง๋๋ค.
ํด๋ผ์ด์ธํธ์ sse์ฐ๊ฒฐ ์์ฒญ์ ์๋ตํ๊ธฐ ์ํด์๋ SseEmitter ๊ฐ์ฒด๋ฅผ ๋ง๋ค์ด ๋ฐํํด์ค์ผํฉ๋๋ค. SseEmitter ๊ฐ์ฒด๋ฅผ ๋ง๋ค ๋ ์ ํจ ์๊ฐ์ ์ค ์ ์๊ณ ์ด๋ ์ฃผ๋ ์๊ฐ ๋งํผ sse ์ฐ๊ฒฐ์ด ์ ์ง๋๊ณ , ์๊ฐ์ด ์ง๋๋ฉด ์๋์ผ๋ก ํด๋ผ์ด์ธํธ์์ ์ฌ์ฐ๊ฒฐ ์์ฒญ์ ๋ณด๋ด๊ฒ ๋ฉ๋๋ค.
id๋ฅผ key๋ก, SseEmitter๋ฅผ value๋ก ์ ์ฅํ๊ณ , SseEmitter์ ์๊ฐ ์ด๊ณผ ๋ฐ ๋คํธ์ํฌ ์ค๋ฅ๋ฅผ ํฌํจํ ๋ชจ๋ ์ด์ ๋ก ๋น๋๊ธฐ ์์ฒญ์ด ์ ์ ๋์ํ ์ ์๋ค๋ฉด ์ ์ฅํด๋ SseEmitter๋ฅผ ์ญ์ ํ๊ฒ ๋ฉ๋๋ค.

@Repository
@Slf4j
public class EmitterRepository {
    //Map์ ํ์๊ณผ ์ฐ๊ฒฐ๋ SSE SseEmitter ๊ฐ์ฒด๋ฅผ ์ ์ฅ. ๋์์ฑ ๋ณด์ฅ
    public final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    //Event๋ฅผ ์บ์์ ์ ์ฅ
    public final Map<String, Object> cache = new ConcurrentHashMap<>();
    //id, sseEmitter map์ ์ ์ฅ
    public SseEmitter save(String id, SseEmitter sseEmitter) {
        emitters.put(id, sseEmitter);
        return sseEmitter;
    }
    //id์ event๋ฅผ ๋งค๊ฐ๋ณ์๋ก ๋ฐ์ cache ๋งต์ ์ ์ฅ
    public void saveCache(String id, Object event) {
        cache.put(id, event);
    }
    //id๋ก ์์ํ๋ ํค๋ฅผ ๊ฐ์ง emitters ๋งต ํญ๋ชฉ์ ํํฐ๋งํ์ฌ ๋งต์ผ๋ก ๋ฐํ
    public Map<String, SseEmitter> findAllStartWithById(String id) {
        log.info("map: " + emitters);
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(id))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
    //id๋ก ์์ํ๋ ํค๋ฅผ ๊ฐ์ง eventCache ๋งต ํญ๋ชฉ์ ํํฐ๋งํ์ฌ ๋งต์ผ๋ก ๋ฐํ
    public Map<String, Object> findAllCacheStartWithId(String id) {
        return cache.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(id))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
    // id๋ก ์์ํ๋ ํค๋ฅผ ๊ฐ์ง emitters ๋งต ํญ๋ชฉ์ ๋ชจ๋ ์ ๊ฑฐ
    public void deleteAllStartWithId(String id) {
        emitters.forEach(
                (key, emitter) -> {
                    if (key.startsWith(id)) {
                        emitters.remove(key);
                    }
                }
        );
    }
    //  emitters ๋งต์์ ํด๋น id๋ฅผ ๊ฐ์ง ํญ๋ชฉ์ ์ญ์ 
    public void deleteById(String id) {
        emitters.remove(id);
    }
    
}
๐ emitters์ eventCache๊ฐ ๋งต ํํ์ธ ์ด์ ?
emitters์ ๊ฒฝ์ฐ key์ value์ ๊ฐ๊ฐ emitterId์ SseEmitter๊ฐ์ฒด๋ฅผ ์ ์ฅํ๊ณ
eventCache๋ key์ value์ ๊ฐ๊ฐ eventCacheId์ euentCache๊ฐ์ฒด๋ฅผ ์ ์ฅํ๋ค.
์ด๋ฅผ ํตํด ์ฃผ์ด์ง ํค๋ฅผ ์ฌ์ฉํ์ฌ ๋น ๋ฅด๊ฒ ๋ฐ์ดํฐ๋ฅผ ๊ฒ์ํ ์ ์๊ณ , ์ค๋ณต๋ ํค๋ฅผ ๊ฐ์ง ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ ์ ์๋ค.
concurrentHashMap์ ์ฌ๋ฌ ์ค๋ ๋์์ ๋์์ ์์ ํ๊ฒ ๋ฐ์ดํฐ์ ์ ๊ทผํ ์ ์๋ ๋งต ๊ตฌํ์ฒด์ด๋ค.
์๋ฆผ ์์คํ ์์๋ ์ฌ๋ฌ ํด๋ผ์ด์ธํธ๊ฐ ๋์์ ๊ตฌ๋ ํ๊ณ ์ด๋ฒคํธ๋ฅผ ์ ์กํ ์ ์์ผ๋ฏ๋ก, ๋์์ฑ์ ์ ์ดํ๋ ๊ฒ์ด ์ค์
๋ฐ๋ผ์ ๋งต ํํ๋ฅผ ์ฌ์ฉํจ์ผ๋ก์ ๋ฐ์ดํฐ ์ ์ฅ, ๊ณ ์ ์ฑ ๋ณด์ฅ, ์ฑ๋ฅ ๋ฑ์ ์ด์ ์ ์ป์ผ๋ฉฐ ํจ์จ์ ์ผ๋ก ๊ฐ์ฒด๋ฅผ ๊ด๋ฆฌํ ์ ์๋ค.
  @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        // StompCommand์ ๋ฐ๋ผ์ ๋ก์ง์ ๋ถ๊ธฐํด์ ์ฒ๋ฆฌํ๋ ๋ฉ์๋๋ฅผ ํธ์ถํ๋ค.
        String username = verifyAccessToken(getAccessToken(accessor));
        log.info("StompAccessor = {}", accessor);
        handleMessage(accessor.getCommand(), accessor, username);
        return message;
    }
private void handleMessage(StompCommand stompCommand, StompHeaderAccessor accessor, String username) {
        log.info(stompCommand.toString());
        switch (stompCommand) {
            case CONNECT:
                connectToChatRoom(accessor, username);
                break;
            case SUBSCRIBE:
            case SEND:
                verifyAccessToken(getAccessToken(accessor));
                break;
        }
    }
 private void connectToChatRoom(StompHeaderAccessor accessor, String username) {
        // ์ฑํ
๋ฐฉ ๋ฒํธ๋ฅผ ๊ฐ์ ธ์จ๋ค.
        Integer chatRoomNo = getChatRoomNo(accessor);
        // ์ฑํ
๋ฐฉ ์
์ฅ ์ฒ๋ฆฌ -> Redis์ ์
์ฅ ๋ด์ญ ์ ์ฅ
        chatRoomService.connectChatRoom(chatRoomNo, username);
//        // ์ฝ์ง ์์ ์ฑํ
์ ์ ๋ถ ์ฝ์ ์ฒ๋ฆฌ
        chatService.updateCountAllZero(chatRoomNo, username);
        // ํ์ฌ ์ฑํ
๋ฐฉ์ ์ ์์ค์ธ ์ธ์์ด ์๋์ง ํ์ธํ๋ค.
        boolean isConnected = chatRoomService.isConnected(chatRoomNo);
        if (isConnected) {
            chatService.updateMessage(username, chatRoomNo);
        }
    }
๐ Note
connectToChatRoom ๋ฉ์๋๋ ํฌ๊ฒ 3๊ฐ์ง ๋์์ ํ๊ฒ ๋ฉ๋๋ค.
1. Redis์ ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์ํ๋ ค๋ ํ์์ ์ ์ฅํ๋ค.
2. ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์ํ๋ ค๋ ํ์์ด ์ฝ์ง ์์ ์ฑํ ์ด ์๋ค๋ฉด, ์ ๋ถ ์ฝ์ ์ฒ๋ฆฌ ํด์ค๋ค(readcount 0์ผ๋ก ์ ๋ฐ์ดํธ)
3. ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์์ค์ธ ํ์์ด ์๋ค๋ฉด, ์ฑํ ๋ฆฌ์คํธ ๋ค์ ์๋ฒ์ ์์ฒญํด์ ๋ฐ๋๋ก ์ฒ๋ฆฌํ๋ค.
ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์์ค์ธ ํ์์ด ์๋ค๋ฉด, ์ฑํ ๋ฆฌ์คํธ๋ฅผ ๋ค์ ์๋ฒ์ ์์ฒญํด์ ๋ฐ๋๋ก ํ ์ด์ ๋ ์๋์ ๊ฐ์ต๋๋ค.
ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์์ค์ธ ํ์์ A, ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์ํ๋ ค๋ ํ์์ B๋ผ๊ณ ๊ฐ์ ํ๊ฒ ์ต๋๋ค.
A์ ํ๋ฉด์์ B๊ฐ ์ฝ์ง ์์ ์ฑํ ๋ค์ด ์์ฝ์ ํ์๋๊ฒ ๋ฉ๋๋ค.
B๊ฐ ์ฑํ ๋ฐฉ์ ์ ์์ ์๋ํ๋ฉด, ChannelInterceptor์ Connect ์ด๋ฒคํธ๊ฐ ์กํ๊ฒ ๋๊ณ , B๊ฐ ์ฝ์ง ์์ ์ฑํ ๋ค์ ๋ชจ๋ ์ฝ์ ์ฒ๋ฆฌ ๋ฉ๋๋ค.
์ฆ, A๋ B๊ฐ ์ฑํ ๋ฐฉ์ ์ ์ํ๋ฉด์ ์ฑํ ์ ๋ชจ๋ ์ฝ์๋๋ฐ ์ฝ์ง ์์ ์ํ์ ๋ฐ์ดํฐ๋ฅผ ์์ง ๊ฐ์ง๊ณ ์๋ ๊ฒ์ ๋๋ค.
๋ฐ๋ผ์ B๊ฐ ์ ์ํ๋ฉด์ ๋ชจ๋ ์ฑํ ์ ์ฝ์์ฒ๋ฆฌ ํ๊ธฐ๋๋ฌธ์, A์๊ฒ B๊ฐ ๋ชจ๋ ์ฑํ ์ ์ฝ์์ผ๋ ์ต์ ๋ฒ์ ์ ์ฑํ ๋ฐ์ดํฐ๋ฅผ ์๋ฒ์์ ๊ฐ์ ธ์ฌ ์ ์๋๋ก ์๋ฒ์์ ์๋ ค์ฃผ๊ฒ ์ฒ๋ฆฌํ์์ต๋๋ค.
    /**
     * ์ฑํ
๋ฐฉ์ 1๋ช
 ์ฐ๊ฒฐ๋๋์ง ํ์ธ ๋ฉ์๋
     * @param : Long chatRoomNo
     */
    public boolean isConnected(Integer chatRoomNo) {
        List<ChatRoom> connectedList = chatRoomRepository.findByChatroomNo(chatRoomNo);
        return connectedList.size() == 1;
    }
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@RedisHash(value = "chatRoom")
public class ChatRoom {
    @Id
    private String id;
    @Indexed
    private Integer chatroomNo;
    @Indexed
    private String nickname;
    @Builder
    public ChatRoom(Integer chatroomNo, String nickname) {
        this.chatroomNo = chatroomNo;
        this.nickname = nickname;
    }
}
@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {
    private final ChatRepository chatRepository;
    private final MongoChatRepository mongoChatRepository;
    private final MessageSender sender;
    private final AggregationSender aggregationSender;
    private final MongoTemplate mongoTemplate;
    private final ChatRoomService chatRoomService;
    private final PlantServiceClient plantServiceClient;
    private final CircuitBreakerFactory circuitBreakerFactory;
    private final TokenHandler tokenHandler;
    private final NotificationService notificationService;
    /**
     * ์ฑํ
๋ฐฉ ์์ฑ ๋ฉ์๋
     * ๊ฑฐ๋ ๊ฒ์๊ธ์ ์ฌ๋ฆฌ์ง ์์ ์ฌ๋๋ง ํธ์ถํ๋ ๋ฉ์๋
     * ๊ตฌ๋งค ํฌ๋ง์๋ง ์ฑํ
๋ฐฉ์ ์์ฑ ๊ฐ๋ฅ
     * FeignCLient๋ฅผ ํตํด plant-service์์ ๊ฑฐ๋๊ฐ๋ฅ ์ฌ๋ถ ํ์ธ
     * @param : MemberDto memberDto, ChatRequestDto requestDto
     */
    @Transactional
    public Chat makeChatRoom(Integer memberNo, ChatRequestDto requestDto) {
        CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
        // ์ฑํ
์ ๊ฑธ๋ ค๊ณ  ํ๋ ๊ฑฐ๋๊ธ์ด ๊ฑฐ๋ ๊ฐ๋ฅ ์ํ์ธ์ง ์กฐํํด๋ณธ๋ค.
        ResponseEntity<ResponseTradeBoardDto> tradeBoardDto = circuitBreaker.run(() ->
                        plantServiceClient.boardContent(requestDto.getTradeBoardNo().longValue()),
                throwable -> ResponseEntity.ok(null));
        // ์กฐํํด์จ ๊ฑฐ๋๊ธ ์ํ๊ฐ ๊ฑฐ๋์๋ฃ ์ด๋ผ๋ฉด ๊ฑฐ๋๊ฐ ๋ถ๊ฐ๋ฅํ ์ํ์ด๋ค.
        if (tradeBoardDto.getBody().getStatus().equals("๊ฑฐ๋์๋ฃ")) {
            throw new IllegalStateException("ํ์ฌ ๊ฑฐ๋๊ฐ๋ฅ ์ํ๊ฐ ์๋๋๋ค.");
        }
        Integer tradeBoardNo = tradeBoardDto.getBody().getId().intValue();
        //์ด๋ฏธ ํด๋น๊ธ ๊ธฐ์ค์ผ๋ก ์ฑํ
์ ์์ฒญํ ์ฌ๋๊ณผ ๋ฐ๋ ์ฌ๋์ด ์ผ์นํ  ๊ฒฝ์ฐ ์ฒดํฌ
        if (chatRepository.existChatRoomByBuyer(tradeBoardNo, requestDto.getCreateMember(), memberNo)) {
            Chat existedChat = chatRepository.findByTradeBoardNoAndChatNo(tradeBoardNo, requestDto.getCreateMember());
            return existedChat;
        }
        if (!chatRepository.existChatRoomByBuyer(tradeBoardNo, requestDto.getCreateMember(), memberNo)) {
            Chat chat = Chat.builder()
                    .tradeBoardNo(requestDto.getTradeBoardNo())
                    .createMember(requestDto.getCreateMember())
                    .joinMember(memberNo)
                    .regDate(LocalDateTime.now())
                    .build();
            Chat savedChat = chatRepository.save(chat);
            // ์ฑํ
๋ฐฉ ์นด์ดํธ ์ฆ๊ฐ
            AggregationDto aggregationDto = AggregationDto
                    .builder()
                    .isIncrease("true")
                    .target(AggregationTarget.CHAT)
                    .tradeBoardNo(requestDto.getTradeBoardNo())
                    .build();
            aggregationSender.send(KafkaUtil.KAFKA_AGGREGATION, aggregationDto);
            return savedChat;
        }
        throw new IllegalArgumentException("์กด์ฌํ์ง ์๋ ๊ฒ์๊ธ ์
๋๋ค");
    }
    /**
     * ์ฑํ
๋ฐฉ ๋ฆฌ์คํธ ์กฐํ ๋ฉ์๋
     * FeignCLient๋ฅผ ํตํด plant-service์์ ์ ์  ์ ๋ณด ์กฐํํ ์ฑํ
๋ฐฉ ๋ง๋  ์ฌ๋์ธ์ง ํ์ธ
     * mongodb์์ ์ฑํ
 ๋ฉ์ธ์ง ๋ณด๋ธ ์๊ฐ์ ๋ด๋ฆผ์ฐจ์์ผ๋ก ์ ๋ ฌํ ์ฒซ๋ฒ์งธ ๊ฐ ๋ง์ง๋ง ๋ฉ์ธ์ง๋ก ์ธํ
     * @param : Integer memberNo, Integer tradeBoardNo
     */
    public List<ChatRoomResponseDto> getChatList(Integer memberNo, Integer tradeBoardNo) {
        List<ChatRoomResponseDto> chatRoomList = chatRepository.getChattingList(memberNo, tradeBoardNo);
        //Participant ์ฑ์์ผ๋จ(username)
            chatRoomList
                    .forEach(chatRoomDto -> {
                        //param์ผ๋ก ๋์ด์จ ๋ฉค๋ฒ๊ฐ ์ฑํ
 ๋ง๋  ๋ฉค๋ฒ์ผ ๊ฒฝ์ฐ => Participant์ ์ฐธ๊ฐํ ๋ฉค๋ฒ
//                        ResponseEntity<MemberDto> byId = plantServiceClient.findById(chatRoomDto.getCreateMember().longValue());
                        if (memberNo.equals(chatRoomDto.getCreateMember())) {
                            ResponseEntity<MemberDto> memberDtoResponse = plantServiceClient.findById(chatRoomDto.getJoinMember().longValue());
                            chatRoomDto.setParticipant(new ChatRoomResponseDto.Participant(memberDtoResponse.getBody().getUsername(), memberDtoResponse.getBody().getNickname()));
                        }
                        //param์ผ๋ก ๋์ด์จ ๋ฉค๋ฒ๊ฐ ์ฑํ
๋ฐฉ์ ์ฐธ๊ฐํ ๋ฉค๋ฒ์ผ ๊ฒฝ์ฐ => Participant์ ์ฑํ
๋ฐฉ ๋ง๋  ๋ฉค๋ฒ
                        if (!memberNo.equals(chatRoomDto.getCreateMember())){
                            ResponseEntity<MemberDto> memberDtoResponse = plantServiceClient.findById(chatRoomDto.getCreateMember().longValue());
                            chatRoomDto.setParticipant(new ChatRoomResponseDto.Participant(memberDtoResponse.getBody().getUsername(), memberDtoResponse.getBody().getNickname()));
                        }
//                      // ์ฑํ
๋ฐฉ๋ณ๋ก ์ฝ์ง ์์ ๋ฉ์์ง ๊ฐ์๋ฅผ ์
ํ
                        long unReadCount = countUnReadMessage(chatRoomDto.getChatNo(), memberNo);
                        chatRoomDto.setUnReadCount(unReadCount);
                        // ์ฑํ
๋ฐฉ๋ณ๋ก ๋ง์ง๋ง ์ฑํ
๋ด์ฉ๊ณผ ์๊ฐ์ ์
ํ
                        Page<Chatting> chatting =
                                mongoChatRepository.findByChatRoomNoOrderBySendDateDesc(chatRoomDto.getChatNo(), PageRequest.of(0, 1));
                        if (chatting.hasContent()) {
                            Chatting chat = chatting.getContent().get(0);
                            ChatRoomResponseDto.LatestMessage latestMessage = ChatRoomResponseDto.LatestMessage.builder()
                                    .context(chat.getContent())
                                    .sendAt(chat.getSendDate())
                                    .build();
                            chatRoomDto.setLatestMessage(latestMessage);
                        }
                    });
        return chatRoomList;
    }
    /**
     * ์ฑํ
 ๋ฉ์ธ์ง ์กฐํ ๋ฉ์๋
     * ์ฑํ
 ๋ฉ์ธ์ง ์กฐํ์ ํด๋น ๋ฉ์ธ์ง๋ฅผ ์ฝ์ ๊ฒ์ด๋ฏ๋ก ๋ฉ์ธ์ง ์ฝ์ ์ฒ๋ฆฌ๋ ์งํ
     * @param : Integer chatRoomNo, Integer memberNo
     */
    public ChattingHistoryResponseDto getChattingList(Integer chatRoomNo, Integer memberNo) {
        CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
        // member id๋ก ์กฐํ
        ResponseEntity<MemberDto> memberDto = circuitBreaker.run(() -> plantServiceClient.findById(memberNo.longValue()),
                throwable -> ResponseEntity.ok(null));
        updateCountAllZero(chatRoomNo, memberDto.getBody().getUsername());
        List<ChatResponseDto> chattingList = mongoChatRepository.findByChatRoomNo(chatRoomNo)
                .stream()
                .map(chat -> new ChatResponseDto(chat, memberNo)
                )
                .collect(Collectors.toList());
        return ChattingHistoryResponseDto.builder()
                .chatList(chattingList)
                .email(memberDto.getBody().getEmail())
                .build();
    }
    /**
     * ๋ฉ์ธ์ง ์ ์ก ๋ฉ์๋
     * jwt ํ ํฐ์์ username ์ถ์ถ
     * ์นดํ์นด ํ ํฝ์ผ๋ก ๋ฉ์ธ ์ ์ก
     * @param : Message message, String accessToken
     */
    public void sendMessage(Message message, String accessToken) {
        // member id๋ก ์กฐํ
        ResponseEntity<MemberDto> memberDto = plantServiceClient.findByUsername(tokenHandler.getUid(accessToken));
        // ์ฑํ
๋ฐฉ์ ๋ชจ๋  ์ ์ ๊ฐ ์ฐธ์ฌ์ค์ธ์ง ํ์ธํ๋ค.
        boolean isConnectedAll = chatRoomService.isAllConnected(message.getChatNo());
        // 1:1 ์ฑํ
์ด๋ฏ๋ก 2๋ช
 ์ ์์ readCount 0, ํ๋ช
 ์ ์์ 1
        Integer readCount = isConnectedAll ? 0 : 1;
        // message ๊ฐ์ฒด์ ๋ณด๋ธ์๊ฐ, ๋ณด๋ธ์ฌ๋ memberNo, ๋๋ค์์ ์
ํ
ํด์ค๋ค.
        message.setSendTimeAndSender(LocalDateTime.now(), memberDto.getBody().getId().intValue(), memberDto.getBody().getNickname(), readCount);
        // ๋ฉ์์ง๋ฅผ ์ ์กํ๋ค.
        sender.send(KafkaUtil.KAFKA_TOPIC, message);
    }
    /**
     * ์๋ฆผ ์ ์ก ๋ฐ ๋ฉ์ธ์ง ์ ์ฅ ๋ฉ์๋
     * FeignCLient๋ฅผ ํตํด plant-service์์ ๋ฉ์ธ์ง ๋ณด๋ธ ์ ์  ์ ๋ณด ์กฐํ
     * ์๋ฆผ์ ์๋๋ฐฉ์ด ์ฝ์ง ์์ ๊ฒฝ์ฐ๋ง ์ ์ก
     * @param : Message message
     */
    @Transactional
    public Message sendNotificationAndSaveMessage(Message message) {
        CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
        //๋ฉ์ธ์ง ์ ์ฅ๊ณผ ์๋ฆผ ๋ฐ์ก์ ์ํด ๋ฉ์ธ์ง ๋ณด๋ธ ํ์ ์กฐํ
        ResponseEntity<MemberDto> memberDto = circuitBreaker.run(() -> plantServiceClient.findById(message.getSenderNo().longValue()),
                throwable -> ResponseEntity.ok(null));
        // ์๋๋ฐฉ์ด ์ฝ์ง ์์ ๊ฒฝ์ฐ์๋ง ์๋ฆผ ์ ์ก
        if (message.getReadCount().equals(1)) {
            // ์๋ ์ ์ก์ ์ํด ๋ฉ์์ง๋ฅผ ๋ฐ๋ ์ฌ๋์ ์กฐํํ๋ค.
            Integer memberNo = chatRepository.getReceiverMember(message.getChatNo(), message.getSenderNo());
            ResponseEntity<MemberDto> receiveMember = circuitBreaker.run(() -> plantServiceClient.findById(memberNo.longValue()),
                    throwable -> ResponseEntity.ok(null));
            String content = message.getContentType().equals("image")
                                    ? "image" : message.getContent();
            // ์๋์ ๋ณด๋ผ URL์ ์์ฑํ๋ค.
            String sendUrl = getNotificationUrl(message.getTradeBoardNo(), message.getChatNo());
            //์๋ฆผ ์ ์ก
            notificationService.send(memberDto.getBody(), receiveMember.getBody(), NotifiTypeEnum.CHAT, sendUrl, content);
        }
        // ๋ณด๋ธ ์ฌ๋์ผ ๊ฒฝ์ฐ์๋ง ๋ฉ์์ง๋ฅผ ์ ์ฅ -> ์ค๋ณต ์ ์ฅ ๋ฐฉ์ง
        if (message.getSenderEmail().equals(memberDto.getBody().getEmail())) {
            // Message ๊ฐ์ฒด๋ฅผ ์ฑํ
 ์ํฐํฐ๋ก ๋ณํ
            Chatting chatting = message.convertEntity();
            // ์ฑํ
 ๋ด์ฉ์ ์ ์ฅ
            Chatting savedChat = mongoChatRepository.save(chatting);
            // ์ ์ฅ๋ ๊ณ ์  ID๋ฅผ ๋ฐํ
            message.setId(savedChat.getId());
        }
        return message;
    }
    /**
     * ์ฐธ๊ฐ์ ์
์ฅ ์๋ฆผ ๋ฉ์๋
     * @param : String email, Integer chatRoomNo
     */
    public void updateMessage(String email, Integer chatRoomNo) {
        Message message = Message.builder()
                .contentType("notice")
                .chatNo(chatRoomNo)
                .content(email + " ๋์ด ๋์์ค์
จ์ต๋๋ค.")
                .build();
        sender.send(KafkaUtil.KAFKA_TOPIC, message);
    }
    /**
     * ์ฝ์ง ์์ ๋ฉ์์ง ์ฑํ
์ฅ ์
์ฅ์ ์ฝ์ ์ฒ๋ฆฌ ๋ฉ์๋
     * @param : Integer chatNo, String username
     */
    public void updateCountAllZero(Integer chatNo, String username) {
        CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
        ResponseEntity<MemberDto> findMember = circuitBreaker.run(() -> plantServiceClient.findByUsername(username),
                throwable -> ResponseEntity.ok(null));
        //MongoDb Update Query
        Update update = new Update().set("readCount", 0);
        //ne-> not equal
        Query query = new Query(where("chatRoomNo").is(chatNo)
                .and("senderNo").ne(findMember.getBody().getId().intValue()));
        mongoTemplate.updateMulti(query, update, Chatting.class);
    }
    /**
     * ์ฝ์ง ์์ ๋ฉ์์ง ์นด์ดํธ ๋ฉ์๋
     * @param : Integer chatNo, Integer senderNo
     */
    long countUnReadMessage(Integer chatRoomNo, Integer senderNo) {
        Query query = new Query(where("chatRoomNo").is(chatRoomNo)
                .and("readCount").is(1)
                .and("senderNo").ne(senderNo));
        return mongoTemplate.count(query, Chatting.class);
    }
    private String getNotificationUrl(Integer tradeBoardNo, Integer chatNo) {
        return chatNo +
                "/" +
                tradeBoardNo;
    }
    /**
     * ํ๋งค์๊ฐ ์ฐธ๊ฐํ  ์ฑํ
๋ฐฉ์ด ์กด์ฌํ๋์ง ์ ๋ฌด ์ฒ๋ฆฌ ๋ฉ์๋
     * ๋จ์ ์กฐํ์ฉ ๋ฉ์๋๋ผ readOnly = true
     *
     * @param : Integer tradeBoardNo,  Integer memberNo
     */
    @Transactional(readOnly = true)
    public Boolean existChatRoomBySeller(Integer tradeBoardNo, Integer memberNo) {
        return chatRepository.existChatRoomBySeller(tradeBoardNo, memberNo);
    }
    /**
     * ์ฑํ
๋ฐฉ ์ญ์  ๋ฉ์๋
     * plant-service์์ kafka๋ฅผ ํตํด ๊ฑฐ๋ ๊ฒ์๊ธ ์ญ์  ์์ฒญ์ ๋ฐ์ผ๋ฉด ์ญ์ 
     *
     * @param : Integer tradeBoardNo
     */
    @Transactional
    public void deleteChatRoom(Integer tradeBoardNo) {
        List<Integer> chatRoomNoList = chatRepository.deleteChatRoomAndReturnChatNo(tradeBoardNo);
        deleteChatting(chatRoomNoList);
    }
    @Transactional
    public void deleteChatting(List<Integer> chatRoomNoList) {
        // ์ค๋ณต๋ chatNo ์ ๊ฑฐ
        Set<Integer> uniqueChatNoSet = new HashSet<>(chatRoomNoList);
        // ์ค๋ณต ์ ๊ฑฐ ํ์ chatNo์ ํด๋นํ๋ ์ฑํ
 ๋ฐ์ดํฐ ์ญ์ 
        mongoTemplate.remove(query(where("chatRoomNo").in(uniqueChatNoSet)), Chatting.class);
    }
}

 // ์ฑํ
๋ฐฉ์ ๋ชจ๋  ์ ์ ๊ฐ ์ฐธ์ฌ์ค์ธ์ง ํ์ธํ๋ค.
   boolean isConnectedAll = chatRoomService.isAllConnected(message.getChatNo());
        // 1:1 ์ฑํ
์ด๋ฏ๋ก 2๋ช
 ์ ์์ readCount 0, ํ๋ช
 ์ ์์ 1
        Integer readCount = isConnectedAll ? 0 : 1;
        
   /**
     * ์ฑํ
๋ฐฉ ์ ์ 2๋ช
 ์ฐผ๋์ง ํ์ธ ๋ฉ์๋
     * @param : Long chatRoomNo
     */
    public boolean isAllConnected(Integer chatRoomNo) {
        List<ChatRoom> connectedList = chatRoomRepository.findByChatroomNo(chatRoomNo);
        return connectedList.size() == 2;
    }

์์ ์์์ ๋ณด๋ฉด ํ๋ช ๋ง ์ ์์ค์ด๊ธฐ ๋๋ฌธ์ ์๋์ด ๋์ํ๊ณ ์๋ ๊ฑธ ํ์ธํ ์ ์์ต๋๋ค.

๋๋ช
 ๋ค ์ ์ํ์๋, ์๋ฆผ์ด ๊ฐ์ง ์๋ ๊ฑธ ํ์ธํ์ค ์ ์์ต๋๋ค! 
์ด๋ฒ ํฌ์คํ ๊น์ง ์งํํ์ฌ ์ฑํ ๋ง์ดํฌ๋ก ์๋น์ค ๊ตฌํ ๊ณผ์ ์ ๋ชจ๋ ์ ๋ฆฌํ์ต๋๋ค. ๋ค์ ํฌ์คํ ๋ถํฐ๋ ๋ ๋ค๋ฅธ ๋ง์ดํฌ๋ก ์๋น์ค์ธ ๊ฒฐ์  ์๋น์ค ๊ตฌํ๊ณผ์ ์์ ํฌ์คํ ํ๋๋ก ํ๊ฒ ์ต๋๋ค ๐๐
https://develoyummer.tistory.com/112#3.2.%20Emitter%20RepositoryImpl
https://velog.io/@max9106/Spring-SSE-Server-Sent-Events%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-%EC%8B%A4%EC%8B%9C%EA%B0%84-%EC%95%8C%EB%A6%BC