Netty 프레임워크 내부 살펴보기 : initChannel, connect 🔎

주싱·2022년 1월 10일
3

Netty

목록 보기
3/9

Netty 프레임워크를 사용하여 TCP 클라이언트 프로그램을 개발하며 쉽게 이해되지 않는 문제를 만났습니다. 문제 해결을 위해 프레임워크 내부 코드를 살펴보며 문제에 대해 분명하게 이해하고 또 문제를 해결할 수 있었습니다.

문제 살펴보기

저는 TCP 서버 역할을 하는 장비와 통신하는 클라이언트 프로그램을 개발하고 있습니다. 이 때 장비가 꺼졌다가 다시 켜지더라도 자동으로 항상 연결되도록 하기 위해 주기적으로 연결을 재시도하고 있습니다. 지금까지는 (실제 장비를 연결할 수 없어서)장비를 시뮬레이션하는 서버를 항상 올리고 로컬에서 테스트 해온 덕분에(?) 연결이 실패하는 케이스를 만나지 못했습니다. 오늘은 시뮬레이터를 끄고 연결 실패 케이스를 테스트하게 되었는데요. 아래와 같은 오류가 연결에 실패할 때마다 반복해서 발생하였습니다.

io.netty.channel.ChannelPipelineException: xxx.handler.StatusUpdateHandler is not a @Sharable handler, so can't be added or removed multiple times.

(전체 예외 메시지는 아래와 같습니다.)

io.netty.channel.ChannelPipelineException: xxx.handler.StatusUpdateHandler is not a @Sharable handler, so can't be added or removed multiple times.
	at io.netty.channel.DefaultChannelPipeline.checkMultiplicity(DefaultChannelPipeline.java:600) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:202) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	... 
	at io.netty.channel.ChannelInitializer.initChannel(ChannelInitializer.java:129) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.ChannelInitializer.handlerAdded(ChannelInitializer.java:112) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.AbstractChannelHandlerContext.callHandlerAdded(AbstractChannelHandlerContext.java:938) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:609) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.DefaultChannelPipeline.access$100(DefaultChannelPipeline.java:46) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.DefaultChannelPipeline$PendingHandlerAddedTask.execute(DefaultChannelPipeline.java:1463) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.DefaultChannelPipeline.callHandlerAddedForAllHandlers(DefaultChannelPipeline.java:1115) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.DefaultChannelPipeline.invokeHandlerAddedIfNeeded(DefaultChannelPipeline.java:650) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:502) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:417) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:474) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[netty-common-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.55.Final.jar:4.1.55.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.55.Final.jar:4.1.55.Final]
	at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]

예외 메시지 살펴보기

콜스택을 보니 ChannelInitializer.initChannel() 호출 시 문제가 발생한 것을 확인할 수 있었습니다. 그리고 StatusUpdateHandler 라는 핸들러가 @Sharable 속성을 가진 핸들러가 아니라서 파이프라인에 여러번 추가하거나 삭제할 수 없다고 합니다.

io.netty.channel.ChannelPipelineException: xxx.handler.StatusUpdateHandler is not a @Sharable handler, so can't be added or removed multiple times.
  ...
  at io.netty.channel.ChannelInitializer.initChannel(ChannelInitializer.java:129) ~[netty-transport-4.1.55.Final.jar:4.1.55.Final]
  ...

문제 코드 살펴보기

제가 개발한 문제 코드를 살펴 보면 아래와 같이 채널 파이프라인에 핸들러들(메시지를 처리하는)을 추가해 주는 단순한 코드입니다. 그리고 예외를 발생시킨 StatusUpdateHandler는 다른 핸들러들과는 다르게 미리 생성된 객체를 파이프라인에 추가해 주고 있습니다.

public class BheXdcPipelineInitializer extends ChannelInitializer<SocketChannel> {
    private StatusUpdateHandler<BheImageDownConverterStatus> statusUpdateHandler;
    private ChannelResponseNotifier<BheXdcRxMessage> responseNotifier;
    private ChannelExceptionHandler exceptionHandler;
    private BheXdcRxDecoderMapper rxDecoderMapper;

