[Netty 공부기록] 채널 파이프라인

식빵·2022년 5월 7일
0

Study Memo

목록 보기
2/5
post-thumbnail

🍀 채널 파이프라인과 코덱

여기에 작성된 코드들은 네트워크 소녀 netty 책의 소스를 기반으로 작성된 것이다.
해당 소스는 https://github.com/krisjey/netty.book.kor 에서 찾아볼 수 있다.


📌 채널 파이프라인? 이벤트 핸들러? 코덱?

채너 파이프란?

A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel.

채널의 인바운드, 아웃바운드 최종 연산 이전에 수행하는 중간 연산(intercept)을 해주는 ChannelHandler 들의 모음(list)이다.


채널에서 발생한 이벤트에 대한 이벤트에 대한 처리를 해줄 ChannelHandler 들의 리스트이다.

좀 더 풀어서, 그리고 추상적으로 표현하자면 아래와 같다.

"채널 파이프라인"은 채널에서 발생한 이벤트가 이동하는 통로이며,
이동 하는 이벤트를 처리하는 클래스를 "이벤트 핸들러"라고 한다.
이때 이벤트 핸들러를 상속받아서 구현한 구현체들을 "코덱"이라고 한다.

  • 채널 파이프라인 : ChannelHandler list, 이벤트가 이동하는 통로
  • 이벤트 핸들러: 이벤트를 처리하는 클래스
  • 코덱: 이벤트 핸들러를 상속받아서 구현한 구현체들

자주 사용되는 이벤트 핸들러에 대해서는 미리 구현해둔 코덱 묶음이 있다. io.netty.handler.codec 패키지를 살펴보면 알 수 있다.




📌 이벤트 실행

네티 이벤트 핸들러의 "이벤트 메서드"는 데이터 수신과 같은 "이벤트"가 발생하면 자동으로 호출된다.
네티의 이러한 "자동 호출"은 잘 생각해보면 이점이 있다.

예를들어, 만약 네트를 쓰지 않고 소켓 프로그래밍을 직접했다면 이러한 수신/송신/에러 등에 대한 함수를 코드에 "직접 작성(=직접 호출)"해야 하고, 이로 인해서 코드가 매우 복잡해지고, 중복이 난무했을 것이다. 이외에도 구글링을 하면 많은 장점을 제시해줄 것이다.


이런 이벤트의 "자동실행"을 위해서 네티에서는 데이터 수신/송신/에러 등을 모두 "이벤트"로 추상화시킨다.
그리고 이에 대한 처리 또한 채널 파이프라인과 이벤트 핸들러라는 개념으로 추상화시킨다.
이를 위해서는 아래와 같은 네티 설정(bootstrap)을 할 것이다.

  1. 부트스트랩으로 네트워크 애플리케이션에 필요한 설정 지정
  2. 부트스트랩에서 이벤트 핸들러를 사용하여 채널 파이프라인 구성
  3. 이벤트 핸들러의 데이터 수신 이벤트 메서드에서 데이터를 읽는다.
  4. 이벤트 핸들러의 네트워크 끊김 이벤트 메서드에서 에러 처리를 한다.

위처럼 작성하면 네티의 "이벤트 루프"(다음 장에서 배움)가 소켓 채널에서 발생한 이벤트에 해당하는 이벤트 메서드를 자동으로 실행한다.

소켓 채널에 데이터가 수신되었을 때 네티가 이벤트 메서드를 실행하는 방법은 다음과 같다.

  1. 네티의 이벤트 루프가 채널 파이프라인에 등록된 첫 번째 이벤트 핸들러를 가져온다.
  2. 이벤트 핸들러에 데이터 수신 이벤트 메서드가 구현되어 있으면 실행한다.
  3. 데이터 수신 이벤트 메서드가 구현되어 있지 않으면 다음 이벤트 핸들러를 가져온다.
  4. 2를 수행한다(반복)
  5. 채널 파이프라인에 등록된 마지막 이벤트 핸들러에 도달할 때까지 1을 반복한다.

📌 채널 파이프라인

채널 파이프라인은 네티의 채널과 이벤트 핸들러 사이의 연결 통로 역할이다.

  • 채널이 전기를 생산하는 발전소이고,
  • 채널 파이프라인이 전기를 보내주는 전선이고,
  • 하나의 전선으로 들어오는 전기가 멀티탭으로 분리되듯, 가전제품 여러대가 하나의 전선으로 부터 전기를 받아서 기동된다.
    바로 이 가전제품들이 이벤트 핸들러들과 같다고 생각하면 편하다.

