[Netty 공부기록] 이벤트 모델

식빵·2022년 5월 8일
2

Study Memo

목록 보기
3/5
post-thumbnail

내 노트북에 있던 개인 필기내용(Netty 공부기록(2022-05-08))을 여기에 게시한다.
네트워크 소녀 netty 저서 내용을 읽고 내 방식대로 정리해 본 것이다.
참고로 여기에 작성된 코드들은 네트워크 소녀 netty 저서에서 제공되는 코드를 썼다.
소스는 https://github.com/krisjey/netty.book.kor 에서 누구나 볼 수 있다.

📌 개요

내가 생각하기에는 오늘날(현재 2022년) 이벤트 루프 기반 프레임워크하면 자바스크립트 진영의 Node.js 가 가장 인기가 좋은 거 같다.

자바 진영에도 이런 프레임워크가 있는데, 지금 배우고 있는 Netty 가 바로 그렇다.

Netty 는 현재 Spring 에서 계속 밀어주는 Spring WebFlux 프레임워크의 기본 설정으로 사용되고 있다(변경 가능하다).

Netty 의 영향력이 계속 커지는 중이라고 나는 생각한다.

그런데 같은 이벤트 루프 기반 프레임워크인 Node.jsNetty 에는 내부적으로 많은 차이점이 있다.

그중에서도 "이벤트 루프를 싱글 쓰레드(Node.js)로 운영하는지, 아니면 멀티쓰레드(Netty)로 운영하는지"에 대한 차이점을 알아보고, 네티가 사용하는 멀티쓰레드 이벤트 루프 방식에 대해 알아보자.


📌 이벤트 루프

이벤트 루프는 이벤트를 실행하기 위한 무한루프 스레드를 의미한다.

이벤트 루프가 이벤트를 처리하는 방식은 위 그림에도 나와있지만, 말로 풀자면 아래와 같다.

  1. 어떤 객체가 이벤트를 발생시킴
  2. 해당 이벤트가 이벤트 큐(Queue)에 등록됨
  3. 이벤트루프가 큐에 입력된 이벤트를 꺼내서 실행
    (참고: 실행 결과 반환 방식에 따라 콜백/퓨처패턴으로 나뉨)
  4. 만약 큐가 비어있다면, 이벤트가 들어올 때가지 대기

이때 이벤트 루프를 하나의 스레드로만 운영하면 단일(싱글) 스레드 이벤트 루프이고, 여러 스레드로 운영하면 다중(멀티) 스레드 이벤트 루프이다.




📌 싱글 스레드와 멀티 스레드 이벤트 루프

둘 간의 장단점을 보면서 차이점을 알아보자.


👏 단일 스레드 이벤트 루프

하나의 이벤트 루프만이 이벤트 큐에 있는 작업을 처리하는 방식이다.

이벤트 큐에 들어온 순서(= 이벤트가 발생한 순서)대로 이벤트를 처리하기 때문에 어떻게 동작할지 쉽게 알 수 있다.
하지만 멀티 코어 CPU를 효율적으로 사용하지 못한다.
그리고 오래 걸리는 작업이 하나라도 걸리면 뒤에 있는 이벤트에 대한 처리속도가 떨어지게 된다.

대표적인 구현체로는 Node.js 가 있다.


👏 멀티 스레드 이벤트 루프

여러 개의 이벤트 루프가 하나의 이벤트 큐에 동시에 접근하여 이벤트에 대한 처리를 하는 방식이다.

이러면 CPU를 효율적으로 사용할 수 있다. 다만 이벤트 루프들이 하나의 이벤트 큐에 접근함으로
단일 스레드 이벤트 루프처럼 큐의 작업이 순서대로 수행되지 않는 경우가 발생한다.
다르게 말하면 동작이 예측하기 어렵다는 뜻이다.

그리고 멀티 스레드에서 항상 생각하는 공유 자원에 대한 데이터 레이스(data race) 에 대한 처리를 해야한다.
해본 사람은 알겠지만, 이게 쉽지 않다.

마지막으로 멀티 스레드에 의한 컨텍스트 스위칭 비용도 무시할 수 없다.
멀티 스레드에서는 무작정 스레드를 늘린다고 효율이 좋아지는게 아니다.
그래서 적절한 스레드의 개수 제한을 주는 게 일반적이다.




📌 네티의 이벤트 루프

그런데 네티를 사용하면 멀티 스레드이면서도 이벤트가 발생한 순서대로 이벤트를 처리하는 작업이 가능하다.

그게 가능한 이유를 알아보기 전에 먼저 일반적인 멀티 스레드 이벤트 루프의 구조를 아래 그림을 통해서 보자,

그림을 보면 알겠지만, 하나의 객체가 발생시킨 이벤트(event-01 ~ event-04)에 대한 처리가 이곳 저곳에서 순서없이 처리되고 있다.
그래서 event-02 가 엄청 빨리 수행되고, event-01이 조금 느리게 완료되면 일의 처리 순서가
event-02, event-01, event-03, event-04 처럼 될 수 있다.

