실시간 통신 분석 프로젝트 (각 방법의 구현 코드)

진주원(JooWon Jin)·2024년 6월 17일
0

REAL-TIME-COMMUNICATION

목록 보기
2/3
post-thumbnail

서론

앞서 실시간 통신 방법에 대한 이론을 전체적으로 다뤄봤다. 이번에는 각 실시간 통신을 Spring 환경에서 구현하는 방법에 대해 설명하고 비교 분석 테스트를 위한 코드를 설명하겠다.

테스트 상황

‘그룹별 실시간 위치 공유’ 로직을 위한 테스트 코드를 작성할 것이다. 사용자가 각 좌표를 공유하는 상황이고 순수하게 실시간 통신 기술의 비교를 위해 데이터 베이스를 추가하지는 않았다.

Common Logic


LocationDto

각 사용자의 좌표에 대한 dto다. 해당 데이터를 공유되는 사용자들에게 전송할 것이다.

@Getter
@NoArgsConstructor
@AllArgsConstructor
public class LocationDto {
    private double x;
    private double y;
}

Make Location

클라이언트의 실시간 위치 공유를 위해서는 부가적인 추가가 필요해보였다. 단순히 비교만을 위해 서버에서 Random 함수를 통해 임의의 좌표값을 생성하도록 했다.

/* Math.random() 함수를 통해 임의의 (x,y)좌표를 생성 */
private LocationDto makeRandomLocation() {
     double randomX = Math.random() * 100;
     double randomY = Math.random() * 100;

     return new LocationDto(randomX, randomY);
}

구현 방법 (code)


Polling


  • Polling을 Spring에서 구현하는 방법은 생각보다 간단하다.
    • Polling이란 ‘특정 주기마다 반복적 호출’이 핵심이다.
    • ‘반복’의 기능을 Spring 자체에서 구현하는 것보다는 클라이언트 로직에서 추가하는 것이 올바르다 판단해서 Spring에서는 단순한 API로만 남겨놓았다.

Controller

@GetMapping("/cur")
public ResponseEntity<LocationDto> shareCurLocation() {
     return ResponseEntity.ok(locationService.shareCurLocation());
}

Service

  • 그룹별 위치 정보를 관리하기 위해 Map<Long, List> 를 선언하였다.
  • 로직에 대해서는 사용자는 자신이 속한 groupId를 인자로 전달한다. 해당 그룹이 존재하지 않으면 생성하고 그룹에 랜덤한 위치 정보를 추가한다.
private final Map<Long, List<LocationDto>> locations = new ConcurrentHashMap<>();

public LocationDto shareCurLocation(Long groupId) {
	   List<LocationDto> groupLocations = locations.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>());

     // 새로운 위치 정보 생성 및 추가
     LocationDto newLocation = makeRandomLocation();
     groupLocations.add(newLocation);

     return newLocation;
}

Long Polling


  • Spring에서 Long Polling 기능을 구현하는 방법 중 대표적인 것은 DeferredResult를 이용하는 것이다.
private final Map<Long, BlockingQueue<DeferredResult<LocationDto>>> groupRequests =
            new ConcurrentHashMap<>();
  1. 사용자가 자신의 그룹 id에 해당하는 Queue에 DefferedResult 객체를 넣어 놓는다.
  2. 다른 그룹원이 해당 객체를 큐에서 빼 setResult() 메서드를 통해 정보를 갱신한다.
  3. 그럴 경우 해당 데이터가 큐를 집어넣은 사용자에게 반환된다.

Controller

poll() : long polling API

notifyGroup() : long polling에 있어서 event 발생 API

@GetMapping("/long/{groupId}")
public DeferredResult<LocationDto> poll(@PathVariable final Long groupId) {
    return locationService.longPoll(groupId);
}

@PostMapping("/long/{groupId}/notify")
public void notifyGroup(@PathVariable final Long groupId) {
    locationService.notifyGroup(groupId);
}

Service

public DeferredResult<LocationDto> longPoll(final Long groupId) {
		 // TIMEOUT에 대한 deferredResult를 생성한다. Timeout이 발생하면 error를 반환하도록 한다.
     final DeferredResult<LocationDto> deferredResult = new DeferredResult<>(TIMEOUT);
		 deferredResult.onTimeout(() -> deferredResult.setErrorResult("Request timeout"));
		 
		 // 그룹 id에 대한 큐가 존재하지 않으면 안에 Queue를 생성하고 객체를 저장한다.
     groupRequests
             .computeIfAbsent(groupId, k -> new LinkedBlockingQueue<>())
             .add(deferredResult);

     return deferredResult;
}