잠시 코드를 보자.

public class ServerBootstrapExample {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap boot = new ServerBootstrap();
            boot.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 1.
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception { //2
                            ChannelPipeline pipeline = ch.pipeline(); // 3
                            pipeline.addLast(new EchoServerHandler()); // 4
                        }
                    });

            ChannelFuture f = boot.bind(8888).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

childHandler 메서드를 통해서 서버 소켓 채널이 생성한 (클라이언트와 통신하게 되는) 소켓 채널이 상요할 채널 파이프라인을 설정하게 된다.

내부에 보면 initChannel 메소드의 파라미털로 SocketChannel, 그리고 그 파라미터의 pipeline 메소드를 통해서 파이프라인 참조값을 가져온다.

이것은 하나의 채널이 하나의 파이프라인을 갖는다는 의미와 같다.

그리고 이 파이프라인에 addLast 를 통해서 실제 이벤트 핸들러를 등록하게 된다.

정리하자면 아래와 같다.

  1. 서버 소켓 채널이 클라이언트에게서 접속 요청을 받는다.
  2. 서버 소켓 채널은 클라이언트 연결에 대응하는 소켓 채널 객체를 생성하고, 빈 채널 파이프라인 객체를 생성하여 소켓 채널에 할당한다.
  3. 앞서 작성한 ChannelInitializer.initChannel(SocketChannel ch) 를 호출한다.
  4. 호출하면 1에서 생성했던 파이프라인 객체를 얻어오게 되고, 사용자가 작성한 이벤트 핸들러 객체가 등록된다.




📌 이벤트 핸들러

네티에서는 비동기 호출을 지원하는 두 가지 패턴이 제공된다.

  1. Future 패턴
  2. 리액터 패턴의 구현체인 이벤트 핸들러

그리고 이런 이벤트 핸들러를 위해서 이벤트를 인터페이스로 정의하고 ,
이런 인터페이스를 상속받은 이벤트 핸들러를 작성하여 채널 파이프라인에 등록한다.
그리고 채널 파이프라인으로 입력되는 이벤트를 이벤트 루프가 가로채어 이벤트에 해당하는 메서드를 수행하는 구조다.

이때 네티가 제공하는 이벤트의 종류나 발생 시기를 모르면 네티 사용에 큰 장애물이 된다. 지금부터 알아보자.


1. 채널 인바운드 이벤트

먼저 인바운드 이벤트부터 알아보자.

인바운드 이벤트는 소켓 채널에서 발생한 이벤트 중에서 상대방의 어떤 동작을 취했을 때 발생한다.

채널 활성화, 데이터 수신 드으이 이벤트가 그러하다.

네티는 인바운드 이벤트를 위한 ChennelInBoundHandler 인터페이스를 제공한다.

package io.netty.channel;

