Reactor, Proactor 패턴

hj·2021년 6월 1일
0

Reactor 패턴

이벤트에 반응하는 객체를 만들고 이벤트가 발생하면 해당 객체가 반응하여 이벤트에 맞는 핸들러와 매핑시켜서 처리하는 구조

* 이벤트에 반응하고 이벤트핸들러를 등록할 객체 (Reactor)
* 이벤트에 맞는 핸들러를 매핑해주는 객체 (Handle)
* 이벤트를 처리하기 위한 추상 객체 (EventHandler)
* 실제 이벤트 로직을 담고 있는 객체 (ConcreteEventHandler...)

구현

      위 UML처럼 구현을 해보자

  • Reactor
import java.io.IOException;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.Map;

public class Reactor {

    private int defaultPort = 80;
    private ServerSocket serverSocket;
    private Map<String, EventHandler> eventHandlerMap;

    public Reactor() {
        init(defaultPort);
    }

    public Reactor(int port) {
        init(port);
    }

    private void init(int port) {
        eventHandlerMap = new HashMap<String, EventHandler>();
        try {
            serverSocket = new ServerSocket(port);
        } catch (IOException e) {}
    }

    public void start() {
        Handle handle = new Handle();
        while(true) {
            handle.handle(serverSocket, eventHandlerMap);
        }
    }

    public void registerHandler(EventHandler eventHandler) {
        eventHandlerMap.put(eventHandler.getHandler(), eventHandler);
    }

    public void removeHandler(EventHandler eventHandler) {
        eventHandlerMap.remove(eventHandler.getHandler());
    }
}
  • Handle
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;

public class Handle {

    public void handle(ServerSocket serverSocket, Map<String, EventHandler> eventHandlerMap) {
        try {
            Socket socket = serverSocket.accept();
            demultiplex(socket, eventHandlerMap);
        } catch (IOException e) {}
    }

    private void demultiplex(Socket socket, Map<String, EventHandler> eventHandlerMap) {
        try (InputStream inputStream = socket.getInputStream()) {
            byte[] bytes = new byte[4];
            inputStream.read(bytes);
            String header = new String(bytes);

            eventHandlerMap.get(header).handleEvent(inputStream);
        } catch (IOException e) {}
    }

}
  • EventHandler
import java.io.InputStream;

public interface EventHandler {
    public String getHandler();
    public void handleEvent(InputStream inputStream);
}
  • ConcreteEventHandler1
import java.io.IOException;
import java.io.InputStream;

public class ConcreteEventHandler1 implements EventHandler {

    @Override
    public String getHandler() {
        return "0001";
    }

    @Override
    public void handleEvent(InputStream inputStream) {
        System.out.println("hello ConcreteEventHandler1");

        try {
            byte[] bytes = new byte[1024];
            inputStream.read(bytes);

            //..... your biz logic
        } catch (IOException e) { }
    }
}
  • ConcreteEventHandler2
import java.io.IOException;
import java.io.InputStream;

public class ConcreteEventHandler2 implements EventHandler {
    @Override
    public String getHandler() {
        return "0002";
    }

    @Override
    public void handleEvent(InputStream inputStream) {
        System.out.println("hello ConcreteEventHandler2");

        try {
            byte[] bytes = new byte[1024];
            inputStream.read(bytes);

            //..... your biz logic
        } catch (IOException e) { }
    }
}
  • main
public class ReactorMain {
    public static void main(String[] args) {
        Reactor reactor = new Reactor();

        reactor.registerHandler(new ConcreteEventHandler1());
        reactor.registerHandler(new ConcreteEventHandler2());

        reactor.start();
    }
}

Proactor 패턴

OS의 비동기 I/O를 호출하고 작업이 완료되면 콜백형식으로 받아서 적절한 이벤트를 호출하고 처리하는 구조

* Proactor는 이벤트핸들러를 Dispatcher에 등록
* Async Operation Processor가 비동기 요청을 받고 완료가 되면 Completion Event Queue에 Enqueue 한다
* Completion Dispatcher는 I/O완료 통지를 받으면 Completion Event Queue에서 Dequeue 하며 Demultiplexer로 전달한다
* Demultiplexer에서 이벤트에 맞는 핸들러를 매핑해준다.
* 적절한 이벤트를 실행한다.

구현

      위 플로우대로 구현을 해보자

  • Proactor
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;

public class Proactor {

    private int port = 80;
    private Map<String, EventHandler> eventHandlerMap;

    public Proactor() {
        init(this.port);
    }

    public Proactor(int port) {
        init(port);
    }

    private void init(int port) {
        eventHandlerMap = new HashMap<String, EventHandler>();
        this.port = port;
    }

