앞서 실시간 통신 방법에 대한 이론을 전체적으로 다뤄봤다. 이번에는 각 실시간 통신을 Spring 환경에서 구현하는 방법에 대해 설명하고 비교 분석 테스트를 위한 코드를 설명하겠다.
‘그룹별 실시간 위치 공유’ 로직을 위한 테스트 코드를 작성할 것이다. 사용자가 각 좌표를 공유하는 상황이고 순수하게 실시간 통신 기술의 비교를 위해 데이터 베이스를 추가하지는 않았다.
각 사용자의 좌표에 대한 dto다. 해당 데이터를 공유되는 사용자들에게 전송할 것이다.
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class LocationDto {
private double x;
private double y;
}
클라이언트의 실시간 위치 공유를 위해서는 부가적인 추가가 필요해보였다. 단순히 비교만을 위해 서버에서 Random 함수를 통해 임의의 좌표값을 생성하도록 했다.
/* Math.random() 함수를 통해 임의의 (x,y)좌표를 생성 */
private LocationDto makeRandomLocation() {
double randomX = Math.random() * 100;
double randomY = Math.random() * 100;
return new LocationDto(randomX, randomY);
}
@GetMapping("/cur")
public ResponseEntity<LocationDto> shareCurLocation() {
return ResponseEntity.ok(locationService.shareCurLocation());
}
private final Map<Long, List<LocationDto>> locations = new ConcurrentHashMap<>();
public LocationDto shareCurLocation(Long groupId) {
List<LocationDto> groupLocations = locations.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>());
// 새로운 위치 정보 생성 및 추가
LocationDto newLocation = makeRandomLocation();
groupLocations.add(newLocation);
return newLocation;
}
private final Map<Long, BlockingQueue<DeferredResult<LocationDto>>> groupRequests =
new ConcurrentHashMap<>();
poll() : long polling API
notifyGroup() : long polling에 있어서 event 발생 API
@GetMapping("/long/{groupId}")
public DeferredResult<LocationDto> poll(@PathVariable final Long groupId) {
return locationService.longPoll(groupId);
}
@PostMapping("/long/{groupId}/notify")
public void notifyGroup(@PathVariable final Long groupId) {
locationService.notifyGroup(groupId);
}
public DeferredResult<LocationDto> longPoll(final Long groupId) {
// TIMEOUT에 대한 deferredResult를 생성한다. Timeout이 발생하면 error를 반환하도록 한다.
final DeferredResult<LocationDto> deferredResult = new DeferredResult<>(TIMEOUT);
deferredResult.onTimeout(() -> deferredResult.setErrorResult("Request timeout"));
// 그룹 id에 대한 큐가 존재하지 않으면 안에 Queue를 생성하고 객체를 저장한다.
groupRequests
.computeIfAbsent(groupId, k -> new LinkedBlockingQueue<>())
.add(deferredResult);
return deferredResult;
}
public void notifyGroup(final Long groupId) {
// 자신의 그룹 id에 대한 Queue를 반환한다.
final BlockingQueue<DeferredResult<LocationDto>> queue = groupRequests.get(groupId);
// 큐에서 DefferedResult 객체를 빼고 해당 객체에 random을 통해 생성한 객체를 setResult()를 통해 저장
Optional.ofNullable(queue)
.ifPresent(
q -> {
while (!q.isEmpty()) {
final DeferredResult<LocationDto> connection = q.poll();
if (connection != null) {
connection.setResult(makeRandomLocation());
}
}
});
}
@Component
@Slf4j
public class CustomWebSocketHandler extends TextWebSocketHandler {
private final ObjectMapper objectMapper;
private final LocationService locationService;
// group id에 따른 세션 set
private final Map<Long, Set<WebSocketSession>> groupSessions = new ConcurrentHashMap<>();
public CustomWebSocketHandler(LocationService locationService, ObjectMapper objectMapper) {
this.locationService = locationService;
this.objectMapper = objectMapper;
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message)
throws Exception {
Long groupId = extractGroupIdFromSession(session);
LocationDto location = locationService.makeRandomLocation();
Set<WebSocketSession> set = groupSessions.getOrDefault(groupId,new CopyOnWriteArraySet<>());
set.forEach(
s -> {
if(!s.isOpen()) {
set.remove(s);
return;
}
try {
log.info("WEBSOCKET : "+location.getX()+" "+location.getY());
s.sendMessage(new TextMessage(objectMapper.writeValueAsString(location)));
}catch (IOException e) {
throw new RuntimeException(e);
}
}
);
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Long groupId = (long)(Math.random() * 10);
Set set = groupSessions.getOrDefault(groupId,new CopyOnWriteArraySet<>());
set.add(session);
groupSessions.put(groupId,set);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
throws Exception {
Long groupId = extractGroupIdFromSession(session);
if (groupId != null) {
Set<WebSocketSession> sessions = groupSessions.get(groupId);
if (sessions != null) {
sessions.remove(session);
if (sessions.isEmpty()) {
groupSessions.remove(groupId);
}
}
}
}
// random group id 생성
private Long extractGroupIdFromSession(WebSocketSession session) {
Long groupId = (long)(Math.random() * 10);
return groupId;
}
}
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final CustomWebSocketHandler customWebSocketHandler;
public WebSocketConfig(CustomWebSocketHandler customWebSocketHandler) {
this.customWebSocketHandler = customWebSocketHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(customWebSocketHandler,"/ws").setAllowedOrigins("*");
}
}
STOMP는 기본적으로 내장된 Message Brocker를 제공한다. RabbitMQ를 추가하여 테스트할 수도 있지만 순수한 성능 분석을 위해 내장 message brocker를 사용했다.
- WebSocketMessageBrokerConfigurer를 구현한 StompConfig 등록
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(final StompEndpointRegistry registry) {
registry.addEndpoint("/location").setAllowedOrigins("*");
}
@Override
public void configureMessageBroker(final MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/pub"); // 그룹에 메시지 전달 경로
registry.enableSimpleBroker("/sub"); // 그룹 구독 경로
}
}
@Controller
@Slf4j
public class StompController {
private final SimpMessagingTemplate template;
private final LocationService locationService;
public StompController(
final SimpMessagingTemplate template, final LocationService locationService) {
this.template = template;
this.locationService = locationService;
}
@MessageMapping("/share/{id}")
public void shareCurLocationByStomp(@DestinationVariable final Long id) {
log.info("STOMP");
template.convertAndSend(
String.format("/sub/location/%d", id), locationService.makeRandomLocation());
}
}