/**
 * {@link ChannelHandler} which adds callbacks for state changes. This allows the user
 * to hook in to state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Invoked when the current {@link Channel} has read a message from the peer.
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

참고:

여기서 종종 나오는 ChannelHandlerContext에 대한 정의는 아래와 같다.

Enables a ChannelHandler to interact with its ChannelPipeline and other handlers. Among other things a handler can notify the next ChannelHandler in the ChannelPipeline as well as modify the ChannelPipeline it belongs to dynamically.

예를 들어보겠다.

클라이언트가 서버에 접속하여 "hello"라는 문자열 데이터를 전송하고 연결을 끊었다고 가정해보겠다.

그러면 아래와 같은 순서로 일이 진행된다. 그리고 그와 매칭되는 이벤트를 옆에 적어봤다.

  1. 서버 소켓 채널이 소켓 채널을 이벤트 루프에 등록한다.(이건 추후에 알아갈 것이다) ==> channelRegistered 이벤트 발생
  2. 소켓 채널 활성화 ==> channelActive 이벤트 발생
  3. 데이터 수신 ==> channelRead 이벤트 발생
  4. 데이터 수신 완료 ==> channelReadComplete 이벤트 발생
  5. 채널 비활성화 ==> channelActive 이벤트 발생
  6. 이벤트 루프에서 채널 제거 ==> channelUnregistered 이벤트 발생

위에서 작성한 이벤트를 간단하게 정리하면 아래와 같다.

  • channelRegistered 이벤트
    • 그냥 간단하게 채널이 생성되었을 때 발생하는 것이라 생각하면 된다.
    • 만약 서버라면 "서버 소켓 채널"과 "클라이언트 소켓 채널" 생성, 즉 서로 다른 2가지 위치에서 이벤트가 발생하고,
      클라이언트라면 "소켓 채널"을 생성하는 곳에서 이벤트가 발생한다.
  • channelActive 이벤트
    • channelRegistered 이벤트 이후에 연달아 발생하는 이벤트이며
    • 채널이 생성 및 이벤트 루프 등록된 후 네티 API를 통해서 채널 입출력이 가능한 상태를 알려주는 이벤트이다.
    • 서버 및 클라이언트 모두 상대방에 연결한 직후 딱 1번만 수행되는 이벤트이다.
  • channelRead 이벤트

    • 데이터가 수신되었음을 알리는 이벤트이다.

    • 수신된 데이터는 네티의 ByteBuf 객체에 저장되며 이벤트 메서드의 두번째 인자인 Object msg 를 통해서 접근 가능하다.

    • package me.dailycode;
      
      import io.netty.buffer.ByteBuf;
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.channel.ChannelInboundHandlerAdapter;
      
      import java.nio.charset.StandardCharsets;
      
      public class EchoServerV1Handler extends ChannelInboundHandlerAdapter {
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              ByteBuf readMessage = (ByteBuf) msg; // 수신 데이터를 msg 에서 접근
              System.out.println("channelRead : " + readMessage.toString(StandardCharsets.UTF_8));
              ctx.writeAndFlush(msg);
          }
      
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
              cause.printStackTrace();
              ctx.close();
          }
      }
    • 참고로 네티 내부에서는 모든 데이터가 ByteBuf로 관리된다.

  • channelReadComplete 이벤트

    • 데이터 수신이 "완료"되었음을 알리는 이벤트이다.

    • channelRead 와 헷갈리지만, 분명히 서로 다르다.

      예를 클라이언트 쪽에서 서버로 'A', 'B', 'C' 라는 문자 데이터를 순차적으로 전송했다고 쳐보자. 이때는 서버에서는 channelRead 이벤트가 발생한다. 그 이후에 더 이상 받을 데이터가 없을 때 channelReadComplete 이벤트가 발생한다.

    • 즉 채널에 데이터가 있을 때는 channelRead, 더 이상 데이터가 없을 때는 channelReadComplete 이벤트가 터지는 것이다.

    • package me.dailycode;
      
      import io.netty.buffer.ByteBuf;
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.channel.ChannelInboundHandlerAdapter;
      
      import java.nio.charset.StandardCharsets;
      
      public class EchoServerV2Handler extends ChannelInboundHandlerAdapter {
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              ByteBuf readMessage = (ByteBuf) msg;
              System.out.println("channelRead : " + readMessage.toString(StandardCharsets.UTF_8));
              ctx.write(msg);
          }
      
          @Override
          public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
              System.out.println("channelReadComplete 발생");
              ctx.flush();
          }
      
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
              cause.printStackTrace();
              ctx.close();
          }
      }
    • 위의 코드는 방금 본 EchoServerV1Handler 와는 조금 다르다. 여기서는 ctx.writeAndFlush 대신 writeflush를 따로 호출한다.

  • channelInActive 이벤트
    • 채널 비활성화를 알리는 이벤트이며, 이후에 채널에 대한 입출력 작업은 못한다.
  • channelUnregistered 이벤트
    • 채널이 이벤트 루프에서 제거되었을 때 발생한다.




2. 아웃바운드 이벤트

네티 사용자(프로그램)이 요청한 동작에 대한 이벤트 들이며, 연결요청/데이터 전송/소켓 닫기 등이 이에 해당한다.

인바운드와 마찬가지로 이벤트 핸들러 인터페이스를 제공한다.

package io.netty.channel;

import java.net.SocketAddress;

/**
 * {@link ChannelHandler} which will get notified for IO-outbound-operations.
 */