    public void start() {
        try {
            AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
                    Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
            AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);

            InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", this.port);
            asynchronousServerSocketChannel.bind(inetSocketAddress);
            
            asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, new Dispatcher(eventHandlerMap));
        } catch (IOException e) {
        } finally {
        }
    }

    public void registerHandler(EventHandler eventHandler) {
        eventHandlerMap.put(eventHandler.getHandler(), eventHandler);
    }

    public void removeHandler(EventHandler eventHandler) {
        eventHandlerMap.remove(eventHandler.getHandler());
    }
}
  • EventHandler
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public interface EventHandler extends CompletionHandler<Integer, ByteBuffer> {

    public String getHandler();
    public void setChannel(AsynchronousSocketChannel channel);
}
  • Dispatcher
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class Dispatcher implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {

    private Map<String, EventHandler> eventHandlerMap;

    public Dispatcher(Map<String, EventHandler> eventHandlerMap) {
        this.eventHandlerMap = eventHandlerMap;
    }

    public Dispatcher() {}

    @Override
    public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
        attachment.accept(attachment, this);

        ByteBuffer buffer = ByteBuffer.allocate(4);
        result.read(buffer, 60, TimeUnit.SECONDS, buffer, new Demultiplexer(result, eventHandlerMap));
    }

    @Override
    public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {

    }
}
  • Demultiplexer
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Map;

public class Demultiplexer implements CompletionHandler<Integer, ByteBuffer> {

    private AsynchronousSocketChannel channel;
    private Map<String, EventHandler> eventHandlerMap;

    public Demultiplexer(AsynchronousSocketChannel channel, Map<String, EventHandler> eventHandlerMap) {
        this.channel = channel;
        this.eventHandlerMap = eventHandlerMap;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        attachment.flip();

        String header = new String(attachment.array());
        ByteBuffer newBuffer = ByteBuffer.allocate(1024);

        eventHandlerMap.get(header).setChannel(channel);
        channel.read(newBuffer, newBuffer, eventHandlerMap.get(header));
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {

    }
}
  • ConcreteEventHandler1
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;

public class ConcreteEventHandler1 implements  EventHandler {

    private AsynchronousSocketChannel channel;

    @Override
    public String getHandler() {
        return "0001";
    }

    @Override
    public void setChannel(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        //..... your biz logic
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {

    }
}
  • ConcreteEventHandler2
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;

public class ConcreteEventHandler2 implements EventHandler {

    private AsynchronousSocketChannel channel;

    @Override
    public String getHandler() {
        return "0002";
    }

    @Override
    public void setChannel(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        //..... your biz logic
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {

    }
}
  • main
public class ProactorMain {
    public static void main(String[] args) {
        Proactor proactor = new Proactor();

        proactor.registerHandler(new ConcreteEventHandler1());
        proactor.registerHandler(new ConcreteEventHandler2());

        proactor.start();
    }
}

성능

아래 테스트는 여기서 참고하였습니다.

  • Connection 1개
    (X축 : I/O operation 크기)

    Connection이 하나일 경우엔 Proactor패턴이 더 느린걸 볼 수 있다.
    이유는 유저레벨 -> 커널레벨, 커널레벨->유저레벨로 데이터를 전달하는데 발생하는 오버헤드 때문에 느리다.

  • Connection 3개
    (X축 : I/O operation 크기)

    Connection이 3개인 경우엔 비슷한 성능을 보이고 있다.

  • Connection 50개
    (X축 : I/O operation 크기)

    Connection이 50개인 경우부터는 I/O operation이 커질수록 우세한 성능결과를 볼 수 있다.

  • Connection 100개
    (X축 : I/O operation 크기)

    Connection이 100개인 경우도 50개인 경우와 마찬가지로 우세한 성능 결과를 볼 수 있다.

결론

Proactor 패턴은 대규모 서비스일 수록 Reactor 패턴보다 뛰어나지만 규모가 작은 서비스인 경우엔 Reactor패턴이 더 유리한걸 볼 수 있다. 다만 Proactor패턴은 비동기가 OS에서 이루어지기에 비동기 시스템 호출의 흐름을 제어하기가 어려운 단점이 있다.
추가적으로 Proactor 패턴은 window 환경에선 iocp를 활용해 빠를진 모르나 리눅스나 유닉스같은 환경에선 좋지 못하다.
리눅스나 유닉스는 reactor방식인 epoll, kqueue을 쓰며 iocp에 못지않는 성능을 보여준다.

* 참고자료

profile
Something Interesting

2개의 댓글

comment-user-thumbnail
2021년 6월 21일

좋은 글 감사합니다. -도라에몽-

1개의 답글