메시지 큐를 활용한, 유연한 전송 구조 만들기

주싱·2023년 3월 2일
6

더 나은 코드

목록 보기
11/14
post-custom-banner

특정 하드웨어 시스템을 제어하기 위해 Netty 기반 메시징 서버를 개발중입니다. 시험 중 특정 조건이 되면 타겟 시스템에서 일부 메시지를 처리하지 못하는 문제가 발생했는데요. 문제를 해결하기 위해 메시징 서버의 구조를 개선한 사례를 소개합니다.

기존 시스템 구조

문제가 발생한 서버 시스템은 아래와 같이 메시지 요청과 전송이라는 두 계층으로 추상화해서 표현할 수 있습니다.

  • 요청 계층에는 다양한 비지니스 요구사항을 구현한 N개의 모듈이 존재하고 필요에 따라 전송 계층으로 메시지 전송을 요청합니다.
  • 전송 계층은 요청된 메시지가 Netty 채널 파이프라인의 처리를 거쳐 최종적으로 타겟으로 전송되도록 합니다.

문제 현상

테스트 중 상황에 따라 N개의 메시지 전송 요청이 연속적으로 발생하였는데 이때 타겟에서 메시지를 처리하지 못하는 문제가 발생하였습니다.

원인 분석

네트워크 상에서 패킷을 캡처해 분석해 보면 메시징 서버에서 타겟 시스템으로 전달되는 메시지는 모두 정상적으로 전달되고 있습니다. 따라서 메시지를 연속적으로 전송하면 서버에서 처리하지 못하는 버그가 있다는 가설을 세워 볼 수 있습니다. 메시지를 의도적으로 연속해서 전송하는 테스트 코드를 실행해 문제가 재현되는지 보고, 반대로 메시지 사이 사이에 일정 간격을 두면 문제가 제거되는지 확인하면 가설을 검증할 수 있을 것 같습니다. 아래와 같은 테스트 코드를 작성해서 테스트 해보면 메시지를 연속으로 전송하면 서버의 메시지 처리 문제가 재현됩니다.

// 문제 재현 
while(..) {
	send(data);
}

// 지연 시, 문제 제거
while(..) {
	send(data);
	delay(100); 
}

우회하기

문제 상황에서 전송 메시지는 통신 스펙에 맞게 잘 전달되고 있음으로 타겟 시스템의 명령 처리에 문제가 있는 것으로 결론을 내릴 수 있습니다. 그러나 타겟 시스템은 해외 제조사에서 기성품으로 제공하고 있어 즉시 수정할 수 있는 상황이 아니었습니다. 어쩔 수 없이 우리쪽에서 문제를 회피할 수 있도록 조치를 취해야 했습니다. 그래서 우리 쪽 전송 계층에서 메시지와 메시지 사이에 간격을 줄 수 있도록 코드를 수정하기로 합니다.

유연한 메시지 전송 구조 설계

기존의 전송 계층을 살펴보면 Netty의 I/O 쓰레드(EventLoop)에 의해 처리되고 있습니다. Netty의 I/O 쓰레드에서는 Blocking이 발생하면 모든 I/O 동작이 멈추기 때문에 메시지 전송 타이밍 조절에 필요한 지연 동작을 처리할 만한 곳이 없습니다. 일종의 실시간 전송 구조를 가진다고 할 수 있는데 다음과 같은 특징을 가집니다.

  • 요청은 곧 전송을 의미함으로 실시간성이 높습니다.
  • 요청자가 메시지를 전송하는 시간 타이밍을 주도합니다.
  • 시스템 구성 후 메시지 전송에 대한 요구가 변경되거나, (지금과 같은) 시스템 제약사항이 생길 경우 유연하게 대처할 수 없습니다.

이제 전송하는 메시지의 타이밍을 제어할 수 있도록 메시지큐를 요청 계층과 전송 계층 사이에 두고 별도의 스케줄러 쓰레드가 메시지와 메시지 사이의 간격을 조정해 주도록 다음과 같이 설계를 변경합니다.

위와 같이 요청-전송 계층 간 메시지 큐를 두고 분리된 스케줄링 쓰레드에서 전송 메시지를 처리하게 하면 다음과 같은 특징을 가지게 됩니다.

  • 전송 요청과는 독립적으로 메시지 전송 주기 및 전송 패턴을 유연하게 조정할 수 있습니다.
  • 메시지 전송의 타이밍은 전송 계층에서 전적으로 주도합니다.
  • 요구사항의 변화, 시스템의 제약사항 유연하게 대처할 수 있습니다.
  • 반면에 메시지 전송의 실시간성이 떨어지지만 도메인에서 용인되는 수준인 경우 문제가 되지 않습니다.

코드로 구현하기

구현에는 Java에서 기본적으로 제공하는 BlockingQueue, ScheduledExecutorService를 사용합니다.

메시지큐

먼저 요청 계층과 전송 계층 사이에 위치할 큐를 생성합니다. 큐는 멀티스레드 환경에서 동작함으로 BlockingQueue를 사용합니다.

private final BlockingQueue<String> sendQueue = new LinkedBlockingQueue<>();

요청 인터페이스

기존에 요청 계층에서 직접 메시지 전송을 시도하던 send() 메서드 대신에 Queue에 메시지를 삽입하며 전송을 요청하는 requestToSend 메서드를 정의합니다.

public boolean requestToSend(String message) {
    return sendQueue.offer(message);
}

전송 스케줄러

이제 큐에서 메시지를 꺼내어 처리할 스케줄러 쓰레드를 생성합니다. 그리고 100 msec 마다 한 번씩 메시지큐에서 메시지를 꺼내어 메시지를 전송해 주도록 합니다.

executor = Executors.newSingleThreadScheduledExecutor(); 
future = executor.scheduleWithFixedDelay(() -> {
    String command = sendQueue.poll();
    if (command != null) {
        send(command);
    }
}, 0, 100, TimeUnit.MILLISECONDS);

마치며

기존의 메시지 전송 구조는 단순하고 실시간성이 높지만 여러가지 변화에 유연하게 대처할 수 없었습니다. 우리가 가진 문제와 제약을 해결하기 위해서 메시지큐와 스케줄러를 요청 계층과 전송 계층 사이에 추가함으로서 보다 유연한 메시지 전송 계층을 구현하고 문제를 해결할 수 있었습니다. 이후에 알고 보니 Netty I/O 쓰레드에서 Blocking이 발생하는 동작을 위한 별도의 쓰레드를 할당할 수 있었습니다. 그리고 내부적으로 메시지큐로 이미 분리되어 있음도 알게되었습니다. 안타깝지만 위 구현은 나중에 Netty의 서비스를 활용하도록 모두 제거되었습니다. 그래도 유익한 경험이었습니다. 감사합니다.

profile
소프트웨어 엔지니어, 일상
post-custom-banner

2개의 댓글

comment-user-thumbnail
2023년 3월 2일

재밌게 보았습니다

1개의 답글