public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * Called once a bind operation is made.
     *
     * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
     * @param localAddress  the {@link SocketAddress} to which it should bound
     * @param promise       the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception    thrown if an error occurs
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a connect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
     * @param remoteAddress     the {@link SocketAddress} to which it should connect
     * @param localAddress      the {@link SocketAddress} which is used as source on connect
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a disconnect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a close operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a deregister operation is made from the current registered {@link EventLoop}.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Intercepts {@link ChannelHandlerContext#read()}.
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
    * Called once a write operation is made. The write operation will write the messages through the
     * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
     * {@link Channel#flush()} is called
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
     * @param msg               the message to write
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
     * that are pending.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
     * @throws Exception        thrown if an error occurs
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

각 이벤트가 어떤 의미인지 알아보자.

  • bind 이벤트
    • 클라이언트 연결을 대기하는 IP 포트가 설정되었을 때 발생한다.
    • 서버 소켓이 사용중인 정보를 SocketAddress 파라미터로 제공한다.
  • connect 이벤트
    • 클라이언트 소켓 채널이 서버에 연결되었을 때 발생한다.
    • 두 가지 SocketAddress 정보를 제공하며, 하나는 원격이며 다른 하나는 로컬용이다.
  • disconnect 이벤트
    • 클라이언트 소켓 채널의 연결이 끊어졌을 때 발생한다.
  • close 이벤트
    • 소켓 채널이 닫혔을 때 발생한다.
  • write 이벤트
    • 소켓 채널에 데이터가 기록되었을 때 발생한다.
    • 소켓 채널에 기록된 데이터 버퍼가 파라미터로 제공된다.
  • flush 이벤트
    • flush 메소드가 호출되었을 때 발생하는 이벤트이다.



3. 주의 사항

여러개의 이벤트 핸들러를 등록할 때 같은 이벤트 메소드를 연속적으로 호출하기 위해서는 반드시 ctx.fireChannelRead(msg)를 호출해야 한다.

아래와 같이 작성해야만 한다는 뜻이다.

package me.dailycode;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class ServerBootstrapExample {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap boot = new ServerBootstrap();
            boot.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new EchoServerV3FirstHandler());
                            pipeline.addLast(new EchoServerV3SecondHandler());
                        }
                    });

            ChannelFuture f = boot.bind(8888).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
package me.dailycode;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.StandardCharsets;

public class EchoServerV3FirstHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf readMessage = (ByteBuf) msg;
        System.out.println("channelRead First!: " + readMessage.toString(StandardCharsets.UTF_8));
        ctx.write(msg);
        ctx.fireChannelRead(msg); /// 꼭해야 한다!!!
    }
}
package me.dailycode;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.StandardCharsets;

public class EchoServerV3SecondHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf readMessage = (ByteBuf) msg;
        System.out.println("channelRead Second!: " + readMessage.toString(StandardCharsets.UTF_8));
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ChannelHandlerContext 인터페이스의 fire 접두사가 붙는 메소드가 이벤트를 발생시키는 메서드라는 점을 잘 알아두자.




📌 코덱

보통 동영상 압축 알고리즘을 코덱이라고 부른다.

원본 영상(RAW 파일)을 입력으로 넣고, 압축 알고리즘을 MPEG 를 사용하면 MPEG 파일이 생성되는 것이다.

그런데 이렇게 압축한 걸 다시 원본으로 만들려면 압축 해제를 해야 한다.

결국 영상은 압축과 해제라는 두 단계, 즉 압축할 때 사용되는 인코더와 압축을 해제할 때 사용되는 디코더로 나뉜다.

이렇듯 원본을 압축하는 것을 "인코딩"이라 하고 압축해제하여 원본을 얻어내는 것을 "디코딩"이라 한다.

이 과정을 네티의 소켓 채널로 바꿔 말하면

  • 인코더는 인바운드
  • 디코더는 아웃바운드

이다.


너무 뜬금 없는가? 아래처럼 표현하면 이해가 될 것이다.

  • 네티에서 인코더는 전송할 데이터를 전송 프로토콜에 맞추어 변환 작업을 수행하고
  • 디코더는 그와 반대의 작업을 수행한다.

이제 코덱의 구조를 살펴보자.




📌 코덱의 구조

네티에서는 인바운드와 아웃바운드와 관련된 이벤트 핸들러를 각각 인코더, 디코더라고도 부른다.

여기서 알아야할 것은 네티의 인코더/디코더 모두 애플리케이션 내부의 데이터를 변환한다는 것이다.
보낼 때는 인코더로 내부 데이터를 패킷으로 변환하고, 받을 때는 패킷을 애플리케이션에서 쓰는 데이터로 변환해야 한다.


👏 코덱의 실행 과정

네티 코덱은 템플릿 메서드 패턴을 구현했다.
템플릿 메서드 패턴은 상위 구현체에서 메서드의 실행 순서만을 지정하고,
수행할 메서드의 구현은 하위 구현체에게 위임하는 패턴이다.
netty 에서 제공하는 Base64Encoder 를 보면 이를 알 수 있다.