    public void init(int camId, LastStatus<BheImageDownConverterStatus> lastStatusShared, ChannelResponseListener<BheXdcRxMessage> listener) {
        statusUpdateHandler = new StatusUpdateHandler<>(lastStatusShared, new BheImageDownConverterStatus());
        responseNotifier = new ChannelResponseNotifier<>(BheXdcRxMessage.class, listener);
        exceptionHandler = new ChannelExceptionHandler(camId);
        rxDecoderMapper = new BheXdcRxDecoderMapper();
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        String objectName = "X-DC CAM";

        ch.pipeline()
                // Inbound
                .addLast(new DelimiterBasedFrameDecoder(BheXdcChannelSpec.getMaxLengthOfMessage(), true, true, BheXdcChannelSpec.getRxLineDelimiterBuf()))
                .addLast(new StringDecoder())
                .addLast(new StringSeparator(BheXdcChannelSpec.getAllSeparatorRegExp()))
                .addLast(new StringArrayDecoder<>(rxDecoderMapper, BheXdcChannelSpec.getIndexOfMessageId()))
                .addLast(new BheXdcInboundMessageVerifier()) //TODO
                .addLast(statusUpdateHandler)
                .addLast(responseNotifier)

                // Outbound
                .addLast(new StringEncoder())
                .addLast(new PacketEncoder(new BheXdcPacketEncodeUtil(BheXdcChannelSpec.getTxLineDelimiterBuf())))
                .addLast(new BheXdcOutboundMessageVerifier()) //TODO

                // Inbound (Error Handler)
                .addLast(exceptionHandler);
    }
}

원인 분석

여기까지만 보면 왜 문제가 발생했는지 잘 모르겠습니다. 일단 @Sharable 속성이 무엇을 의미하는지 JavaDoc 문서를 찾아보았습니다. 읽어보면 @Sharable 어노테이션은 Netty와 코드를 읽는 개발자에게 이 핸들러는 Race Condition 없이, 하나 또는 다수의 파이프라인에 여러번 추가할 수 있다는 정보를 제공하는 역할을 한다고 합니다. 어노테이션이 붙었다고 해서 핸들러가 Thread-safe 함을 Netty는 보장하지 않으며 이를 지킬 책임은 개발자에게 있으니 주의해야 합니다.

Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipelines multiple times without a race condition.
If this annotation is not specified, you have to create a new handler instance every time you add it to a pipeline because it has unshared state such as member variables. This annotation is provided for documentation purpose, just like the JCIP annotations.

Netty 내부 동작 구조를 모를 당시에는 위 설명을 기준으로도 제 코드가 문제가 없다고 생각했습니다. 왜냐하면 저는 하나의 채널로 하나의 파이프라인을 생성한 뒤에 동일한 서버에 연결만 반복 시도하고 있었기 때문입니다. 같은 서버에 연결을 재시도 하고 있으니까 채널도 파이프라인도 하나이고 Race Condition이 발생하는 두 개 이상의 채널을 만들지 않는다고 직관적으로 생각했습니다. 답이 안나와서 Netty 내부 코드를 따라가 보았습니다. BootStrap.connect() 함수를 따라가다 보니 연결 시작부분에 아래와 같은 코드가 있습니다. 그렇군요. 연결을 시도할 때 마다 매번 새로운 채널, 새로운 파이프 라인을 생성해 주네요.

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
		...
} 

그리고 그 다음 라인 init(channel) 코드를 따라가보니 채널 파이프라인에 config.handler()을 등록해 줍니다. config.handler()는 Bootstrap 초기화 시 등록한 현재 문제가 되고 있는 ChannelInitializer 입니다.

void init(Channel channel) {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());

    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
}

그리고 이후에 채널이 EventLoop에 등록될 때 ChannelInitializer에 정의된 channelRegistered() 이벤트 함수가 호출되고, channelRegistered() 내에서 문제가 된 initChannel() 함수를 호출되게 됩니다.

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
    // the handler.
    if (initChannel(ctx)) {
        // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
        // miss an event.
        ctx.pipeline().fireChannelRegistered();

        // We are done with init the Channel, removing all the state for the Channel now.
        removeState(ctx);
    } else {
        // Called initChannel(...) before which is the expected behavior, so just forward the event.
        ctx.fireChannelRegistered();
    }
}

이제 답이 나왔네요. Netty는 하나의 Bootstrap으로 연결을 반복 시도하더라도, 연결 시도할 때 마다 새로운 채널, 새로운 파이프라인을 만들고 새롭게 초기화 합니다. 그래서 ChannelInitializer.initChannel() 함수가 반복적으로 호출되었습니다. 다시 제 코드로 돌아가보면 ChannelInitializer 객체 초기화 시에 생성한 statusUpdateHandler를 InitChannel()에서 반복 사용하게 됨으로 Netty 설계상에 명시한 '@Sharable 어노테이션 하지 않은 Handler를 파이프라인에 두 번 이상 추가할 수 없다'는 제약을 벗어납니다. 동일한 연결 객체(Bootstramp)에 동일한 연결 대상이긴 하지만 매 번 새로 생성되는 서로 다른 파이프라인에 Handler가 여러번 추가되고 있네요.

