Netty를 사용해 네트워크 프로그래밍을 하며 메시지 전송과 관련된 문제를 만났습니다. 문제 해결을 위해 Netty 코드를 뒤지며 메시지 처리 구조를 분석한 결과를 정리해 봅니다. 아래 다이어그램은 사용자가 Channel을 통해 메시지 전송을 요청했을 때 Netty 내부 처리 구조를 도식화 한 것입니다. 처리되는 순서대로 설명을 기술합니다.
사용자가 Channel을 통해 메시지 전송을 요청한다.
Channel channel = ...
channel.writeAndFlush(message);
Channel은 ChannelPipeline으로 메시지를 전달한다. ChannelPipeline은 기본적으로 TailContext와 HeadContext를 가진다. Pipeline의 시작과 끝이라 할 수 있다. Tail과 Head 사이에는 사용자가 등록한 ChannelHandlerContext가 체인 구조로 연결되고, 전달된 메시지가 체인을 따라 Outbound 방향으로 흘러간다.
public abstract class AbstractChannel ... {
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
}
public class DefaultChannelPipeline ... {
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
}
ChannelPipeline은 메시지가 각각의 Handler를 거칠 때 마다 Handler에게 바인딩된 EventExecutor 쓰레드와 현재 메시지 전송을 요청하고 있는 쓰레드가 동일한지 체크한다. 만약 서로 다른 쓰레드라면(Handler의 EventExecutor가 아니라면) 메시지를 Queue에 삽입하고 그대로 실행을 반환한다. Queue에 쌓인 메시지는 이후에 EventExecutor에 의해 비동기적으로 처리되게 된다. 당연하지만 Pipeline의 첫 ChannelHandlerContext에서는 항상 요청 쓰레드와 EventExecutor가 다르게 되고 메시지가 Queue에 쌓이게 된다.
abstract class AbstractChannelHandlerContext ... {
private void write(Object msg, boolean flush, ChannelPromise promise) {
...
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
...
}
}
만약 사용자가 별도의 EventExecutor를 설정하지 않았다면(기본 설정) 모든 Handler는 Channel의 EventLoop 쓰레드를 공유해서 사용하게 된다. 그러으로 Pipeline의 Tail 외에는 Queue에 메시지가 버퍼링되는 일은 일어나지 않는다. 반면에 사용자가 특정 Handler의 EventExecutor를 설정해 주었다면, Executor가 달라지는 Handler에서는 Queue에 메시지가 버퍼링 된 후 서로 다른 EventExecutor에 의해 메시지가 비동기적으로 처리되게 된다. 아래 빨간 부분이 공용 Channel EventLoop 쓰레드에 의해 메시지가 처리될 때의 모델이고, 초록색 부분이 사용자가 개별적으로 EventExecutor를 설정했을 때의 실행 모델이다.
abstract class AbstractChannelHandlerContext ... {
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
}
Pipeline을 통과한 메시지는 다시 Channel로 전달된다.
Netty Channel은 내부적으로 NIO 채널을 통해 네트워크로 메시지를 전송한다.
public class NioSocketChannel ... {
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
...
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
...
}
}