일단 MEssageToMessaageEncoder 에서 encode 라는 메소드를 추상 메소드로 선언하고,

Base64Encoder 가 encode 메소드를 구현한다.

package io.netty.handler.codec;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.TypeParameterMatcher;

import java.util.List;

/**
 * {@link ChannelOutboundHandlerAdapter} which encodes from one message to an other message
 *
 * For example here is an implementation which decodes an {@link Integer} to an {@link String}.
 *
 * <pre>
 *     public class IntegerToStringEncoder extends
 *             {@link MessageToMessageEncoder}&lt;{@link Integer}&gt; {
 *
 *         {@code @Override}
 *         public void encode({@link ChannelHandlerContext} ctx, {@link Integer} message, List&lt;Object&gt; out)
 *                 throws {@link Exception} {
 *             out.add(message.toString());
 *         }
 *     }
 * </pre>
 *
 * Be aware that you need to call {@link ReferenceCounted#retain()} on messages that are just passed through if they
 * are of type {@link ReferenceCounted}. This is needed as the {@link MessageToMessageEncoder} will call
 * {@link ReferenceCounted#release()} on encoded messages.
 */
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {

    // ... 생략!
    
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        CodecOutputList out = null;
        try {
            if (acceptOutboundMessage(msg)) {
                out = CodecOutputList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    encode(ctx, cast, out); /// 여기서 실제 구현된 메소드가 사용될 예정이다!!
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (out.isEmpty()) {
                    throw new EncoderException(
                        StringUtil.simpleClassName(this) + " must produce at least one message.");
                }
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new EncoderException(t);
        } finally {
            if (out != null) {
                try {
                    final int sizeMinusOne = out.size() - 1;
                    if (sizeMinusOne == 0) {
                        ctx.write(out.getUnsafe(0), promise);
                    } else if (sizeMinusOne > 0) {
                        // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
                        // See https://github.com/netty/netty/issues/2525
                        if (promise == ctx.voidPromise()) {
                            writeVoidPromise(ctx, out);
                        } else {
                            writePromiseCombiner(ctx, out, promise);
                        }
                    }
                } finally {
                    out.recycle();
                }
            }
        }
    }
    
    // ... 생략!

    /**
     * Encode from one message to an other. This method will be called for each written message that can be handled
     * by this encoder.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link MessageToMessageEncoder} belongs to
     * @param msg           the message to encode to an other one
     * @param out           the {@link List} into which the encoded msg should be added
     *                      needs to do some kind of aggregation
     * @throws Exception    is thrown if an error occurs
     */
    protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

package io.netty.handler.codec.base64;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.internal.ObjectUtil;

import java.util.List;

/**
 * Encodes a {@link ByteBuf} into a Base64-encoded {@link ByteBuf}.
 * A typical setup for TCP/IP would be:
 * <pre>
 * {@link ChannelPipeline} pipeline = ...;
 *
 * // Decoders
 * pipeline.addLast("frameDecoder", new {@link DelimiterBasedFrameDecoder}(80, {@link Delimiters#nulDelimiter()}));
 * pipeline.addLast("base64Decoder", new {@link Base64Decoder}());
 *
 * // Encoder
 * pipeline.addLast("base64Encoder", new {@link Base64Encoder}());
 * </pre>
 */
@Sharable
public class Base64Encoder extends MessageToMessageEncoder<ByteBuf> {

    // ... 생략!

    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        out.add(Base64.encode(msg, msg.readerIndex(), msg.readableBytes(), breakLines, dialect));
    }
}




📌 기본 제공 코덱

네티에서 기본으로 제공되는 코덱(인코더/디코더)에 대해서 알아보자.

