Netty 구조적 특징 (1) : 쓰레드 모델

주싱·2023년 3월 23일
3

Network Programming

목록 보기
13/21

Thread icons created by smalllikeart - Flaticon

지난 몇 년 동안 네티 프레임워크를 사용하여 우주 지상국 소프트웨어를 개발했습니다. 개발을 진행하면서 저와 동료들이 네티의 특징을 제대로 이해하지 못해 실수 했던 몇몇 경험들이 있는데 그 경험들을 바탕으로 네티의 중요한 구조적 특징들에 대해 나누어 정리해 보려고 합니다.

이번 글에서는 네티의 쓰레드 모델과 관련된 특징에 대해 정리합니다.

테스트 준비

먼저 네티의 동작을 테스트 할 수 있는 간단한 TCP 서버/클라이언트를 구현하고 테스트 컴퓨터에서 Loopback(127.0.0.1) 주소를 통해 직접 통신하도록 합니다. 전체 코드는 GitHub에서 확인할 수 있습니다. 아래 코드는 TCP 연결을 수립한 후, 서버에서 클라이언트로 메시지를 전송하고 클라이언트에서 메시지를 정상적으로 수신하는지 확인합니다.

public class ConnectionTest extends TcpLoopBackTest {

    @Test
    void normalConnection() throws Exception {
        // When : 서버의 메시지 전송
        server.sendAll("status");
        // Then : 클라이언트 메시지 수신
        await().until(() -> Objects.equals(client.read(), "status"));
    }
}

public class TcpLoopBackTest {
    protected TcpServer server;
    protected TcpClient client;

    @BeforeEach
    protected void beforeEach() throws Exception {
        // 비동기 테스트 프레임워크 설정
        Awaitility.setDefaultPollInterval(10, TimeUnit.MILLISECONDS); // 폴링 간격

        // 서버 생성 및 시작
        server = new LineBasedTcpServer();
        server.init();
        server.start(12345).get();

        // 클라이언트 생성 및 연결
        client = new LineBasedTcpClient();
        client.init();
        client.connect("localhost", 12345).get();

        // 서버가 클라이언트와 통신 가능한 상태가 될 때까지 대기
        await().atMost(1000, TimeUnit.MILLISECONDS)
                .until(() -> server.isActive(client.localAddress()));
    }

    @AfterEach
    protected void afterEach() {
        client.destroy();
        server.destroy();
    }
}

하나의 쓰레드로 안전한 입출력

그 동안 (네티의 도움 없이) 네트워크 프로그램을 직접 작성할 때는 채널로부터 메시지 수신을 반복하는 쓰레드와 사용자의 요청을 받아 메시지를 전송하는 쓰레드가 분리되어 있는 경우가 많았습니다. 그래서 만약 송신과 수신에서 공유하는 데이터가 있다면 경합 조건(Race Condition)이 발생하지 않도록 동기화 처리에 신경을 써야 했습니다. 네티를 처음 사용할 때는 네티 역시 그런 줄 알고 공유하는 데이터가 생기면 신경을 곤두세웠는데요. 알고 봤더니 네티는 메시지 수신과 송신이 하나의 쓰레드(EventLoop)에 의해 처리되고 있었습니다. 그래서 기본적으로 채널 파이프라인을 구성하는 핸들러들 사이에 공유하는 데이터가 있더라도 이를 동기화해 줄 필요가 없습니다.

Blocking 피하기

그러나 주의할 점이 하나 있습니다. 네티는 하나의 쓰레드를 사용해 송신과 수신을 함께 처리하기 때문에 어느 한 쪽에서 블락킹(Blocking)이 발생하면 다른 모든 동작이 함께 멈춥니다. 간단히 생각해서 메시지를 전송하는 핸들러에서 메시지를 전송하고 3초간 sleep 하며 블락킹되어 있으면 메시지 수신 역시 3초 간 멈추게 됩니다. 테스트 코드를 작성해서 진짜 그런지 한 번 확인해 보겠습니다. 지연 동작을 테스트하기 위해 클라이언트 채널에 메시지 전송 후 3초간 sleep하는 핸들러를 추가하고 메시지를 전송합니다. 클라이언트의 전송 채널에서 지연이 발생하고 있는 도중에 서버에서 메시지를 전송해 보면 클라이언트의 메시지 수신 역시 약 3초 뒤 처리됨을 확인할 수 있습니다.