public void notifyGroup(final Long groupId) {
		 // 자신의 그룹 id에 대한 Queue를 반환한다.
     final BlockingQueue<DeferredResult<LocationDto>> queue = groupRequests.get(groupId);
     
     // 큐에서 DefferedResult 객체를 빼고 해당 객체에 random을 통해 생성한 객체를 setResult()를 통해 저장
     Optional.ofNullable(queue)
              .ifPresent(
                     q -> {
                         while (!q.isEmpty()) {
                             final DeferredResult<LocationDto> connection = q.poll();
                             if (connection != null) {
                                 connection.setResult(makeRandomLocation());
                             }
                         }
     });
}

부록 : SSE


WebSocket


  • Spring에서 WebSocket을 사용하기 위해서는 몇가지 작업이 필요하다.
    1. WebSocketHandler를 구현
      • afterConnectionEstablished : websocket 연결이 생성되었을 때 수행 로직
      • afterConnectionClosed : websocket 연결이 종료되었을 때 수행 로직
      • handleTextMessage : 메시지 통신 과정의 로직
    2. WebSocketConfigurer interface를 구현한 WebSocketConfig 등록

CustomWebSocketHandler

  • groupId에 대한 WebSocketSession을 관리하기 위해 Map을 사용
    • 사용자가 websocket에 연결되었을 때 자신의 group에 websocketsession을 등록한다.
    • 통신 : 자신의 그룹에 random 좌표 정보를 전달한다.
    • 연결을 종료할 경우 자신의 session 정보를 group에서 삭제한다.
@Component
@Slf4j
public class CustomWebSocketHandler extends TextWebSocketHandler {
    private final ObjectMapper objectMapper;
    private final LocationService locationService;

		// group id에 따른 세션 set
    private final Map<Long, Set<WebSocketSession>> groupSessions =  new ConcurrentHashMap<>();

    public CustomWebSocketHandler(LocationService locationService, ObjectMapper objectMapper) {
        this.locationService = locationService;
        this.objectMapper = objectMapper;
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message)
            throws Exception {
        Long groupId = extractGroupIdFromSession(session);
        LocationDto location = locationService.makeRandomLocation();

        Set<WebSocketSession> set = groupSessions.getOrDefault(groupId,new CopyOnWriteArraySet<>());
        set.forEach(
                s -> {
                    if(!s.isOpen()) {
                        set.remove(s);
                        return;
                    }

                    try {
                        log.info("WEBSOCKET : "+location.getX()+" "+location.getY());
                        s.sendMessage(new TextMessage(objectMapper.writeValueAsString(location)));
                    }catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
        );

    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Long groupId = (long)(Math.random() * 10);
        Set set = groupSessions.getOrDefault(groupId,new CopyOnWriteArraySet<>());
        set.add(session);
        groupSessions.put(groupId,set);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
            throws Exception {
        Long groupId = extractGroupIdFromSession(session);
        if (groupId != null) {
            Set<WebSocketSession> sessions = groupSessions.get(groupId);
            if (sessions != null) {
                sessions.remove(session);
                if (sessions.isEmpty()) {
                    groupSessions.remove(groupId);
                }
            }
        }
    }
		
		// random group id 생성
    private Long extractGroupIdFromSession(WebSocketSession session) {
        Long groupId = (long)(Math.random() * 10);
        return groupId;
    }
}

WebSocketConfig

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    private final CustomWebSocketHandler customWebSocketHandler;

    public WebSocketConfig(CustomWebSocketHandler customWebSocketHandler) {
        this.customWebSocketHandler = customWebSocketHandler;
    }
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(customWebSocketHandler,"/ws").setAllowedOrigins("*");
    }
}

STOMP


STOMP는 기본적으로 내장된 Message Brocker를 제공한다. RabbitMQ를 추가하여 테스트할 수도 있지만 순수한 성능 분석을 위해 내장 message brocker를 사용했다.

  • WebSocketMessageBrokerConfigurer를 구현한 StompConfig 등록
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(final StompEndpointRegistry registry) {
        registry.addEndpoint("/location").setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(final MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/pub"); // 그룹에 메시지 전달 경로
        registry.enableSimpleBroker("/sub"); // 그룹 구독 경로
    }
}
  • Stomp controller
    • groupid를 입력받아 해당 group에 위치 정보를 전달한다.
@Controller
@Slf4j
public class StompController {

    private final SimpMessagingTemplate template;
    private final LocationService locationService;

    public StompController(
            final SimpMessagingTemplate template, final LocationService locationService) {
        this.template = template;
        this.locationService = locationService;
    }

    @MessageMapping("/share/{id}")
    public void shareCurLocationByStomp(@DestinationVariable final Long id) {
        log.info("STOMP");
        template.convertAndSend(
                String.format("/sub/location/%d", id), locationService.makeRandomLocation());
    }
}
profile
Young , Wild , Free

0개의 댓글