위 UML처럼 구현을 해보자
import java.io.IOException;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.Map;
public class Reactor {
private int defaultPort = 80;
private ServerSocket serverSocket;
private Map<String, EventHandler> eventHandlerMap;
public Reactor() {
init(defaultPort);
}
public Reactor(int port) {
init(port);
}
private void init(int port) {
eventHandlerMap = new HashMap<String, EventHandler>();
try {
serverSocket = new ServerSocket(port);
} catch (IOException e) {}
}
public void start() {
Handle handle = new Handle();
while(true) {
handle.handle(serverSocket, eventHandlerMap);
}
}
public void registerHandler(EventHandler eventHandler) {
eventHandlerMap.put(eventHandler.getHandler(), eventHandler);
}
public void removeHandler(EventHandler eventHandler) {
eventHandlerMap.remove(eventHandler.getHandler());
}
}
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
public class Handle {
public void handle(ServerSocket serverSocket, Map<String, EventHandler> eventHandlerMap) {
try {
Socket socket = serverSocket.accept();
demultiplex(socket, eventHandlerMap);
} catch (IOException e) {}
}
private void demultiplex(Socket socket, Map<String, EventHandler> eventHandlerMap) {
try (InputStream inputStream = socket.getInputStream()) {
byte[] bytes = new byte[4];
inputStream.read(bytes);
String header = new String(bytes);
eventHandlerMap.get(header).handleEvent(inputStream);
} catch (IOException e) {}
}
}
import java.io.InputStream;
public interface EventHandler {
public String getHandler();
public void handleEvent(InputStream inputStream);
}
import java.io.IOException;
import java.io.InputStream;
public class ConcreteEventHandler1 implements EventHandler {
@Override
public String getHandler() {
return "0001";
}
@Override
public void handleEvent(InputStream inputStream) {
System.out.println("hello ConcreteEventHandler1");
try {
byte[] bytes = new byte[1024];
inputStream.read(bytes);
//..... your biz logic
} catch (IOException e) { }
}
}
import java.io.IOException;
import java.io.InputStream;
public class ConcreteEventHandler2 implements EventHandler {
@Override
public String getHandler() {
return "0002";
}
@Override
public void handleEvent(InputStream inputStream) {
System.out.println("hello ConcreteEventHandler2");
try {
byte[] bytes = new byte[1024];
inputStream.read(bytes);
//..... your biz logic
} catch (IOException e) { }
}
}
public class ReactorMain {
public static void main(String[] args) {
Reactor reactor = new Reactor();
reactor.registerHandler(new ConcreteEventHandler1());
reactor.registerHandler(new ConcreteEventHandler2());
reactor.start();
}
}
위 플로우대로 구현을 해보자
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
public class Proactor {
private int port = 80;
private Map<String, EventHandler> eventHandlerMap;
public Proactor() {
init(this.port);
}
public Proactor(int port) {
init(port);
}
private void init(int port) {
eventHandlerMap = new HashMap<String, EventHandler>();
this.port = port;
}
public void start() {
try {
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", this.port);
asynchronousServerSocketChannel.bind(inetSocketAddress);
asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, new Dispatcher(eventHandlerMap));
} catch (IOException e) {
} finally {
}
}
public void registerHandler(EventHandler eventHandler) {
eventHandlerMap.put(eventHandler.getHandler(), eventHandler);
}
public void removeHandler(EventHandler eventHandler) {
eventHandlerMap.remove(eventHandler.getHandler());
}
}
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public interface EventHandler extends CompletionHandler<Integer, ByteBuffer> {
public String getHandler();
public void setChannel(AsynchronousSocketChannel channel);
}
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class Dispatcher implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
private Map<String, EventHandler> eventHandlerMap;
public Dispatcher(Map<String, EventHandler> eventHandlerMap) {
this.eventHandlerMap = eventHandlerMap;
}
public Dispatcher() {}
@Override
public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
attachment.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(4);
result.read(buffer, 60, TimeUnit.SECONDS, buffer, new Demultiplexer(result, eventHandlerMap));
}
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
}
}
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Map;
public class Demultiplexer implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
private Map<String, EventHandler> eventHandlerMap;
public Demultiplexer(AsynchronousSocketChannel channel, Map<String, EventHandler> eventHandlerMap) {
this.channel = channel;
this.eventHandlerMap = eventHandlerMap;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
String header = new String(attachment.array());
ByteBuffer newBuffer = ByteBuffer.allocate(1024);
eventHandlerMap.get(header).setChannel(channel);
channel.read(newBuffer, newBuffer, eventHandlerMap.get(header));
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
public class ConcreteEventHandler1 implements EventHandler {
private AsynchronousSocketChannel channel;
@Override
public String getHandler() {
return "0001";
}
@Override
public void setChannel(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
//..... your biz logic
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
public class ConcreteEventHandler2 implements EventHandler {
private AsynchronousSocketChannel channel;
@Override
public String getHandler() {
return "0002";
}
@Override
public void setChannel(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
//..... your biz logic
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
public class ProactorMain {
public static void main(String[] args) {
Proactor proactor = new Proactor();
proactor.registerHandler(new ConcreteEventHandler1());
proactor.registerHandler(new ConcreteEventHandler2());
proactor.start();
}
}
아래 테스트는 여기서 참고하였습니다.
Connection 1개
(X축 : I/O operation 크기)
Connection이 하나일 경우엔 Proactor패턴이 더 느린걸 볼 수 있다.
이유는 유저레벨 -> 커널레벨, 커널레벨->유저레벨로 데이터를 전달하는데 발생하는 오버헤드 때문에 느리다.
Connection 3개
(X축 : I/O operation 크기)
Connection이 3개인 경우엔 비슷한 성능을 보이고 있다.
Connection 50개
(X축 : I/O operation 크기)
Connection이 50개인 경우부터는 I/O operation이 커질수록 우세한 성능결과를 볼 수 있다.
Connection 100개
(X축 : I/O operation 크기)
Connection이 100개인 경우도 50개인 경우와 마찬가지로 우세한 성능 결과를 볼 수 있다.
좋은 글 감사합니다. -도라에몽-