@RequiredArgsConstructor
public class OutboundBlockingHandler extends ChannelOutboundHandlerAdapter {
    private final long delayMillis;

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.writeAndFlush(msg, promise);
        Thread.sleep(delayMillis);
    }
}

public class BlockingHandlerTest extends TcpLoopBackTest {
    @Test
    void blockingHandler() throws Exception {
        // Given : 일정 시간 블락킹된 클라이언트 채널
        client.test().pipeline().addLast(new OutboundBlockingHandler(3000));
        client.send("blocking");

        // When : 서버에서 메시지 전송
        server.sendAll("status");

        // Then : 지연 이후 메시지 수신
        await().atLeast(3000, TimeUnit.MILLISECONDS)
                .until(() -> Objects.equals(client.read(), "status"));
    }
}

전용 쓰레드 할당하기

앞에서 설명한 것 처럼 메시지를 처리하는 핸들러가 동기적인 API를 호출하고 블락킹되어 있는 경우 전체 채널 동작이 일시적으로 멈춥니다. 기능적으로 같은 비동기 API가 존재한다면 비동기 API로 대체해야 하지만 그럴 수 없는 상황이라면 해당 핸들러는 전용 쓰레드에 의해 실행되도록 할 수 있습니다. 사용법은 간단히 파이프라인에 핸들러를 추가할 때 전용 이벤트 루프 그룹(EventLoopGroup)을 생성해서 할당할 수 있습니다. 별도의 이벤트 루프 그룹을 지정하지 않고 생성된 모든 핸들러들은 채널의 이벤트루프를 공유해서 사용하게 됩니다. 이번에도 테스트 코드를 작성해서 변경된 동작을 확인해 보겠습니다. 앞선 테스트 코드와 동일하지만 지연이 발생하는 핸들러를 파이프라인에 추가할 때 전용 이벤트 루프 그룹을 할당해 줍니다. 그러면 클라이언트의 전송 채널에서 지연이 발생하고 있지만 서버로부터 메시지를 즉시 수신 가능함을 확인할 수 있습니다.

public class BlockingHandlerTest extends TcpLoopBackTest {
		@Test
    void blockingHandlerWithOwnEventLoop() throws Exception {
        // Given : 일정 시간 블락킹된 클라이언트 채널을 전용 이벤트 루프와 함께 생성
        client.test().pipeline().addLast(new DefaultEventLoopGroup(),
                                         new OutboundBlockingHandler(3000));
        client.send("blocking");

        // When : 서버에서 메시지 전송
        server.sendAll("status");

        // Then : 즉시 메시지 수신
        await().atMost(100, TimeUnit.MILLISECONDS)
                .until(() -> Objects.equals(client.read(), "status"));
    }
}

공유되는 핸들러

