Java NIO에 대해 알아보자 (2/2)

hj·2021년 5월 25일
1

Asynchronous IO(AIO)

Java 1.7부터 새롭게 등장한 클래스를 얘기하기 전에 

Asynchronous IO에 대해 이해가 필요하다.

I/O Model

IBM AIO 관련 내용을 참고하면 위 사진처럼 I/O 모델이 존재한다.

모델을 보고나서 먼저 든 생각은 Synchronous-Blocking? Asynchronous-Non-Blocking? 무슨 차이지라는 생각과 Synchronous인데 Non-Blocking? Asynchronous인데 Blocking?이 무엇이지 라는 생각이 들었다.
그래서 이번기회에 해당 개념을 제대로 이해해 보기 위해 정리해보았다.

Synchronous/Asynchronous

동시에 어떠한 행위를 신경쓰는 것
즉 호출된 함수의 작업 결과를 누가 신경쓰는지에 따라 다르다.
* Synchronous : 호출된 함수의 작업 결과를 호출하는 함수와 호출되는 함수 둘다 신경쓴다.
* Asynchronous : 호출하는 함수는 호출된 함수의 작업 결과를 신경쓰지 않는다.

Blocking/Non-Blocking

어떠한 행위 자체가 그 행위로 인해 다른 무엇가를 못하고 막혀 대기하는 상태
즉 호출된 함수가 작업이 완료될때까지 호출한 함수을 제어권을 통제하는지 마는지에 따라 다르다.
* Blocking : 호출된 함수가 호출한 함수에게 제어권을 넘겨주지않고 자신의 작업이 모두 마칠때까지 대기하게 만든다.
* Non-Blocking : 호출된 함수가 호출한 함수에게 바로 제어권을 넘겨주고 다른 행위를 할 수 있도록 하게 만든다.

Synchronous/Asynchronous, Blocking/Non-Blocking 예시

아래 이미지는 적절한 예시라 판단되어 여기에서 가져온 내용이다.

그러면 앞서 구현하였던 코드는 어떤 부분에 해당 될까?

I/O 자체는 Channel을 통해 Block이 되지 않고 읽어드리지만 Selector가 하나의 Key 작업이 완료 될때까지 다른 Selector Key들이 해당 Key 작업을 완료여부를 신경쓰고 기다리고 있기 때문에 Synchronous Non-Block 모델에 해당될 것이다.

AsynchronousServerSocketChannel

Java 1.7 등장한 클래스이며 AIO를 지원하는 클래스이다
문서를 참고하여서 간단한 코드를 구현해보자

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncNio {

    AsynchronousChannelGroup channelGroup;
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;


    private void init() throws IOException {
        try {
            channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
                    Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory());
            asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);

            InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 80);
            asynchronousServerSocketChannel.bind(inetSocketAddress);
        } catch (IOException e) {
        } finally {
        }
    }

    public void startServer() throws IOException {
        init();
        start();
    }

    private void start() {
        try {
            asynchronousServerSocketChannel.accept(null, acceptCompletionHandler(null, asynchronousServerSocketChannel));
        } catch(Exception e) {
        }
    }

    private CompletionHandler<AsynchronousSocketChannel, Void> acceptCompletionHandler(String att, AsynchronousServerSocketChannel asynchronousServerSocketChannel) {
        return new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel channel, Void attachment) {
                // TODO Auto-generated method stub
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);

                channel.read(readBuffer, 60, TimeUnit.SECONDS, attachment, channelReadHandler(readBuffer, channel));
                readBuffer.rewind();
                asynchronousServerSocketChannel.accept(null, acceptCompletionHandler(null, asynchronousServerSocketChannel));
            }
            @Override
            public void failed(Throwable exc, Void attachment) {
            }
        };
    }

    private CompletionHandler<Integer, Void> channelReadHandler(ByteBuffer readBuffer, AsynchronousSocketChannel asynchronousSocketChannel) {
        return new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer result, Void attachment) {
                // TODO Auto-generated method stub
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                try {
                    // biz ....
                } catch(Exception e) {
                    //HTTP 500 error
                } finally {
                    asynchronousSocketChannel.write(byteBuffer, 60, TimeUnit.SECONDS, attachment, channelWriteHandler(byteBuffer, asynchronousSocketChannel));
                    byteBuffer.rewind();
                }
            }
            @Override
            public void failed(Throwable exc, Void attachment) {
                //HTTP 503 error
            }
        };
    }

    private CompletionHandler<Integer, Void> channelWriteHandler(ByteBuffer writeBuffer, AsynchronousSocketChannel asynchronousSocketChannel) {
        return new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer result, Void attachment) {
                // TODO Auto-generated method stub
                try {
                    // biz ....
                } catch(IOException e) { }
            }
            @Override
            public void failed(Throwable exc, Void attachment) {
                try {
                    close();
                } catch(IOException e) { }
            }
        };
    }

    private void close() throws IOException {
        asynchronousServerSocketChannel.close();
        if(channelGroup != null) channelGroup.shutdown(); channelGroup = null;
    }
}

소스를 보면 block되는 곳은 없고 처리는 OS의 비동기를 통해 CompletionHandler로 콜백 처리가 되고 있다.
이러한 흐름을 Proactor패턴이라 부르는데. Proactor패턴은 다음시간에 알아보자.

다음 글엔 Reactor 패턴과 Proactor패턴을 알아보는 내용을 담도록 하겠다
(*Proactor 패턴 :OS의 비동기 I/O를 호출하고 작업이 완료되면 콜백형식으로 받아서 적절한 이벤트를 호출하고 처리하는 구조
다만 리눅스나 유닉스 환경에서는 좋지 못하다.)

* 참고자료

profile
Something Interesting

4개의 댓글

comment-user-thumbnail
2021년 5월 25일

배낀 글 감사합니다. BTS 방탄소년단

1개의 답글
comment-user-thumbnail
2021년 5월 31일

포레버 NCT

1개의 답글