그리고 Netty에서 핸들러가 여러번 등록되는지를 어떻게 체크하는지가 궁금해서 ch.pipeline().addLast() 코드도 한 번 따라가 봅니다. 조금 따라가다 보면 예외 콜 스택에 등장한 checkMultiplicity() 함수가 등장합니다. checkMultiplicity() 함수 내에서는 handler.add 플래그를 통해 핸들러가 이미 등록된적 있는지 확인하여 있다면 제가 만났던 예외를 던져주고, 처음 등록이라면 handler.add 플래그를 설정합니다. 궁금한 부분이 해결되었습니다.

/* DefaultChannelPipeline.java */ 

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
		... 
    checkMultiplicity(handler);
} 

private static void checkMultiplicity(ChannelHandler handler) {
  if (handler instanceof ChannelHandlerAdapter) {
      ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
      if (!h.isSharable() && h.added) {
          throw new ChannelPipelineException(
                  h.getClass().getName() +
                  " is not a @Sharable handler, so can't be added or removed multiple times.");
      }
      h.added = true;
  }
}

문제 해결하기

해결방법은 두 가지 입니다. statusUpdateHandler와 같이 한 번 생성하고 반복해서 초기화에 사용되는 핸들러에 @Sharable 어노테이션을 붙여주거나 파이프라인 초기화 시 매번 new 해서 핸들러는 생성해주는 것입니다. 저는 파이프라인 초기화 시 매번 핸들러를 생성해 주도록 했습니다. 약간 비효율적이란 생각이 들기는 하는데 프레임워크에서 채널도 파이프라인도 매번 만들어 주고 있고 그 하위에 있는 핸들러도 새로 만들어 주는 것도 괜찮은 것 같습니다. 특별한 성능 이슈는 현재 없습니다.

public class BheXdcPipelineInitializer extends ChannelInitializer<SocketChannel> {
    private int camId;
    private LastStatus<BheImageDownConverterStatus> lastStatusShared;
    private ChannelResponseListener<BheXdcRxMessage> responseListener;

    public void init(int camId, LastStatus<BheImageDownConverterStatus> lastStatusShared, ChannelResponseListener<BheXdcRxMessage> listener) {
        this.camId = camId;
        this.lastStatusShared = lastStatusShared;
        this.responseListener = listener;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        String objectName = "X-DC CAM";
        BheXdcRxDecoderMapper rxDecoderMapper = new BheXdcRxDecoderMapper();

        ch.pipeline()
                // Inbound
                .addLast(new DelimiterBasedFrameDecoder(BheXdcChannelSpec.getMaxLengthOfMessage(), true, true, BheXdcChannelSpec.getRxLineDelimiterBuf()))
                .addLast(new StringDecoder())
                .addLast(new StringSeparator(BheXdcChannelSpec.getAllSeparatorRegExp()))
                .addLast(new StringArrayDecoder<>(rxDecoderMapper, BheXdcChannelSpec.getIndexOfMessageId()))
                .addLast(new BheXdcInboundMessageVerifier()) // TODO
                .addLast(new StatusUpdateHandler<>(lastStatusShared, new BheImageDownConverterStatus()))
                .addLast(new ChannelResponseNotifier<>(BheXdcRxMessage.class, responseListener))

                // Outbound
                .addLast(new StringEncoder())
                .addLast(new PacketEncoder(new BheXdcPacketEncodeUtil(BheXdcChannelSpec.getTxLineDelimiterBuf())))
                .addLast(new BheXdcOutboundMessageVerifier()) // TODO

                // Inbound (Error Handler)
                .addLast(new ChannelExceptionHandler(camId));
    }
}

마치며

처음에 이해되지 않던 것들이 코드를 보며 분명하게 이해가 되어 좋았던 것 같습니다. 좋은 프로젝트를 만들어 공개해준 이희승님과 여러 메인테이너 커미터 분들께 감사한 마음이 듭니다.

profile
소프트웨어 엔지니어, 일상

2개의 댓글

comment-user-thumbnail
2022년 4월 14일

도움 많이 되었습니다. 감사합니다.

1개의 답글