이제 일반적인 멀티 스레드 이벤트 루프 구조의 문제점을 알아봤으니, 이를 해결한 네티의 방식을 알아보자.

네티는 아래 3가지 특징이 있다는 것을 일단 알자.

  • 네티의 이벤트는 채널에서 발생한다.
  • 이벤트 루프 객체는 이벤트 큐를 가지고 있다.
  • 네티의 채널은 하나의 이벤트 루프에 등록된다.

이런 특징을 이용해서 네티의 이벤트 루프 구조는 아래와 그림과 같다.

네티의 각 채널은 하나의 이벤트 루프에 등록되며, 채널에서 발생한 이벤트는 해당 이벤트 루프가 갖고 있는 이벤트 큐에 등록된다.

이러므로서 채널에서 발생한 이벤트는 항상 "동일한 이벤트 루프 스레드"에서 처리하게 되고,
결과적으로 인해서 이벤트 발생 순서와 처리 순서가 일치하게 된다.

일반적인 멀티 스레드 이벤트 루프 구조에서는 근본적인 문제가 하나의 이벤트 큐를 다수의 이벤트 루프가 공유한다는 점이였다.
그 핵심 문제를 네티는 현명하게 대처한 것이다.


네티는 실제로 이벤트를 처리하기 위하여 SingleThreadEventExecutor 클래스를 사용한다.
가볍게만 보자.

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
                                                                   SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    // ... 생략 ...

    private final Queue<Runnable> taskQueue; // 이벤트가 저장될 큐

    /**
     * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
     * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
     * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
     * implementation that does not support blocking operations at all.
     */
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
    }

    /**
     * @see Queue#poll()
     */
    protected Runnable pollTask() {
        assert inEventLoop();
        return pollTaskFrom(taskQueue);
    }

    protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task != WAKEUP_TASK) {
                return task;
            }
        }
    }
    
    

    /**
     * Runs all tasks from the passed {@code taskQueue}.
     *
     * @param taskQueue To poll and execute all tasks.
     *
     * @return {@code true} if at least one task was executed.
     */
    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
        Runnable task = pollTaskFrom(taskQueue);
        if (task == null) {
            return false;
        }
        for (;;) {
            safeExecute(task); // 실행!!!
            task = pollTaskFrom(taskQueue);
            if (task == null) {
                return true;
            }
        }
    }
}

이벤트가 저장되는 collection으로 큐를 사용하는 걸 알 수 있다.
그리고 "이벤트"라는 것을 "태스크(task)"로 표현한다는 것도 알 수 있다.

또한 실제 큐의 구현체로는 LinkedBlockingQueue 를 사용하는 것이 확인된다.
BlockingQueue 이므로 thread-safe 가 보장된다.

LinkedBlockingQueue 인스턴스 메소드 poll() 만 봐도 thread-safe 하게 동작하도록 구현된 것을 확인할 수 있다.

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        final E x;
        final int c;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() == 0)
                return null;
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }




📌 네티의 비동기 IO 처리

이벤트 루프를 통해서 이벤트가 순차적으로 처리된 것을 확인했다.
그렇다면 처리된 이벤트에 대한 결과는 어떻게 반환받고 처리할 수 있을까?

이것을 위해서 네티는 두 가지 패턴을 제공하는데, 하나는 이전에 본 이벤트 핸들러이고, 다른 하나는 퓨처 패턴이다.

퓨처 패턴은 간단하게 말해서 클라이언트가 어떤 메서드를 호출하면, 해당 메서드는 클라이언트의 요구를 위한 연산 처리를 다른 스레드에게
위임하고, 바로 어떤 값을 반환한다. 이때 반환하는 것이 퓨처 객체인데, 이 객체를 통해서 실제 연산에 대한 결과를 받을 수 있다.
실제 연산이 완료되기 전까지 다른 작업을 하다가 퓨처 객체에게 실제 연산결과를 받을 수 있다.

더 깊게 알고 싶다면 퓨처 패턴은 검색해보자. 이 이상의 깊은 내용은 다루지 않겠다.


그런데 이전에 작성했던 간단한 Netty 애플리케이션에서도 이런 네티의 퓨처를 사용한 적이 있다.
잠시 아래 코드를 보자.

public class ServerBootstrapExample {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap boot = new ServerBootstrap();
            boot.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new EchoServerV3FirstHandler());
                        }
                    });

            ChannelFuture f = boot.bind(8888).sync(); // 여기!!!!!
            f.channel().closeFuture().sync();		  // 여기!!!!!

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

그렇다. 우리는 이미 퓨처 패턴을 사용하고 있었다.
위 코드에서 딱 2줄에서 사용중인데, 이 2줄을 이해하기 편하도록 좀 더 풀어쓰면 아래와 같다.