앞에서 네티는 송신과 수신을 하나의 쓰레드로 처리하기 때문에 멀티쓰레드 환경의 경합 조건(Race Condition)을 걱정하지 않아도 된다고 설명했습니다. 그러나 예외적으로 하나의 핸들러 인스턴스를 하나 또는 그 이상의 채널에 여러 번 추가하며 멀티쓰레드 환경에 실행되게 할 수 있습니다. 이런 경우에는 핸들러 클래스에 @Sharable 어노테이션이 붙여주어야 합니다. 네티는 @Sharable 어노테이션이 붙은 핸들러는 Thread-Safe 하다고 가정하고 두 번 이상 채널 파이프라인에 추가하여 공유 될 수 있도록 허용합니다. 반면에 @Sharable 어노테이션이 붙지 않은 핸들러는 채널 파이프라인에 두 번 이상 추가하려는 경우 예외가 발생하고 등록이 거부됩니다. 여기서 주의할 점은 @Sharable 어노테이션이 Thread-Safe 함을 만들어 주는 장치가 아니란 겁니다. @Sharable 어노테이션은 단지 네티 프레임워크와 개발자에게 이 핸들러가 Thread-Safe 하며 공유되어 사용될 수 있다는 정보를 주는 역할만 하고 실제 Thread-Safe 함을 보장하는 일은 개발자의 몫입니다. 따라서 @Sharable 어노테이션을 붙이며 공유하려는 핸들러는 상태를 가지지 않거나 또는 경합 조건이 발생하지 않도록 주의해야 합니다. 테스트 코드를 작성해 보면 @Sharable 어노테이션이 붙은 Thread-Safe 하지 않은 핸들러를 두개의 채널에 추가할 수 있고 심지어 평소에(?) 정상 동작하는 것을 확인할 수 있습니다. 그러나 언제든 해당 핸들러의 동작은 깨질 수 있는 위험 천만한 핸들러입니다. 또다른 테스트에서는 @Sharable 어노테이션을 붙이지 않은 경우에는 파이프라인 두 번 이상 추가 시 예외가 발생하는 것을 확인 할 수 있습니다. 전체 코드는 GitHub에서 확인할 수 있습니다.

@Sharable
public class SharableThreadUnsafeCountingHandler extends ChannelOutboundHandlerAdapter {
    private int count;

    public int getCount() {
        return count;
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
                      ChannelPromise promise) throws Exception {
        count++;
        super.write(ctx, msg, promise);
    }
}

public class UnsharableCountingHandler extends ChannelOutboundHandlerAdapter {
    private final AtomicInteger count = new AtomicInteger(0);

    public int getCount() {
        return count.get();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        count.incrementAndGet();
        super.write(ctx, msg, promise);
    }
}

@Slf4j
public class SharableHandlerTest {
		... 

		@Test
    void unsafeSharableHandlerTwice() throws InterruptedException {
        // When: 쓰레드 안전하지 않음 하나의 핸들러 인스턴스를 두 번 추가하고 메시지를 전송한다.
        SharableThreadUnsafeCountingHandler handler = new SharableThreadUnsafeCountingHandler();
        client1.test().pipeline().addLast("new", handler);
        client2.test().pipeline().addLast("reuse", handler);
        client1.send("message");
        client2.send("message");

        // Then: 당장 아무 문제 없는 것 처럼 실행되지만 언제든 데이터의 정확성이 깨질 수 있다.
        ChannelHandler newHandler =  client1.test().pipeline().toMap().get("new");
        ChannelHandler reuseHandler = client2.test().pipeline().toMap().get("reuse");
        assertSame(newHandler, reuseHandler);
        await().until(() -> handler.getCount() == 2); // 상황에 따라서 결과가 1이 될 수 있다.
    }

    @Test
    void UnsharableHandlerTwice() throws InterruptedException {
        UnsharableCountingHandler handler = new UnsharableCountingHandler();
        assertThrows(ChannelPipelineException.class, () -> {
            client1.test().pipeline().addLast(handler);
            client2.test().pipeline().addLast(handler);
        });
    }
}

마치며

정리해 보면 네티는 송신과 수신 핸들러들이 하나의 쓰레드에 의해 처리되도록 함으로 멀티쓰레드 환경에서의 문제를 제거해 줍니다. 그러나 이로 인해 하나의 핸들러에서 블락킹이 발생하면 모든 송수신 동작이 함께 멈춘다는 사실도 살펴 보았고 이를 해결하기 위해서는 별도의 전용 쓰레드를 블락킹이 발생하는 핸들러에 할당할 수 있음도 알아 보았습니다. 그리고 예외적이지만 @Sharable 어노테이션이 붙은 핸들러는 하나 이상의 채널에 두 번 이상 추가되어 멀티쓰레드 환경에서 실행될 수도 있는데 이 때 쓰레드 안정성은 개발자 스스로 보장해 주어야 함도 살펴보았습니다. 멀티쓰레드와 관련된 문제는 보통 매우 까다롭기 때문에 이런 특징을 잘 이해하고 네티를 사용해야 겠습니다.

profile
소프트웨어 엔지니어, 일상

0개의 댓글