Netty | Blocking 발생하는 Handler 영향 확인하고 Executor(EventLoop) 분리하기

주싱·2022년 2월 1일
0

Netty

목록 보기
5/9

Netty 프레임워크에서 Blocking 발생하는 Handler 영향을 확인하고 Executor(EventLoop) 분리를 통해 문제를 해결합니다. GitHub

  • 간단한 TCP 서버와 클라이언트를 구성하고 연결합니다.
  • Blocking이 발생(의도적으로 3초 Sleep)하는 OutboundHandler 핸들러를 Pipeline에 추가합니다.
  • 클라이언트에서 Outbound로 메시지를 전송 시도합니다.
  • 서버에서 클라이언트로 10개의 메시지를 전송합니다.
  • 클라이언트에서는 Outbound에서 발생한 Blocking 기간 동안 서버가 보낸 메시지를 수신할 수 없습니다.
  • Blocking이 발생하는 Handler에 독립적인 Executor(EventLoop)를 할당합니다.
  • 특정 Handler에서의 Blocking과 상관없이 채널에서 발생하는 이벤트들이 정상 처리됨을 확인합니다.
@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
@DisplayName("Blocking 발생하는 Handler 의 영향과 해결")
public class BlockingHandlerTest {
    // 서버
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    NioEventLoopGroup serverAcceptGroup = new NioEventLoopGroup();
    NioEventLoopGroup serverServiceGroup = new NioEventLoopGroup();
    ConcurrentHashMap<SocketAddress, Channel> activeServerChannelMap = new ConcurrentHashMap<>();

    // 클라이언트
    Bootstrap clientBootstrap = new Bootstrap();
    NioEventLoopGroup clientEventLoopGroup = new NioEventLoopGroup();
    BlockingQueue<String> clientResponseQueue = new LinkedBlockingQueue<>();
    Channel clientChannel;

    // 테스트 데이터
    String fixedResponse = "RESP";

    @BeforeEach
    @SneakyThrows
    public void beforeEach() {
        serverSetupAndStart();
        clientSetupAndConnect();
        waitForServerServiceActive();
    }

    private void waitForServerServiceActive() throws InterruptedException {
        Thread.sleep(100);
    }

    @AfterEach
    @SneakyThrows
    public void afterEach() {
        clientEventLoopGroup.shutdownGracefully();
        serverServiceGroup.shutdownGracefully();
        serverAcceptGroup.shutdownGracefully();
    }

    @SneakyThrows
    private void serverSetupAndStart() {
        serverBootstrap.group(serverAcceptGroup, serverServiceGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress("0.0.0.0", 12345)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childHandler(new ChannelInitializer<>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.pipeline()
                                // Inbound
                                .addLast(new ActiveServerChannelUpdater(activeServerChannelMap))
                                .addLast(new FixedLengthFrameDecoder(4))
                                .addLast(new StringDecoder())
                                .addLast(new ServerResponseHandler(fixedResponse))

                                // Outbound
                                .addLast(new StringEncoder())

                                // Duplex
                                .addLast(new Logger("Server", true));
                    }
                });
        serverBootstrap.bind().sync();
    }

    @SneakyThrows
    private void clientSetupAndConnect() {
        clientBootstrap.group(clientEventLoopGroup)
                .channel(NioSocketChannel.class)
                .remoteAddress("127.0.0.1", 12345)
                .handler(new ChannelInitializer<>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.pipeline()
                                // Inbound
                                .addLast(new FixedLengthFrameDecoder(4))
                                .addLast(new StringDecoder())
                                .addLast(new ReceiveDataUpdater(clientResponseQueue))

                                // Outbound
                                .addLast(new StringEncoder())

                                // Duplex
                                .addLast(new Logger("Client", true));
                    }
                });
        clientChannel = clientBootstrap.connect().sync().channel();
    }

    @Test
    @SneakyThrows
    @DisplayName("Blocking 발생하는 Handler 에 의해 전체 채널 지연")
    void blockingSideEffectTest() {
        // Given : Blocking 동작을 가진 Handler 추가
        clientChannel.pipeline().addLast(new OutboundDelayHandler(3000));

        // When : 클라이언트에서 메시지 1개 전송(Blocking), 서버에서 10개 메시지 전송
        clientChannel.writeAndFlush("ABCD");

        Channel serverServiceChannel = activeServerChannelMap.get(clientChannel.localAddress());
        for (int i = 0; i < 10; i++) {
            serverServiceChannel.writeAndFlush(String.format("RES%d", i));
        }

        // Then : 클라이언트에서 10개 메시지 수신
        for (int i = 0; i < 10; i++) {
            String response = clientResponseQueue.poll(100, TimeUnit.MILLISECONDS);
            Assertions.assertNull(response);
        }
    }

    @Test
    @SneakyThrows
    @DisplayName("Blocking 발생하는 Handler 독립적 쓰레드로 분리")
    void TakeAwayBlockingSideEffectTest() {
        // Given : Blocking 동작을 가진 Handler 독립적인 쓰레드로 처리
        clientChannel.pipeline().addLast(new DefaultEventLoopGroup(), new OutboundDelayHandler(3000));

        // When : 클라이언트에서 메시지 1개 전송(Blocking), 서버에서 10개 메시지 전송
        clientChannel.writeAndFlush("ABCD");

        Channel serverServiceChannel = activeServerChannelMap.get(clientChannel.localAddress());
        for (int i = 0; i < 10; i++) {
            serverServiceChannel.writeAndFlush(String.format("RES%d", i));
        }

        // Then : 클라이언트에서 10개 메시지 수신
        for (int i = 0; i < 10; i++) {
            String response = clientResponseQueue.poll(100, TimeUnit.MILLISECONDS);
            Assertions.assertEquals(String.format("RES%d", i), response);
        }
    }
}
profile
소프트웨어 엔지니어, 일상

0개의 댓글