ChannelFuture bindFuture = boot.bind(8888);
// 8888번 포트로 바인드하는 "비동기 bind 메서드"를 호출하였다. 
// bind 메서드는 바인딩이 완료되기 전에 ChannelFuture 를 반환한다.

bindFuture.sync();
// ChannelFuture 인터페이스의 sync() 메소드는 주어진 ChannelFuture 객체의 작업이 완료될 때까지
// "블로킹"하는 메서드다. 즉 bind 메서드의 처리가 완전히 끝나기 전까지 이 다음은 실행되지 않는다.

Channel serverSocketChannel = bindFuture.channel();
// 8888번 포트에 바인딩된 서버 소켓 채널을 가져온다.

ChannelFuture closeFuture = serverSocketChannel.closeFuture();
// 바인드가 완료된 서버 소켓 채널의 CloseFuture 객체를 가져오고, 
// 이 객체는 네티 내부에서 채널이 생성될 때 같이 생성된다. 
// 하나의 채널을 통해서 closeFuture() 메소드를 여러번 호출해도 항상 동일한 CloseFuture 객체를 반환 받는다는 것이다.

closeFuture.sync();
// CloseFuture 의 완료시점은 바로 채널이 close 가 완료된 시점이다. 
// 그리고 앞에서 말했지만, sync() 는 ChannelFuture 객체의 작업이 완료될 때까지 "블로킹"하는 메서드다.
// 그러므로 close 가 되기 전까지 이 다음 코드들은 실행이 안되게 막힌다.

네티의 ChannelFuture 는 비동기 IO 를 위한 것이고, 이 객체를 통해서 작업의 완료 유무를 확인할 수 있다는 점을 잊지말자.

그런데 완료 유무를 확인 하는 건 좋은데, ChannelFuture 의 처리가 완료된 시점에 곧바로 특정 작업이 수행되길 원할 수 있다.

이럴 때는 ChannelFuture 의 addListener 메서드를 통해서 리스너를 등록하는 방식이 있다.

아래 코드를 보자.


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ChannelFuture channelFuture = ctx.writeAndFlush(msg);
        channelFuture.addListener(ChannelFutureListener.CLOSE); /// 여기에 집중!
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ctx.writeAndFlush(msg); 비동기 메서드를 통해서 ChannelFuture 를 반환받는다.
소켓 버퍼에 데이터를 넣고, 전송까지 하는 작업이 완료되면 이때 등록한 ChannelFutureListener.Close 가 동작한다.


ChannelFutureListener.Close 코드는 아래와 같다.

/**
 * Listens to the result of a {@link ChannelFuture}.  The result of the
 * asynchronous {@link Channel} I/O operation is notified once this listener
 * is added by calling {@link ChannelFuture#addListener(GenericFutureListener)}.
 *
 * <h3>Return the control to the caller quickly</h3>
 *
 * {@link #operationComplete(Future)} is directly called by an I/O
 * thread.  Therefore, performing a time consuming task or a blocking operation
 * in the handler method can cause an unexpected pause during I/O.  If you need
 * to perform a blocking operation on I/O completion, try to execute the
 * operation in a different thread using a thread pool.
 */
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {


    /**
     * A {@link ChannelFutureListener} that closes the {@link Channel} which is
     * associated with the specified {@link ChannelFuture}.
     */
    ChannelFutureListener CLOSE = new ChannelFutureListener() { // 현재 쓰고 있는 건 이거다!
        @Override
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };
    
    /**
     * A {@link ChannelFutureListener} that closes the {@link Channel} when the
     * operation ended up with a failure or cancellation rather than a success.
     */
    ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().close();
            }
        }
    };

    /**
     * A {@link ChannelFutureListener} that forwards the {@link Throwable} of the {@link ChannelFuture} into the
     * {@link ChannelPipeline}. This mimics the old behavior of Netty 3.
     */
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().pipeline().fireExceptionCaught(future.cause());
            }
        }
    };
}

ChannelFutureListener의 익명 클래스이고 GenericFutureListener 클래스의 operationComplete 메소드를 오버라이드하고 있다.

메소드 이름만 봐도 알겠지만, ChannelFuture 가 내부적으로 처리하고 있는 연산이 "완료"되고 나서 호출되는 메소드라는 것을 알 수 있다.

아무튼 메서드의 내용은 ChannelFuture 를 생성시킨 채널을 close() 하는 것을 확인할 수 있다.

이미 알겠지만, 직접 리스너를 익명 클래스로 생성하여 addListener 메소드의 인자로 제공할 수 있다.


ChannelFuture channelFuture = ctx.writeAndFlush(msg);
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
        future.channel().close();
    }
});
// 람다식으로도 표현 가능하다!
// channelFuture.addListener((ChannelFutureListener) future 
// 								-> future.channel().close());
profile
백엔드를 계속 배우고 있는 개발자입니다 😊

2개의 댓글

comment-user-thumbnail
2022년 12월 28일

안녕하세요. 혹시 netty서적 어디서 구하셨는지 알수있을까요??

1개의 답글