์ง๋ ํฌ์คํ ์ ์ด์ด ์ฑํ ๋ง์ดํฌ๋ก ์๋น์ค ์๋ฆผ ๊ธฐ๋ฅ ๊ณผ์ ์ ํฌ์คํ ํ๋๋ก ํ๊ฒ ์ต๋๋ค. ์ฃผ์ ์ฝ๋๋ง ๋ค๋ฃฐ ์์ ์ด๋ผ ๋ค๋ฃจ์ง ์๋ ์ฝ๋๋ค์ ์๋ github๋ฅผ ํตํด ์ฐธ๊ณ ํ์๊ธธ ๋ฐ๋๋๋ค๐๐(msa-master ๋ธ๋์น)
๐ ์ฐธ๊ณ ์ฝ๋ : https://github.com/LminWoo99/PlantBackend/tree/msa-master
์๊ตฌํ์ ํ๋ก์ ํธ์ ์๋ฆผ ๊ธฐ๋ฅ์ 1:1 ์ฑํ ์์ ์ฝ์ง ์์ ๋ฉ์์ง์ ๋ํด ์๋ฆผ์ ์ ๊ณตํ๋ ๊ฒ์ด ํต์ฌ์ ๋๋ค.
1. Stomp์ ChannelInterceptor ํ์ฉ
2. Redis ์ฌ์ฉ ๊ฒฐ์ ์ด์
3. SSE(Server-Sent Events) ๋์
@RestController
@RequestMapping("/notification")
@Slf4j
@RequiredArgsConstructor
public class NotificationController {
private final NotificationService notificationService;
/**
* @title ๋ก๊ทธ์ธ ํ ์ ์ SSE ์ฐ๊ฒฐ
* @param : String lastEventId,String jwtToken
**/
@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> connect(
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId,
@RequestHeader("Authorization") String jwtToken) {
SseEmitter emitter = notificationService.subscribe(lastEventId, jwtToken);
return ResponseEntity.ok(emitter);
}
/**
* @title ๋ก๊ทธ์ธ ํ ์ ์ ์ ๋ชจ๋ ์๋ฆผ ์กฐํ
* @param : Long memberNo(ํ์ฌ ์ ์ํ ๋ฉค๋ฒ)
**/
@GetMapping("/all")
public ResponseEntity<List<NotificationResponse>> notifications(Long memberNo) {
return ResponseEntity.ok(notificationService.findAllById(memberNo));
}
/**
* @title ์๋ฆผ ์ฝ์ ์ํ๋ง ๋ณ๊ฒฝ
* @param : Long id(์๋ฆผ id)
**/
@PatchMapping("/checked/{id}")
public ResponseEntity<StatusResponseDto> readNotification(@PathVariable("id") Long id) {
notificationService.readNotification(id);
return ResponseEntity.ok(StatusResponseDto.success());
}
/**
* @title ์๋ฆผ ์ญ์
* @param : NotificationDeleteRequest request
**/
@DeleteMapping
public ResponseEntity<StatusResponseDto> deleteNotification(@RequestBody NotificationDeleteRequest request) {
notificationService.deleteNotification(request.getIdList());
return ResponseEntity.ok(StatusResponseDto.success());
}
}
@RequiredArgsConstructor
@Service
@Slf4j
public class NotificationService {
private static final Long DEFAULT_TIMEOUT = 1000L * 60 * 29 ;// 29๋ถ
public static final String PREFIX_URL = "๋๋ฉ์ธ url";
private final NotificationRepository notificationRepository;
private final EmitterRepository emitterRepository;
private final PlantServiceClient plantServiceClient;
private final CircuitBreakerFactory circuitBreakerFactory;
/**
* SSE ์ฐ๊ฒฐ ๋ฉ์๋
* @param : MemberDto memberDto, String lastEnventId
*/
@Transactional
public SseEmitter subscribe(String lastEventId, String jwtToken) {
ResponseEntity<MemberDto> joinMember = plantServiceClient.getJoinMember(jwtToken);
Integer memberNo = joinMember.getBody().getId().intValue();
//Emitter map์ ์ ์ฅํ๊ธฐ ์ํ key ์์ฑ
String id = memberNo + "_" + System.currentTimeMillis();
// SseEmitter map์ ์ ์ฅ
SseEmitter emitter = emitterRepository.save(id, new SseEmitter(DEFAULT_TIMEOUT));
log.info("emitter add: {}", emitter);
// emitter์ ์๋ฃ ๋๋ ํ์์์ Event๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ, ํด๋น emitter๋ฅผ ์ญ์
emitter.onCompletion(() -> emitterRepository.deleteById(id));
emitter.onTimeout(() -> emitterRepository.deleteById(id));
// 503 ์๋ฌ๋ฅผ ๋ฐฉ์งํ๊ธฐ ์ํ ๋๋ฏธ Event ์ ์ก
sendToClient(emitter, id, "EventStream Created. [userId=" + memberNo + "]");
// ํด๋ผ์ด์ธํธ๊ฐ ๋ฏธ์์ ํ Event ๋ชฉ๋ก์ด ์กด์ฌํ ๊ฒฝ์ฐ ์ ์กํ์ฌ Event ์ ์ค์ ์๋ฐฉ
if (!lastEventId.isEmpty()) {
// id์ ํด๋นํ๋ cache ์ฐพ์
Map<String, Object> events = emitterRepository.findAllCacheStartWithId(String.valueOf(memberNo));
//๋ฏธ์์ ํ Event ๋ชฉ๋ก ์ ์ก
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
}
return emitter;
}
/**
* ์๋ฆผ ์ ์ก ๋ฉ์๋
* ์๋ฆผ ๋ฐ์ ์ ์ SseEmitter ๊ฐ์ ธ์์ ์๋ฆผ ์ ์ก
* @param : MemberDto sender, MemberDto receiver, NotifiTypeEnum type, String resource, String content
*/
@Transactional
public void send(MemberDto sender, MemberDto receiver, NotifiTypeEnum type, String resource, String content) {
// ์๋ฆผ ์์ฑ
Notification notification = Notification.builder()
.senderNo(sender.getId().intValue())
.receiverNo(receiver.getId().intValue())
.typeEnum(type)
.url(PREFIX_URL + type.getPath() + resource)
.content(content)
.isRead(false)
.isDel(false)
.build();
CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
//๋ณด๋ธ ์ฌ๋ ์ด๋ฆ ์ฐพ๊ธฐ ์ํด feignClient๋ฅผ ํตํด ํธ์ถ
ResponseEntity<MemberDto> findMember = circuitBreaker.run(() -> plantServiceClient.findById(sender.getId()),
throwable -> ResponseEntity.ok(null));
notification.setSenderName(findMember.getBody().getNickname());
// SseEmitter ์บ์ ์กฐํ๋ฅผ ์ํด key์ prefix ์์ฑ
String id = String.valueOf(notification.getReceiverNo());
//์๋ฆผ ์ ์ฅ
notificationRepository.save(notification);
// ๋ก๊ทธ์ธ ํ ์ ์ ์ SseEmitter ๋ชจ๋ ๊ฐ์ ธ์ค๊ธฐ
Map<String, SseEmitter> sseEmitterMap = emitterRepository.findAllStartWithById(id);
sseEmitterMap.forEach(
(key, emitter) -> {
//์บ์ ์ ์ฅ(์ ์คํ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌ)
emitterRepository.saveCache(key, notification);
//๋ฐ์ํฐ ์ ์ก
sendToClient(emitter, key, NotificationResponse.toDto(notification));
}
);
}
/**
* ํด๋ผ์ด์ธํธ์ SSE + ์๋ฆผ ๋ฐ์ดํฐ ์ ์ก ๋ฉ์๋
* @param : SseEmitter emitter, String id, Object data
*/
private void sendToClient(SseEmitter emitter, String id, Object data) {
try {
log.info("event : " + data);
emitter.send(SseEmitter.event()
.id(id)
.name("sse")
.data(data));
log.info("event call: " + emitter);
} catch (IOException ex) {
emitterRepository.deleteById(id);
log.error("--- SSE ์ฐ๊ฒฐ ์ค๋ฅ ----", ex);
}
}
/**
* ๋ก๊ทธ์ธํ ๋ฉค๋ฒ ์๋ฆผ ์ ์ฒด ์กฐํ
* ์กฐํ์ฉ ๋ฉ์๋ => (readOnly = true)
* @param : MemberDto memberDto
*/
public List<NotificationResponse> findAllById(Long memberNo) {
// ์ฑํ
์ ๋ง์ง๋ง ์๋ฆผ ์กฐํ
List<Notification> chat
= notificationRepository.findChatByReceiver(memberNo.intValue());
return chat.stream()
.map(NotificationResponse::toDto)
.sorted(Comparator.comparing(NotificationResponse::getId).reversed())
.collect(Collectors.toList());
}
/**
* ์ผ๋ฆผ ์ฝ์ ์ฒ๋ฆฌ ๋ฉ์๋
* @param : Long id
*/
@Transactional
public void readNotification(Long id) {
notificationRepository.updateIsReadById(id);
}
/**
* ์ผ๋ฆผ ์ญ์ ์ฒ๋ฆฌ ๋ฉ์๋
* @param : Long id
*/
public void deleteNotification(Long[] idList) {
for (Long id : idList) {
Notification notification = getNotification(id);
notificationRepository.delete(notification);
}
}
//== ๊ฐ๋ณ ์๋ฆผ ์กฐํ ==//
private Notification getNotification(Long id) {
return notificationRepository.findById(id)
.orElseThrow(ErrorCode::throwNotificationNotFound);
}
}
subscribe()๋ฅผ ๋ณด๋ฉด id๊ฐ์ {user_id}_{System.currentTimeMillis()} ํํ๋ก ์ฌ์ฉํ๋ ๊ฒ์ ๋ณผ ์ ์์ต๋๋ค. ์ด๋ ๊ฒ ์ฌ์ฉํ๋ ์ด์ ๊ฐ Last-Event-ID ํค๋์ ์๊ด์ด ์์ต๋๋ค.
Last-Event-IDํค๋๋ ํด๋ผ์ด์ธํธ๊ฐ ๋ง์ง๋ง์ผ๋ก ์์ ํ ๋ฐ์ดํฐ์ id๊ฐ์ ์๋ฏธํ๋ค๊ณ ํ์ต๋๋ค. id๊ฐ๊ณผ ์ ์ก ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๊ณ ์์ผ๋ฉด ์ด ๊ฐ์ ์ด์ฉํ์ฌ ์ ์ค๋ ๋ฐ์ดํฐ ์ ์ก์ ๋ค์ ํด์ค ์ ์์ต๋๋ค. ํ์ง๋ง ๋ง์ฝ id๊ฐ์ ๊ทธ๋๋ก ์ฌ์ฉํ๋ค๋ฉด ์ด๋ค ๋ฌธ์ ๊ฐ ์์๊น?
id๊ฐ์ ๊ทธ๋๋ก ์ฌ์ฉํ๋ค๋ฉด Last-Event-Id๊ฐ์ด ์๋ฏธ๊ฐ ์์ด์ง๋๋ค.
ํด๋ผ์ด์ธํธ์ sse์ฐ๊ฒฐ ์์ฒญ์ ์๋ตํ๊ธฐ ์ํด์๋ SseEmitter ๊ฐ์ฒด๋ฅผ ๋ง๋ค์ด ๋ฐํํด์ค์ผํฉ๋๋ค. SseEmitter ๊ฐ์ฒด๋ฅผ ๋ง๋ค ๋ ์ ํจ ์๊ฐ์ ์ค ์ ์๊ณ ์ด๋ ์ฃผ๋ ์๊ฐ ๋งํผ sse ์ฐ๊ฒฐ์ด ์ ์ง๋๊ณ , ์๊ฐ์ด ์ง๋๋ฉด ์๋์ผ๋ก ํด๋ผ์ด์ธํธ์์ ์ฌ์ฐ๊ฒฐ ์์ฒญ์ ๋ณด๋ด๊ฒ ๋ฉ๋๋ค.
id๋ฅผ key๋ก, SseEmitter๋ฅผ value๋ก ์ ์ฅํ๊ณ , SseEmitter์ ์๊ฐ ์ด๊ณผ ๋ฐ ๋คํธ์ํฌ ์ค๋ฅ๋ฅผ ํฌํจํ ๋ชจ๋ ์ด์ ๋ก ๋น๋๊ธฐ ์์ฒญ์ด ์ ์ ๋์ํ ์ ์๋ค๋ฉด ์ ์ฅํด๋ SseEmitter๋ฅผ ์ญ์ ํ๊ฒ ๋ฉ๋๋ค.
@Repository
@Slf4j
public class EmitterRepository {
//Map์ ํ์๊ณผ ์ฐ๊ฒฐ๋ SSE SseEmitter ๊ฐ์ฒด๋ฅผ ์ ์ฅ. ๋์์ฑ ๋ณด์ฅ
public final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
//Event๋ฅผ ์บ์์ ์ ์ฅ
public final Map<String, Object> cache = new ConcurrentHashMap<>();
//id, sseEmitter map์ ์ ์ฅ
public SseEmitter save(String id, SseEmitter sseEmitter) {
emitters.put(id, sseEmitter);
return sseEmitter;
}
//id์ event๋ฅผ ๋งค๊ฐ๋ณ์๋ก ๋ฐ์ cache ๋งต์ ์ ์ฅ
public void saveCache(String id, Object event) {
cache.put(id, event);
}
//id๋ก ์์ํ๋ ํค๋ฅผ ๊ฐ์ง emitters ๋งต ํญ๋ชฉ์ ํํฐ๋งํ์ฌ ๋งต์ผ๋ก ๋ฐํ
public Map<String, SseEmitter> findAllStartWithById(String id) {
log.info("map: " + emitters);
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(id))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
//id๋ก ์์ํ๋ ํค๋ฅผ ๊ฐ์ง eventCache ๋งต ํญ๋ชฉ์ ํํฐ๋งํ์ฌ ๋งต์ผ๋ก ๋ฐํ
public Map<String, Object> findAllCacheStartWithId(String id) {
return cache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(id))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// id๋ก ์์ํ๋ ํค๋ฅผ ๊ฐ์ง emitters ๋งต ํญ๋ชฉ์ ๋ชจ๋ ์ ๊ฑฐ
public void deleteAllStartWithId(String id) {
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(id)) {
emitters.remove(key);
}
}
);
}
// emitters ๋งต์์ ํด๋น id๋ฅผ ๊ฐ์ง ํญ๋ชฉ์ ์ญ์
public void deleteById(String id) {
emitters.remove(id);
}
}
๐ emitters์ eventCache๊ฐ ๋งต ํํ์ธ ์ด์ ?
emitters์ ๊ฒฝ์ฐ key์ value์ ๊ฐ๊ฐ emitterId์ SseEmitter๊ฐ์ฒด๋ฅผ ์ ์ฅํ๊ณ
eventCache๋ key์ value์ ๊ฐ๊ฐ eventCacheId์ euentCache๊ฐ์ฒด๋ฅผ ์ ์ฅํ๋ค.
์ด๋ฅผ ํตํด ์ฃผ์ด์ง ํค๋ฅผ ์ฌ์ฉํ์ฌ ๋น ๋ฅด๊ฒ ๋ฐ์ดํฐ๋ฅผ ๊ฒ์ํ ์ ์๊ณ , ์ค๋ณต๋ ํค๋ฅผ ๊ฐ์ง ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ ์ ์๋ค.
concurrentHashMap์ ์ฌ๋ฌ ์ค๋ ๋์์ ๋์์ ์์ ํ๊ฒ ๋ฐ์ดํฐ์ ์ ๊ทผํ ์ ์๋ ๋งต ๊ตฌํ์ฒด์ด๋ค.
์๋ฆผ ์์คํ ์์๋ ์ฌ๋ฌ ํด๋ผ์ด์ธํธ๊ฐ ๋์์ ๊ตฌ๋ ํ๊ณ ์ด๋ฒคํธ๋ฅผ ์ ์กํ ์ ์์ผ๋ฏ๋ก, ๋์์ฑ์ ์ ์ดํ๋ ๊ฒ์ด ์ค์
๋ฐ๋ผ์ ๋งต ํํ๋ฅผ ์ฌ์ฉํจ์ผ๋ก์ ๋ฐ์ดํฐ ์ ์ฅ, ๊ณ ์ ์ฑ ๋ณด์ฅ, ์ฑ๋ฅ ๋ฑ์ ์ด์ ์ ์ป์ผ๋ฉฐ ํจ์จ์ ์ผ๋ก ๊ฐ์ฒด๋ฅผ ๊ด๋ฆฌํ ์ ์๋ค.
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
// StompCommand์ ๋ฐ๋ผ์ ๋ก์ง์ ๋ถ๊ธฐํด์ ์ฒ๋ฆฌํ๋ ๋ฉ์๋๋ฅผ ํธ์ถํ๋ค.
String username = verifyAccessToken(getAccessToken(accessor));
log.info("StompAccessor = {}", accessor);
handleMessage(accessor.getCommand(), accessor, username);
return message;
}
private void handleMessage(StompCommand stompCommand, StompHeaderAccessor accessor, String username) {
log.info(stompCommand.toString());
switch (stompCommand) {
case CONNECT:
connectToChatRoom(accessor, username);
break;
case SUBSCRIBE:
case SEND:
verifyAccessToken(getAccessToken(accessor));
break;
}
}
private void connectToChatRoom(StompHeaderAccessor accessor, String username) {
// ์ฑํ
๋ฐฉ ๋ฒํธ๋ฅผ ๊ฐ์ ธ์จ๋ค.
Integer chatRoomNo = getChatRoomNo(accessor);
// ์ฑํ
๋ฐฉ ์
์ฅ ์ฒ๋ฆฌ -> Redis์ ์
์ฅ ๋ด์ญ ์ ์ฅ
chatRoomService.connectChatRoom(chatRoomNo, username);
// // ์ฝ์ง ์์ ์ฑํ
์ ์ ๋ถ ์ฝ์ ์ฒ๋ฆฌ
chatService.updateCountAllZero(chatRoomNo, username);
// ํ์ฌ ์ฑํ
๋ฐฉ์ ์ ์์ค์ธ ์ธ์์ด ์๋์ง ํ์ธํ๋ค.
boolean isConnected = chatRoomService.isConnected(chatRoomNo);
if (isConnected) {
chatService.updateMessage(username, chatRoomNo);
}
}
๐ Note
connectToChatRoom ๋ฉ์๋๋ ํฌ๊ฒ 3๊ฐ์ง ๋์์ ํ๊ฒ ๋ฉ๋๋ค.
1. Redis์ ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์ํ๋ ค๋ ํ์์ ์ ์ฅํ๋ค.
2. ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์ํ๋ ค๋ ํ์์ด ์ฝ์ง ์์ ์ฑํ ์ด ์๋ค๋ฉด, ์ ๋ถ ์ฝ์ ์ฒ๋ฆฌ ํด์ค๋ค(readcount 0์ผ๋ก ์ ๋ฐ์ดํธ)
3. ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์์ค์ธ ํ์์ด ์๋ค๋ฉด, ์ฑํ ๋ฆฌ์คํธ ๋ค์ ์๋ฒ์ ์์ฒญํด์ ๋ฐ๋๋ก ์ฒ๋ฆฌํ๋ค.
ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์์ค์ธ ํ์์ด ์๋ค๋ฉด, ์ฑํ ๋ฆฌ์คํธ๋ฅผ ๋ค์ ์๋ฒ์ ์์ฒญํด์ ๋ฐ๋๋ก ํ ์ด์ ๋ ์๋์ ๊ฐ์ต๋๋ค.
ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์์ค์ธ ํ์์ A, ํ์ฌ ์ฑํ ๋ฐฉ์ ์ ์ํ๋ ค๋ ํ์์ B๋ผ๊ณ ๊ฐ์ ํ๊ฒ ์ต๋๋ค.
A์ ํ๋ฉด์์ B๊ฐ ์ฝ์ง ์์ ์ฑํ ๋ค์ด ์์ฝ์ ํ์๋๊ฒ ๋ฉ๋๋ค.
B๊ฐ ์ฑํ ๋ฐฉ์ ์ ์์ ์๋ํ๋ฉด, ChannelInterceptor์ Connect ์ด๋ฒคํธ๊ฐ ์กํ๊ฒ ๋๊ณ , B๊ฐ ์ฝ์ง ์์ ์ฑํ ๋ค์ ๋ชจ๋ ์ฝ์ ์ฒ๋ฆฌ ๋ฉ๋๋ค.
์ฆ, A๋ B๊ฐ ์ฑํ ๋ฐฉ์ ์ ์ํ๋ฉด์ ์ฑํ ์ ๋ชจ๋ ์ฝ์๋๋ฐ ์ฝ์ง ์์ ์ํ์ ๋ฐ์ดํฐ๋ฅผ ์์ง ๊ฐ์ง๊ณ ์๋ ๊ฒ์ ๋๋ค.
๋ฐ๋ผ์ B๊ฐ ์ ์ํ๋ฉด์ ๋ชจ๋ ์ฑํ ์ ์ฝ์์ฒ๋ฆฌ ํ๊ธฐ๋๋ฌธ์, A์๊ฒ B๊ฐ ๋ชจ๋ ์ฑํ ์ ์ฝ์์ผ๋ ์ต์ ๋ฒ์ ์ ์ฑํ ๋ฐ์ดํฐ๋ฅผ ์๋ฒ์์ ๊ฐ์ ธ์ฌ ์ ์๋๋ก ์๋ฒ์์ ์๋ ค์ฃผ๊ฒ ์ฒ๋ฆฌํ์์ต๋๋ค.
/**
* ์ฑํ
๋ฐฉ์ 1๋ช
์ฐ๊ฒฐ๋๋์ง ํ์ธ ๋ฉ์๋
* @param : Long chatRoomNo
*/
public boolean isConnected(Integer chatRoomNo) {
List<ChatRoom> connectedList = chatRoomRepository.findByChatroomNo(chatRoomNo);
return connectedList.size() == 1;
}
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@RedisHash(value = "chatRoom")
public class ChatRoom {
@Id
private String id;
@Indexed
private Integer chatroomNo;
@Indexed
private String nickname;
@Builder
public ChatRoom(Integer chatroomNo, String nickname) {
this.chatroomNo = chatroomNo;
this.nickname = nickname;
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {
private final ChatRepository chatRepository;
private final MongoChatRepository mongoChatRepository;
private final MessageSender sender;
private final AggregationSender aggregationSender;
private final MongoTemplate mongoTemplate;
private final ChatRoomService chatRoomService;
private final PlantServiceClient plantServiceClient;
private final CircuitBreakerFactory circuitBreakerFactory;
private final TokenHandler tokenHandler;
private final NotificationService notificationService;
/**
* ์ฑํ
๋ฐฉ ์์ฑ ๋ฉ์๋
* ๊ฑฐ๋ ๊ฒ์๊ธ์ ์ฌ๋ฆฌ์ง ์์ ์ฌ๋๋ง ํธ์ถํ๋ ๋ฉ์๋
* ๊ตฌ๋งค ํฌ๋ง์๋ง ์ฑํ
๋ฐฉ์ ์์ฑ ๊ฐ๋ฅ
* FeignCLient๋ฅผ ํตํด plant-service์์ ๊ฑฐ๋๊ฐ๋ฅ ์ฌ๋ถ ํ์ธ
* @param : MemberDto memberDto, ChatRequestDto requestDto
*/
@Transactional
public Chat makeChatRoom(Integer memberNo, ChatRequestDto requestDto) {
CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
// ์ฑํ
์ ๊ฑธ๋ ค๊ณ ํ๋ ๊ฑฐ๋๊ธ์ด ๊ฑฐ๋ ๊ฐ๋ฅ ์ํ์ธ์ง ์กฐํํด๋ณธ๋ค.
ResponseEntity<ResponseTradeBoardDto> tradeBoardDto = circuitBreaker.run(() ->
plantServiceClient.boardContent(requestDto.getTradeBoardNo().longValue()),
throwable -> ResponseEntity.ok(null));
// ์กฐํํด์จ ๊ฑฐ๋๊ธ ์ํ๊ฐ ๊ฑฐ๋์๋ฃ ์ด๋ผ๋ฉด ๊ฑฐ๋๊ฐ ๋ถ๊ฐ๋ฅํ ์ํ์ด๋ค.
if (tradeBoardDto.getBody().getStatus().equals("๊ฑฐ๋์๋ฃ")) {
throw new IllegalStateException("ํ์ฌ ๊ฑฐ๋๊ฐ๋ฅ ์ํ๊ฐ ์๋๋๋ค.");
}
Integer tradeBoardNo = tradeBoardDto.getBody().getId().intValue();
//์ด๋ฏธ ํด๋น๊ธ ๊ธฐ์ค์ผ๋ก ์ฑํ
์ ์์ฒญํ ์ฌ๋๊ณผ ๋ฐ๋ ์ฌ๋์ด ์ผ์นํ ๊ฒฝ์ฐ ์ฒดํฌ
if (chatRepository.existChatRoomByBuyer(tradeBoardNo, requestDto.getCreateMember(), memberNo)) {
Chat existedChat = chatRepository.findByTradeBoardNoAndChatNo(tradeBoardNo, requestDto.getCreateMember());
return existedChat;
}
if (!chatRepository.existChatRoomByBuyer(tradeBoardNo, requestDto.getCreateMember(), memberNo)) {
Chat chat = Chat.builder()
.tradeBoardNo(requestDto.getTradeBoardNo())
.createMember(requestDto.getCreateMember())
.joinMember(memberNo)
.regDate(LocalDateTime.now())
.build();
Chat savedChat = chatRepository.save(chat);
// ์ฑํ
๋ฐฉ ์นด์ดํธ ์ฆ๊ฐ
AggregationDto aggregationDto = AggregationDto
.builder()
.isIncrease("true")
.target(AggregationTarget.CHAT)
.tradeBoardNo(requestDto.getTradeBoardNo())
.build();
aggregationSender.send(KafkaUtil.KAFKA_AGGREGATION, aggregationDto);
return savedChat;
}
throw new IllegalArgumentException("์กด์ฌํ์ง ์๋ ๊ฒ์๊ธ ์
๋๋ค");
}
/**
* ์ฑํ
๋ฐฉ ๋ฆฌ์คํธ ์กฐํ ๋ฉ์๋
* FeignCLient๋ฅผ ํตํด plant-service์์ ์ ์ ์ ๋ณด ์กฐํํ ์ฑํ
๋ฐฉ ๋ง๋ ์ฌ๋์ธ์ง ํ์ธ
* mongodb์์ ์ฑํ
๋ฉ์ธ์ง ๋ณด๋ธ ์๊ฐ์ ๋ด๋ฆผ์ฐจ์์ผ๋ก ์ ๋ ฌํ ์ฒซ๋ฒ์งธ ๊ฐ ๋ง์ง๋ง ๋ฉ์ธ์ง๋ก ์ธํ
* @param : Integer memberNo, Integer tradeBoardNo
*/
public List<ChatRoomResponseDto> getChatList(Integer memberNo, Integer tradeBoardNo) {
List<ChatRoomResponseDto> chatRoomList = chatRepository.getChattingList(memberNo, tradeBoardNo);
//Participant ์ฑ์์ผ๋จ(username)
chatRoomList
.forEach(chatRoomDto -> {
//param์ผ๋ก ๋์ด์จ ๋ฉค๋ฒ๊ฐ ์ฑํ
๋ง๋ ๋ฉค๋ฒ์ผ ๊ฒฝ์ฐ => Participant์ ์ฐธ๊ฐํ ๋ฉค๋ฒ
// ResponseEntity<MemberDto> byId = plantServiceClient.findById(chatRoomDto.getCreateMember().longValue());
if (memberNo.equals(chatRoomDto.getCreateMember())) {
ResponseEntity<MemberDto> memberDtoResponse = plantServiceClient.findById(chatRoomDto.getJoinMember().longValue());
chatRoomDto.setParticipant(new ChatRoomResponseDto.Participant(memberDtoResponse.getBody().getUsername(), memberDtoResponse.getBody().getNickname()));
}
//param์ผ๋ก ๋์ด์จ ๋ฉค๋ฒ๊ฐ ์ฑํ
๋ฐฉ์ ์ฐธ๊ฐํ ๋ฉค๋ฒ์ผ ๊ฒฝ์ฐ => Participant์ ์ฑํ
๋ฐฉ ๋ง๋ ๋ฉค๋ฒ
if (!memberNo.equals(chatRoomDto.getCreateMember())){
ResponseEntity<MemberDto> memberDtoResponse = plantServiceClient.findById(chatRoomDto.getCreateMember().longValue());
chatRoomDto.setParticipant(new ChatRoomResponseDto.Participant(memberDtoResponse.getBody().getUsername(), memberDtoResponse.getBody().getNickname()));
}
// // ์ฑํ
๋ฐฉ๋ณ๋ก ์ฝ์ง ์์ ๋ฉ์์ง ๊ฐ์๋ฅผ ์
ํ
long unReadCount = countUnReadMessage(chatRoomDto.getChatNo(), memberNo);
chatRoomDto.setUnReadCount(unReadCount);
// ์ฑํ
๋ฐฉ๋ณ๋ก ๋ง์ง๋ง ์ฑํ
๋ด์ฉ๊ณผ ์๊ฐ์ ์
ํ
Page<Chatting> chatting =
mongoChatRepository.findByChatRoomNoOrderBySendDateDesc(chatRoomDto.getChatNo(), PageRequest.of(0, 1));
if (chatting.hasContent()) {
Chatting chat = chatting.getContent().get(0);
ChatRoomResponseDto.LatestMessage latestMessage = ChatRoomResponseDto.LatestMessage.builder()
.context(chat.getContent())
.sendAt(chat.getSendDate())
.build();
chatRoomDto.setLatestMessage(latestMessage);
}
});
return chatRoomList;
}
/**
* ์ฑํ
๋ฉ์ธ์ง ์กฐํ ๋ฉ์๋
* ์ฑํ
๋ฉ์ธ์ง ์กฐํ์ ํด๋น ๋ฉ์ธ์ง๋ฅผ ์ฝ์ ๊ฒ์ด๋ฏ๋ก ๋ฉ์ธ์ง ์ฝ์ ์ฒ๋ฆฌ๋ ์งํ
* @param : Integer chatRoomNo, Integer memberNo
*/
public ChattingHistoryResponseDto getChattingList(Integer chatRoomNo, Integer memberNo) {
CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
// member id๋ก ์กฐํ
ResponseEntity<MemberDto> memberDto = circuitBreaker.run(() -> plantServiceClient.findById(memberNo.longValue()),
throwable -> ResponseEntity.ok(null));
updateCountAllZero(chatRoomNo, memberDto.getBody().getUsername());
List<ChatResponseDto> chattingList = mongoChatRepository.findByChatRoomNo(chatRoomNo)
.stream()
.map(chat -> new ChatResponseDto(chat, memberNo)
)
.collect(Collectors.toList());
return ChattingHistoryResponseDto.builder()
.chatList(chattingList)
.email(memberDto.getBody().getEmail())
.build();
}
/**
* ๋ฉ์ธ์ง ์ ์ก ๋ฉ์๋
* jwt ํ ํฐ์์ username ์ถ์ถ
* ์นดํ์นด ํ ํฝ์ผ๋ก ๋ฉ์ธ ์ ์ก
* @param : Message message, String accessToken
*/
public void sendMessage(Message message, String accessToken) {
// member id๋ก ์กฐํ
ResponseEntity<MemberDto> memberDto = plantServiceClient.findByUsername(tokenHandler.getUid(accessToken));
// ์ฑํ
๋ฐฉ์ ๋ชจ๋ ์ ์ ๊ฐ ์ฐธ์ฌ์ค์ธ์ง ํ์ธํ๋ค.
boolean isConnectedAll = chatRoomService.isAllConnected(message.getChatNo());
// 1:1 ์ฑํ
์ด๋ฏ๋ก 2๋ช
์ ์์ readCount 0, ํ๋ช
์ ์์ 1
Integer readCount = isConnectedAll ? 0 : 1;
// message ๊ฐ์ฒด์ ๋ณด๋ธ์๊ฐ, ๋ณด๋ธ์ฌ๋ memberNo, ๋๋ค์์ ์
ํ
ํด์ค๋ค.
message.setSendTimeAndSender(LocalDateTime.now(), memberDto.getBody().getId().intValue(), memberDto.getBody().getNickname(), readCount);
// ๋ฉ์์ง๋ฅผ ์ ์กํ๋ค.
sender.send(KafkaUtil.KAFKA_TOPIC, message);
}
/**
* ์๋ฆผ ์ ์ก ๋ฐ ๋ฉ์ธ์ง ์ ์ฅ ๋ฉ์๋
* FeignCLient๋ฅผ ํตํด plant-service์์ ๋ฉ์ธ์ง ๋ณด๋ธ ์ ์ ์ ๋ณด ์กฐํ
* ์๋ฆผ์ ์๋๋ฐฉ์ด ์ฝ์ง ์์ ๊ฒฝ์ฐ๋ง ์ ์ก
* @param : Message message
*/
@Transactional
public Message sendNotificationAndSaveMessage(Message message) {
CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
//๋ฉ์ธ์ง ์ ์ฅ๊ณผ ์๋ฆผ ๋ฐ์ก์ ์ํด ๋ฉ์ธ์ง ๋ณด๋ธ ํ์ ์กฐํ
ResponseEntity<MemberDto> memberDto = circuitBreaker.run(() -> plantServiceClient.findById(message.getSenderNo().longValue()),
throwable -> ResponseEntity.ok(null));
// ์๋๋ฐฉ์ด ์ฝ์ง ์์ ๊ฒฝ์ฐ์๋ง ์๋ฆผ ์ ์ก
if (message.getReadCount().equals(1)) {
// ์๋ ์ ์ก์ ์ํด ๋ฉ์์ง๋ฅผ ๋ฐ๋ ์ฌ๋์ ์กฐํํ๋ค.
Integer memberNo = chatRepository.getReceiverMember(message.getChatNo(), message.getSenderNo());
ResponseEntity<MemberDto> receiveMember = circuitBreaker.run(() -> plantServiceClient.findById(memberNo.longValue()),
throwable -> ResponseEntity.ok(null));
String content = message.getContentType().equals("image")
? "image" : message.getContent();
// ์๋์ ๋ณด๋ผ URL์ ์์ฑํ๋ค.
String sendUrl = getNotificationUrl(message.getTradeBoardNo(), message.getChatNo());
//์๋ฆผ ์ ์ก
notificationService.send(memberDto.getBody(), receiveMember.getBody(), NotifiTypeEnum.CHAT, sendUrl, content);
}
// ๋ณด๋ธ ์ฌ๋์ผ ๊ฒฝ์ฐ์๋ง ๋ฉ์์ง๋ฅผ ์ ์ฅ -> ์ค๋ณต ์ ์ฅ ๋ฐฉ์ง
if (message.getSenderEmail().equals(memberDto.getBody().getEmail())) {
// Message ๊ฐ์ฒด๋ฅผ ์ฑํ
์ํฐํฐ๋ก ๋ณํ
Chatting chatting = message.convertEntity();
// ์ฑํ
๋ด์ฉ์ ์ ์ฅ
Chatting savedChat = mongoChatRepository.save(chatting);
// ์ ์ฅ๋ ๊ณ ์ ID๋ฅผ ๋ฐํ
message.setId(savedChat.getId());
}
return message;
}
/**
* ์ฐธ๊ฐ์ ์
์ฅ ์๋ฆผ ๋ฉ์๋
* @param : String email, Integer chatRoomNo
*/
public void updateMessage(String email, Integer chatRoomNo) {
Message message = Message.builder()
.contentType("notice")
.chatNo(chatRoomNo)
.content(email + " ๋์ด ๋์์ค์
จ์ต๋๋ค.")
.build();
sender.send(KafkaUtil.KAFKA_TOPIC, message);
}
/**
* ์ฝ์ง ์์ ๋ฉ์์ง ์ฑํ
์ฅ ์
์ฅ์ ์ฝ์ ์ฒ๋ฆฌ ๋ฉ์๋
* @param : Integer chatNo, String username
*/
public void updateCountAllZero(Integer chatNo, String username) {
CircuitBreaker circuitBreaker = circuitBreakerFactory.create("circuitbreaker");
ResponseEntity<MemberDto> findMember = circuitBreaker.run(() -> plantServiceClient.findByUsername(username),
throwable -> ResponseEntity.ok(null));
//MongoDb Update Query
Update update = new Update().set("readCount", 0);
//ne-> not equal
Query query = new Query(where("chatRoomNo").is(chatNo)
.and("senderNo").ne(findMember.getBody().getId().intValue()));
mongoTemplate.updateMulti(query, update, Chatting.class);
}
/**
* ์ฝ์ง ์์ ๋ฉ์์ง ์นด์ดํธ ๋ฉ์๋
* @param : Integer chatNo, Integer senderNo
*/
long countUnReadMessage(Integer chatRoomNo, Integer senderNo) {
Query query = new Query(where("chatRoomNo").is(chatRoomNo)
.and("readCount").is(1)
.and("senderNo").ne(senderNo));
return mongoTemplate.count(query, Chatting.class);
}
private String getNotificationUrl(Integer tradeBoardNo, Integer chatNo) {
return chatNo +
"/" +
tradeBoardNo;
}
/**
* ํ๋งค์๊ฐ ์ฐธ๊ฐํ ์ฑํ
๋ฐฉ์ด ์กด์ฌํ๋์ง ์ ๋ฌด ์ฒ๋ฆฌ ๋ฉ์๋
* ๋จ์ ์กฐํ์ฉ ๋ฉ์๋๋ผ readOnly = true
*
* @param : Integer tradeBoardNo, Integer memberNo
*/
@Transactional(readOnly = true)
public Boolean existChatRoomBySeller(Integer tradeBoardNo, Integer memberNo) {
return chatRepository.existChatRoomBySeller(tradeBoardNo, memberNo);
}
/**
* ์ฑํ
๋ฐฉ ์ญ์ ๋ฉ์๋
* plant-service์์ kafka๋ฅผ ํตํด ๊ฑฐ๋ ๊ฒ์๊ธ ์ญ์ ์์ฒญ์ ๋ฐ์ผ๋ฉด ์ญ์
*
* @param : Integer tradeBoardNo
*/
@Transactional
public void deleteChatRoom(Integer tradeBoardNo) {
List<Integer> chatRoomNoList = chatRepository.deleteChatRoomAndReturnChatNo(tradeBoardNo);
deleteChatting(chatRoomNoList);
}
@Transactional
public void deleteChatting(List<Integer> chatRoomNoList) {
// ์ค๋ณต๋ chatNo ์ ๊ฑฐ
Set<Integer> uniqueChatNoSet = new HashSet<>(chatRoomNoList);
// ์ค๋ณต ์ ๊ฑฐ ํ์ chatNo์ ํด๋นํ๋ ์ฑํ
๋ฐ์ดํฐ ์ญ์
mongoTemplate.remove(query(where("chatRoomNo").in(uniqueChatNoSet)), Chatting.class);
}
}
// ์ฑํ
๋ฐฉ์ ๋ชจ๋ ์ ์ ๊ฐ ์ฐธ์ฌ์ค์ธ์ง ํ์ธํ๋ค.
boolean isConnectedAll = chatRoomService.isAllConnected(message.getChatNo());
// 1:1 ์ฑํ
์ด๋ฏ๋ก 2๋ช
์ ์์ readCount 0, ํ๋ช
์ ์์ 1
Integer readCount = isConnectedAll ? 0 : 1;
/**
* ์ฑํ
๋ฐฉ ์ ์ 2๋ช
์ฐผ๋์ง ํ์ธ ๋ฉ์๋
* @param : Long chatRoomNo
*/
public boolean isAllConnected(Integer chatRoomNo) {
List<ChatRoom> connectedList = chatRoomRepository.findByChatroomNo(chatRoomNo);
return connectedList.size() == 2;
}
์์ ์์์ ๋ณด๋ฉด ํ๋ช ๋ง ์ ์์ค์ด๊ธฐ ๋๋ฌธ์ ์๋์ด ๋์ํ๊ณ ์๋ ๊ฑธ ํ์ธํ ์ ์์ต๋๋ค.
๋๋ช
๋ค ์ ์ํ์๋, ์๋ฆผ์ด ๊ฐ์ง ์๋ ๊ฑธ ํ์ธํ์ค ์ ์์ต๋๋ค!
์ด๋ฒ ํฌ์คํ ๊น์ง ์งํํ์ฌ ์ฑํ ๋ง์ดํฌ๋ก ์๋น์ค ๊ตฌํ ๊ณผ์ ์ ๋ชจ๋ ์ ๋ฆฌํ์ต๋๋ค. ๋ค์ ํฌ์คํ ๋ถํฐ๋ ๋ ๋ค๋ฅธ ๋ง์ดํฌ๋ก ์๋น์ค์ธ ๊ฒฐ์ ์๋น์ค ๊ตฌํ๊ณผ์ ์์ ํฌ์คํ ํ๋๋ก ํ๊ฒ ์ต๋๋ค ๐๐
https://develoyummer.tistory.com/112#3.2.%20Emitter%20RepositoryImpl
https://velog.io/@max9106/Spring-SSE-Server-Sent-Events%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%9C-%EC%8B%A4%EC%8B%9C%EA%B0%84-%EC%95%8C%EB%A6%BC