참고로 아래 있는 코덱들은 네티 4.0 기준이다.

  • base64 코덱
    • 참고) Base64 란 8비트 이진 데이터를 ASCII 문자로 구성된 일련의 문자열로 바꾸는 인코딩이다.
  • bytes 코덱
    • 바이트 배열 데이터에 대한 송수신을 지원
  • http 코덱
    • HTTP 프로토콜을 지원하며, 하위 패키지에서 관련된 데이터 송수신 방법을 지원한다.
    • 예를 들어서 CORS, mutlipart, websocket 과 관련된 코덱들을 지원한다.
  • marshalling 코덱
    • 마샬링이란 객체를 네트워크를 통해 송신 가능한 형태로 변환하는 과정이다.
    • 반대로 언마샬링은 네트워크를 통해 수신한 정보를 다시 객체로 변환하는 과정을 의미한다.
    • 기존의 jdk에서 제공한 (역)직렬화의 문제점을 개선한 JBoss의 라이브러리를 사용한다.
  • protobuf 코덱
    • 구글의 프로토콜 버퍼를 사용한 송수신을 지원하는 코덱이다.
  • rtsp 코덱
    • RTSP(Real Time Streaming Protocol)은 오디오와 비디오 같은 실시간 데이터의 전달을 위해서 특수하게 만들어진 애플리케이션 레벨의 프로토콜이다.
    • 보통 실시간 동영상 스트리밍을 위한 제어 프로토콜이다.
  • sctp 코덱
    • TCP 가 아닌 SCTP 전송 계층을 사용하는 코덱이다.
  • spdy 코덱
    • 구글의 SPDY 프로토콜을 지원하는 코덱이다. 하지만 http2 에 의해서 지원이 중단되었다.
    • 현재는 Netty 4.1 부터 제공하는 HTTP/2 코덱을 사용한다.
  • string 코덱
    • 단순 문자열의 송수신을 지원하는 코덱이다.
  • serialization 코덱
    • 자바의 객체를 네트워크로 전송할 수 있도록 직/역직렬화를 지원하는 코덱이다.
    • ... 안 쓸거 같다 ^^;




📌 사용자 정의 코덱

사용자가 직접 필요한 프로토콜을 구현한 것을 의미한다. 이때 필요한 인바운드와 아웃바운드 핸들러를 구현해야 한다.

네티에서 기본 제공하는 HttpServerCodec 과 HttpHelloWorldServerHandler 사용자 정의 코덱으로 구성된

HttpHelloWorldServer 클래스를 작성해봤다.


package me.dailycode;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import io.netty.util.AsciiString;
import io.netty.util.concurrent.EventExecutorGroup;

import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter {
    private static final byte[] CONTENT = {'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd'};

    private static final AsciiString CONTENT_TYPE = new AsciiString("Content-Type");

    private static final AsciiString CONTENT_LENGTH = new AsciiString("Content-Length");

    private static final AsciiString CONNECTION = new AsciiString("Connection");

    private static final AsciiString KEEP_ALIVE = new AsciiString("keep-alive");

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpRequest) {
            HttpRequest req = (HttpRequest) msg;

            // HttpHeaders.is100ContinueExpected(req) 는 deprecated!
            if (HttpUtil.is100ContinueExpected(req)) {
                ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
            }

            // HttpHeaders.isKeepAlive() 는 deprecated!
            boolean keepAlive = HttpUtil.isKeepAlive(req);

            DefaultFullHttpResponse response
                    = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT));

            response.headers().set(CONTENT_TYPE, "text/plain");
            response.headers().set(CONTENT_LENGTH, response.content().readableBytes());

            if (!keepAlive) {
                ctx.write(response).addListener(ChannelFutureListener.CLOSE);
            } else {
                response.headers().set(CONNECTION, KEEP_ALIVE);
                ctx.write(response);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

package me.dailycode;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslContext;

public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {

    private final SslContext sslCtx;

    public HttpHelloWorldServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (sslCtx != null) {
            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
        }

        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpHelloWorldServerHandler());
    }
}

package me.dailycode;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

public final class HttpHelloWorldServer {

    static final boolean SSL = System.getProperty("ssl") != null;

    static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8443" : "10005"));

    public static void main(String[] args) throws Exception {
        // Configure SSL
        final SslContext sslCtx;

        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
            // sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();

        } else {
            sslCtx = null;
        }

        // Configure the server
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap b = new ServerBootstrap();
            b.option(ChannelOption.SO_BACKLOG, 1024);
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new HttpHelloWorldServerInitializer(sslCtx));

            Channel ch = b.bind(PORT).sync().channel();

            System.err.println("Open your web browser and navigate to "
                    + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/');

            ch.closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

지금 당장 모든것을 이해하기는 정말 어렵다. 다만 코덱을 직접 구현해봤다는 데에만 의의를 두길 바란다.

아무튼 실행하고나서 http://127.0.0.1:10005 로 접속하면 Hello World 를 확인할 수 있다.


실행결과


profile
백엔드를 계속 배우고 있는 개발자입니다 😊

0개의 댓글