유연한 메시지 전송 구조로 리팩토링하기

joosing·2021년 12월 26일
0

Messaging

목록 보기
3/3

특정 시스템을 제어하기 위한 메시징 서버를 개발중입니다. 시험 중 특정 조건이 되면 타겟 시스템에서 일부 메시지를 처리하지 못하는 문제가 발생했는데요. 문제를 해결하기 위해 메시징 서버의 구조를 유연하게 개선한 사례를 소개합니다.

기존 시스템 구조

문제가 발생한 기존 시스템은 아래와 같이 메시지 요청과 전송 두 계층으로 간략화 할 수 있습니다.

  • 요청 계층에는 다양한 비지니스 요구사항을 구현한 N개의 쓰레드가 존재하고 서로 비동기적으로 전송 계층으로 메시지 전송을 요청합니다.
  • 전송 계층은 요청된 메시지가 Netty 채널 파이프라인의 처리를 거쳐 최종적으로 전송되도록 합니다.

문제 현상

테스트 중 상황에 따라 요청 계층에서 전송 계층으로 연속적인 N개의 메시지 전송 요청이 발생하였는데 이때 중간 중간 타겟 시스템에서 메시지가 처리되지 않는 문제가 발생하였습니다.

원인 분석

네트워크 상에서 패킷을 분석해 보니 메시징 서버에서 타겟 시스템으로 전달되는 메시지는 손실이나 변형 없이 정상 전달되고 있었고, 타겟 시스템 역시 수신한 메시지에 대해 정상 응답하고 있었습니다. 그래서 다음과 같은 문제 가설을 세우고 테스트를 진행했습니다.

타겟 시스템은 수신하는 메시지와 메시지 사이의 간격이 매우 짧으면(거의 연속적으로) 메시지를 처리하지 못한다.

가설 검증을 위해 테스트 코드를 작성했습니다. 우선 문제 상황을 재현하기 위해 메시지와 메시지 사이에 간격 없이 연속적으로 메시지를 전송해 봅니다. 역시 네트워크 상에서 명령 및 응답 메시지 모두 손실이나 변형은 발생하지 않고 타겟 시스템에서 동일한 문제가 발생합니다.

while(..) {
	send(data);
}

이제는 장비로 전달되는 메시지 사이 간격을 100msec로 설정해 봅니다. 이번에는 문제가 발생하지 않습니다.

while(..) {
	send(data);
	delay(100); // 100 msec 간격을 두면 정상 처리
}

문제 상황에서 명령-응답 메시지는 통신 스펙에 맞게 잘 전달되고 있음으로 타겟 시스템이 문제가 있는 것으로 결론을 내립니다. 연동되는 시스템에서 발생한 문제였지만, 해당 시스템은 해외 제조사에서 기성품으로 납품된 장비라 수정이 쉽지 않았고, 수정하더라도 오랜 기간이 걸리는 상황이었습니다. 이런 경우에는 우리쪽에서 문제를 회피해 나갈 수 있도록 임시 조치를 해야합니다. 번거롭고 조금 억울해도 어쩔 수 없는 일입니다.

유연한 메시지 전송 구조 설계

기존 시스템을 수정하기 위해 살펴보니 전송 타이밍을 제어할 수 있는 부분이 전혀 없는 실시간 시스템 구조를 가지고 있습니다. 실시간 시스템 구조의 특징은 다음과 같습니다.

  • 요청은 곧 전송을 의미함으로 실시간성이 높습니다.
  • 요청자가 메시지를 전송하는 시간 타이밍을 주도합니다.
  • 시스템 구성 후 메시지 전송에 대한 요구가 변경되거나, 시스템에 제약사항이 생길 경우 유연하게 대처할 수 없습니다.

이제 전송하는 메시지의 타이밍을 제어할 수 있도록 메시지큐를 요청 계층과 전송 계층 사이에 두고 스케줄러가 메시지와 메시지 사이의 간격을 조정해 주도록 다음과 같이 설계를 변경합니다.

위와 같은 구조를 가지면 다음과 같은 특징을 가지게 됩니다.

  • 전송 요청과는 독립적으로 메시지 전송 주기 및 전송 패턴을 유연하게 조정할 수 있습니다.
  • 메시지 전송의 타이밍은 전송 레이어에서 전적으로 주도합니다.
  • 요구사항의 변화, 시스템의 제약사항 유연하게 대처할 수 있습니다.
  • 반면에 메시지 전송의 실시간성이 떨어집니다.

코드 개선하기

실제 구현에는 Java에서 기본적으로 제공하는 BlockingQueue, SingleThreadExecutor, CountDownLatch 3가지 라이브러리 조합을 사용하였습니다.

BlockingQueue

먼저 요청 계층과 전송 계층 사이에 위치할 큐를 생성합니다. 큐는 멀티스레드 환경에서 동작함으로 BlockingQueue를 사용합니다.

private final BlockingQueue<SendData<SEND>> sendQueue = new LinkedBlockingQueue<>();

Producer

기존에 직접 메시지 전송을 시도하던 send() 메쏘드 대신에 Queue에 메시지를 삽입해 주는 requestToSend() 메쏘드를 정의합니다.

public boolean requestToSend(String message) {
    return sendQueue.offer(message);
}

Consumer

큐에서 메시지를 꺼내어 처리할 Consumer 쓰레드를 SingleThreadExecutor를 사용해 생성하고 쓰레드에 종료 이벤트를 전달하기 위한 Event 객체를 CountDownLatch 를 사용해 생성합니다.

private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final CountDownLatch cancelEvent = new CountDownLatch(1);

Queue에서 메시지를 꺼내어 기존에 구현된 send() 메쏘드를 호출할 스케줄러 쓰레드 코드를 작성합니다. 쓰레드 루프 조건문에서 cancelEvent 발생을 100msec 대기하게 되는데 이를 통해 자연스럽게 메시지간 100msec 간격이 생기게 됩니다. 물론 중간에 cancel 요청이 들어오면 쓰레드가 종료되게 됩니다.

private final long interMessageDelay = 100;
private final long queuePollTimeout = 1000;
...

public void init() {
    future = executor.submit(() -> {
        while (!cancelEvent.await(100, TimeUnit.MILLISECONDS)) {
            var data = sendQueue.poll(queuePollTimeout, TimeUnit.MILLISECONDS);
            if (data != null) {
                send(data);
            }
        }
        return null;
    });
}

Task End

서비스 Shutdown 시, 안전하게 태스크가 종료될 수 있는 end() 메쏘드까지 구현해 주면 모든 구현이 완료됩니다.

public void end() throws ExecutionException, InterruptedException, TimeoutException {
    long cancelCompleteTimeout = queuePollTimeout + interMessageDelay + 100;

    cancelEvent.countDown(); // 종료 신호 
    future.get(cancelCompleteTimeout, TimeUnit.MILLISECONDS); // 실제 종료 대기
}

마치며

기존의 레거시 메시지 전송 구조는 단순하고 실시간성이 높지만 여러가지 변화에 유연하게 대처할 수 없었습니다. 우리는 단순하게 큐와 스케줄러를 요청 계층과 전송 계층 사이에 추가함으로서 보다 유연한 메시지 전송 계층을 구현할 수 있었습니다.

profile
System Software Developer

0개의 댓글