기존 java.io와 다른 새로운 입출력 API
구분 | IO | NIO |
---|---|---|
입출력 방식 | 스트림 방식 | 채널 방식 |
버퍼 방식 | 넌버퍼(Non-Buffer) | 버퍼(Buffer) |
동기/비동기 방식 | 동기 방식 | 동기/비동기 모두 지원 |
블로킹/넌블로킹 | 블로킹 방식 | 블로킹/넌블로킹 방식 모두 지원 |
스트림은 압력 스트림과 출력 스트림으로 구분되어 있어 별도로 생성하지만, 채널은 양방향으로 입출력 가능하므로 하나만 생성
넌버퍼(non-buffer) 는 1바이트씩 읽고 출력하기 때문에 느리고, 스트림으로부터 입력된 데이터를 별도로 저장하지 않는 이상 버퍼 내에서 데이터 위치 이동을 통한 자유로운 이용이 불가능하지만 보조스트림인 BufferedInputStream
,BufferedOutputStream
을 이용해서 버퍼 제공 가능함
버퍼 는 기본적으로 버퍼를 사용해서 입출력 처리하기 때문에 성능이 좋고 읽은 데이터를 무조건 버퍼(메모리 저장소)에 저장하기 때문에 버퍼 내에서 데이터 위치 이동해가며 필요한 부분 읽고 쓰기가 가능함
IO 블로킹은 입력 스트림의 read() 메소드 호출 시 데이터 입력 전까지 스레드가 블로킹(대기상태)가 되며, 출력 스트림의 write() 메소드 호출 시 데이터 출력 전까지 스레드는 블로킹됨, 이때 스레드가 블로킹되면 다른 일을 할 수 없고 interrupt 통해서 빠져나오기도 불가능하므로 스트림 닫기를 통해서만 블로킹 빠져나올 수 있음
NIO 블로킹/넌블로킹에서 NIO 블로킹은 스레드를 interrupt 함으로써 블로킹 상태에서 빠져나올 수 있으며 넌블로킹은 입출력 작업 시 스레드가 블로킹되지 않음
연결 클라이언트의 수가 적고, 전송되는 데이터가 대용량이면서, 순차적 처리가 필요한 경우 사용
연결 클라이언트이 수가 많고, 전송되는 데이터 용량이 적으면서, 입출력 작업 처리가 빨리 끝나는 경우에 사용
java.nio.file
, java.nio.file.attribute
패키지를 통해 IO 보다 더 다양한 파일의 속성 정보를 제공해 주는 클래스와 인터페이스를 제공함
java.nio.file.Path
인터페이스는 java.io.File
클래스에 대응되며 여러 곳에서 파일 경로를 지정하기 위해 Path를 사용함
java.nio.file.Paths
클래스의 정적 메소드 get()을 통해 Path 구현 객체 얻을 수 있음
운영체제의 파일 시스템은 FileSystem 인터페이스를 통해서 접근하며 구현 객체는 FileSystems의 getDefault()를 통해 얻을 수 있음
java.nio.file.Files
클래스를 통해 파일과 디렉토리의 생성 및 삭제, 속성을 읽는 메소드를 제공함
디렉토리의 내용 변화를 감지해 파일의 생성, 삭제, 수정을 감시함
//WatchService 생성
WatchService watchService = FileSystems.getDefault().newWatchService();
//감시가 필요한 디렉토리에서 WatchService 등록
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE);
WatchEvent 발생
WatchService는 해당 이벤트 정보 가진 WatchKey 생성하고, Queue에 저장
WatchEvent의 take() 메소드를 호출해 큐에 watchKey가 들어올때까지 대기했다가 큐에 들어오면 반환
while(true){
WatchKey watchKey = watchService.take();
}
//WatchKey로부터 List<WatchEvent> 획득
List<WatchEvent<?>> list = watchKey.pollEvents();
while(true){
WatchKey watchKey = watchService.take();
List<WatchEvent<?>> list = watchKey.pollEvents();
for(WatchEvent watchEvent : list){
Kind kind = watchEvent.kind();
Path path = (Path)watchEvent.context();
if(kind == StandardWatchEventKinds.ENTRY_CREATE){
//생성된 경우 실행할 코드
}else if (kind == StandardWatchEventKinds.EVENT_DELETE){
//삭제된 경우 실행할 코드
}else if (kind == StandardWatchEventKinds.ENTRY_MODIFY){
//변경된 경우 실행할 코드
}else if(kind == StandardWatchEventKinds.OVERFLOW){
//운영체제에서 이벤트 소실되거나 버려진 경우 -> 별도 처리 코드 필요없음
}
}
boolean valid = watchKey.reset();
if(!valid){break;}
}
watchService.close();
한번 사용된 watchKey는 새로운 이벤트 발생시 다시 큐에 들어가야하기 때문에 reset() 메소드로 초기화해야하므로 초기화에 성공한 경우 true, 감시하는 디렉토리 삭제되거나 키가 더이상 유효하지 않은 경우 false 반환
읽고 쓰기 가능한 메모리 배열로 NIO에서는 데이터를 입출력하기 위해 항상 버퍼 사용함
저장되는 데이터 타입에 따라 ByteBuffer, CharBuffer, IntBuffer, DoubleBuffer로 분류
어떤 메모리를 사용하느냐에 따라 DirectBuffer, NonDirectBuffer로 분류
Buffer 추상 클래스를 상속하는 별도의 데이터 타입 클래스들
Byte/Char/Short/Int/Long/Float/Double Buffer로 나뉘며
MappedByteBuffer는 파일의 내용에 랜덤하게 접근하기 위해 파일의 내용을 메모리와 맵핑시킨 버퍼
nondirect buffer
JVM이 관리하는 힙 메모리 공간 이용하여 버퍼 생성 시간이 빠르지만 버퍼의 크기를 크게 잡을 수 없음
입출력 위해서는 임시 다이렉트 버퍼를 생성해 복사하여 운영체제의 native IO 기능을 수행해야하므로 입출력 성능이 낮음
direct buffer
운영체제가 관리하는 메모리 공간을 이용해 native C 함수를 호출하고 잡다한 처리가 필요해 버퍼 생성이 상대적으로 느리기 때문에 한 번 생성해 놓고 재사용하는 곳에 적합하며, 운영체제가 허용하는 범이 내에서 대용퍙 버퍼 생성 가능
각 데이터 타입별 넌다이렉트 버퍼 생성하며 매개값은 해당 데이터 타입의 저장 개수 지정
이미 생성되어 있는 타입별 배열을 래핑해서 넌다이렉트 버퍼 생성
JVM 힙 메모리 바깥 쪽, 즉 운영체제가 관리하는 메모리에 다이렉트 버퍼 생성해 각 타입별 Buffer 클래스에는 없고 ByteBuffer에서만 제공
각 타입별 다이렉트 버퍼를 생성하며 우선 ByteBuffer 생성한 후에 호출함
운영체제는 두 바이트 이상을 처리할 때 처리 효율이나 CPU 디자인 상의 문제로 바이트 해석 순서가 정해져있으며 바이트 해석 순서는 데이터를 외부로 보내거나 외부에서 받을 때 영향을 미치기 때문에 바이트 데이터 다루는 버퍼도 이를 고려해야함
Big endian은 앞 바이트부터 처리하며 Little endian은 뒤 바이트부터 처리함
JVM은 동일한 조건으로 클래스 실행해야하기 때문에 Big endian 방식으로 동작하게 되어있다
Little Endian으로 동작하는 운영체제에서 만든 데이터 파일을 Big Endian 방식으로 동작하는 운영체제에서 읽어들일 경우 ByteOrder
클래스로 데이터 순서 맞춰야함
운영체제와 JVM의 바이트 해석 순서가 다를 경우 JVM 이 운영체제와 데이터 교환할 때 자동으로 처리하지만 다이렉트 버퍼를 이용할 경우 운영체제의 native IO를 사용하므로 운영체제의 기본 해석 순서에 JVM 해석 순서를 맞추는 것이 도움됨
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(100).order(ByteOrder.nativeOrder());
get(...)
, put(...)
각 타입별 하위 Buffer 클래스가 가지고 있음
상대적 메소드(index 매개값 없음)의 경우 현재 위치 속성인 position에서 데이터 읽고 저장,
절대적 메소드(index 매개값 있음)의 경우 position과 상관없이 주어진 인덱스에서 데이터 읽고 저장
버퍼가 다 찼는데 데이터 저장하려는 경우, 버퍼에서 더이상 읽어올 데이터가 없는데 데이터 읽으려는 경우에 버퍼 예외가 발생함
BufferOverflowException
- position이 limit에 도달했을 때 put() 호출하면 발생, BufferUnderflowException
- position이 limit에 도달했을 때 get() 호출하면 발생, , InvalidMarkException
- mark가 없는 상태에서 reset() 메소드 호출하면 발생, ReadOnlyBufferException
- 읽기 전용 버퍼에서 put(), compact() 메소드 호출하면 발생
채널이 데이터 저장하고 읽는 버퍼는 모두 ByteBuffer로 프로그램 목적에 맞게 다른 기본 타입 버퍼로 변환이 필요함
Charset charSet = CharSet.forName("UTF-8");
Charset charset = Charset.defaultCharset();
String data = "...";
ByteBuffer byteBuffer = charset.encode(data);
ByteBuffer byteBuffer = ...;
String data = charset.decode(byteBuffer).toString();
int[] data = new int[] {10, 20};
IntBuffer intBuffer = IntBuffer.wrap(data);
ByteBuffer byteBuffer = ByteBuffer.allocate(intBuffer.capacity() * 4);
for(int i = 0 ; i < intBuffer.capacity(); i++){
byteBuffer.putInt(intBuffer.get(i));
}
byteBuffer.flip();
ByteBuffer byteBuffer = ...;
IntBuffer intBuffer = byteBuffer.asIntBuffer();
int[] data = new int[intBuffer.capacity()];
intBuffer.get(data);
System.out.println("읽은 배열 : " + Arrays.toString(data));
FileChannel 통해 파일 읽기와 쓰기 가능하며 동기화 처리가 되어있기 때문에 멀티 스레드 환경에서 사용해도 안전함
FileChannel.open(Path path, OpenOption... options)
, FileInputStream.getChannel()
, FileOutputStream.getChannel()
을 통해 생성 가능하며 FileChannel.close()
통해 닫음
OpenOption은 StandardOpenOption의 열거함수를 이용함
int bytesCount = fileChannel.write(ByteBuffer src);
파일에 쓰여지는 바이트는 ByteBuffer의 position부터 limit까지이다.
int bytesCount = fileChannel.read(ByteBuffer dst);
파일에서 읽혀지는 바이트는 ByteBuffer의 position부터 저장되며 리턴값은 파일에서 ByteBuffer로 읽혀진 바이트 수이다.
파일 채널을 두개 이용하거나 Files.copy() 이용
Path targetPath = Files.copy(Path source, Path target, CopyOption... options)
read(), write()가 블로킹되어 블로킹 동안 UI 갱신 또는 이벤트 처리가 불가능하므로 별도의 작업 스레드 생성해서 이들 메소드를 호출해야하며, 동시 처리해야할 파일 수가 많다면 스레드 수 증가로 문제될 수가 있음
read(), write()를 즉시 리턴하고 스레드풀에 작업처리 요청하여 처리하기 때문에 불특정 다수의 파일 및 대용량 파일의 입출력 작업시 유리하며
작업 스레드가 파일 입출력 완료하게 되면 콜백 메소드가 자동 호출됨
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(
Paths.get("C:/Temp/file.txt");
EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.WRITE),
executorService
);
fileChannel.close();
void read(ByteBuffer dst, long position, A attachment, CompletionHandler<Integer, A> handler)
void write(ButeBuffer src, long position, A attachment, CompletionHandler<Integer, A> handler)
dst, src : 읽거나 쓰기 위한 ByteBuffer
position : 파일에서 읽을 위치이거나 쓸 위치
attachment : 콜백 메소드로 전달한 첨부 객체
handler : CompletionHandler<Integer, A> 구현 객체
Integer : 입출력 작업 처리 후의 결과 타입 (고정)
A : 첨부 객체 타입으로 첨부 객체가 필요없다면 Void 가능
complete(Integer result, A attachment)
는 작업이 정상적으로 완료된 경우 콜백되며
failed(Throwable exc, A attachment)
는 예외로 인해 작업이 실패된 경우 콜백됨
new CompletionHandler<Integer, A>(){
@OVerride
public void completed(Integer result, A attachment){...}
@Override
public void failed(Throwable exc, A attachment){...}
}
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(true);
serverSocketChannel.bind(new InetSocketAddress(5001));
SocketChannel socketChannel = serverSocketChannel.accept();
serverSocketChannel.close();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("localhost", 5001));
socketChannel.close();
// 보내는 코드
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = charset.encode("Hello Server");
socketChannel.write(byteBuffer);
// 받는 코드
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
int byteCount = socketChannel.read(byteBuffer);
byteBuffer.flip();
Charset charset = Charset.forName("UTF-8");
String message = charset.decode(byteBuffer).toString();
accept(), connect(), read(), write() 메소드가 블로킹 없이 즉시 리턴되기 때문에 작업 처리 준비가 되지 않은 상태에서 메소드 실행하면 안되므로 작업 처리 준비가 완료된 채널만 선택해서 처리해야함
셀렉터가 작업 처리 준비된 채널을 선택하며, 멀티 채널 작업을 싱글 스레드에서 처리할 수 있음
채널은 자신의 작업 유형을 SelectionKey로 생성하여 interestset에 키를 등록함, 작업 처리 준비가 된 키를 선택해 선택된 keySet에 별도 저장한 후, 선택된 keySet에서 키를 하나씩 꺼내면서 연관된 채널 작업을 처리함
try{
Selector selector = Selector.open();
}catch(IOException e){
...
}
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
SelectionKey selectionKey = serverSocketChannel.register(Selector sel, int ops);
SelectionKey selectionKey = socketChannel.register(Selector sel, int ops);
SelectionKey key = socketChannel.keyFor(selector)
를 통해서도 얻을 수 있음Selector.select() 메소드는 관심 키셋의 SelectionKey로부터 작업 처리 준비가 되었다는 통보가 올 때까지 블로킹
select가 반환되는 경우는 최소 하나의 채널이 작업 처리 준비가 완료된 경우, Selector의 wakeup() 메소드 호출 시, select() 호출 스레드가 인터럽트 된경우
select() 메소드가 1 이상의 값 리턴할 경우 selectedKeys() 메소드로 작업 처리 준비된 SelectionKey들을 Set 컬렉션으로 얻으면 됨
선택된 키셋에서 SelectionKey를 하나씩 꺼내 작업 유형별로 채널 작업을 처리함
int keyCount = selector.select();
if(keyCount == 0){
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
if(selectionKey.isAcceptable()){//연결 수락 작업 처리}
else if(selectionKey.isReadable()){//읽기 작업 처리}
else if(selectionKey.isWritable()){//쓰기 작업 처리}
iterator.remove();
}
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
작업 스레드가 채널 작업 처리하다보면 채널 객체 이외의 다른 객체가 필요하며 이런 객체들은 SelectionKey에 첨부해두고 사용함
attach() 통해서 객체 첨부하며, attachment() 통해 첨부된 객체 얻음
Client client = new Client(socketChannel);
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
selectionKey.attach(client);
if(selectionKey.isReadable()){
Client client = (Client) selectionKey.attachment();
...
}
connect(), accept(), read(), write() 호출하면 즉시 리턴하며, 실질적인 입출력 작업 처리는 스레드 풀의 스레드가 담당하고 스레드가 작업 처리 완료하면 콜백 메소드를 호출함
read() 호출하면 즉시 리턴되며 내부적으로 스레드풀의 작업 스레드가 실질적 실행하며 실행 완료 후에 completed() 콜백
같은 스레드 풀을 공유하는 비동기 채널들의 묶음으로 하나의 스레드 풀을 사용하는 경우 모든 비동기 채널은 같은 채널 그룹에 속해야 함
채널 그룹 지정하지 않으면 기본 비동기 채널 그룹에 속하게 되며 내부적으로 생성되는 스레드 풀을 이용함
new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
Long.MAX_VALUE, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
threadFactory
);
비동기 채널 그룹이 사용하는 스레드 개수를 지정하려면 새로운 비동기 채널 그룹을 생성해야함
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
최대 스레드 수,
Executors.defaultThreadFactory()
);
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
Executors.defaultThreadFactory()
);
channelGroup.shutdown();
channelGroup.shutdownNow();
//basic
AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
//new Channle
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
Executors.defaultThreadFactory()
);
AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
// port binding
asynchronousServerSocketChannel.bine(new InetSocketAddress(5001));
//accept
asynchronousServerSocketChannel.accept(null,
new CompletionHandler<AsynchronousSocketChannel, Void>{
@Override
public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Void attachment){
//수락 후 실행할 코드
asynchronousServerSocketChannel.accept(null, this); //accept 재호출
}
@Override
public void failed(Throwable exc, Void attachment){
//수락 실패시 실행할 코드
}
}
);
//close
asynchronousServerSocketChannel.close();
accept(A attachment, CompletionHandler<AsynchronousSocketChannel, A> handler
매개변수로 콜백 메소드의 매개값으로 제공할 첨부 객체와 콜백 메소드 가지고 있는 CompletionHandler 구현 객체 가짐
//basic channel group
AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open();
//new channel group
AsynchronousChannelGroup channelGroup = AysnchronousChannelGroup.withFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
Executors.defaultThreadFactory()
);
AsynchronousSocketChannel asynchronousSocketChannle = AsynchronousSocketChannel.open(channelGroup);
//close
asynchronousSocketChannel.close();
connect(SocketAddress remote, A attachment, CompletionHandler<Void, A> handler);
매개값으로 서버 IP와 연결 포트 정보 가진 InetSocketAddress 객체, 콜백 메소드의 매개값으로 제공할 첨부 객체, CompletionHander<Void,A> 구현 객체 가짐
asynchronousSocketChannel.connect(new InetSocketAddress("localhost", 5001), null,
new CompletionHandler<Void, Void>(){
@Override
public void completed(Void result, Void attachment){
//연결 성공 후 실행할 코드
}
@Override
public void failed(Throwable e, Void attachment){
//연결 실패 후 실행할 코드
}
}
);
AsynchronousSocketChannel의 read(), write() 메소드로 데이터 통신
read(ByteBuffer dst, A attachment, CompletionHandler<Integer, A> handler);
write(ByteBuffer src, A attachment, CompletionHandler<Integer, A> handler);
읽고쓰기위한 ByteBuffer 객체, 콜백 메소드의 매개값으로 제공할 첨부 객체, CompletionHandler<Integer, A> 구현 객체
asynchronousSocketChannel.read(byteBuffer, attachment,
new CompletionHandler<Integer, A>(){
@Override
public void completed(Integer result, A attachment){
//받은 데이터 처리 코드
asynchronousSocketChannel.read(byteBuffer, attachment, this);
}
@Override
pulbic void failed(Throwable exc, A attachment){
//실패한 경우 실행 코드
}
}
);
asynchronousSocketChannel.write(byteBuffer, attachment,
new CompletionHandler<Integer, A>(){
@Override
public void completed(Integer result, A attachment){
//성공한 경우 실행할 코드
}
@Override
pulbic void failed(Throwable exc, A attachment){
//실패한 경우 실행 코드
}
}
);
DatagramChannel
블로킹, 넌블로킹 방식 모드 사용 가능함
//DatagramChannel 생성
DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
//데이터 보내기
int byteCount = datagramChannel.send(byteBuffer, new InetSocketAccress("localhost", 5001));
//닫기
datagramChannel.close();
//DatagramChannel 생성 및 포트 바인딩
DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
datagramChannel.bind(new InetSocketAddress(5001);
//데이터 받기
SocketAddress socketAddress = datagramChannel.receive(ByteBuffer, dst);
//닫기
datagramChannel.close();
receive()
데이터 받기 전까지 블로킹되고 데이터 받으면 반환되며 작업 스레드 생성해서 receive() 반복적으로 호출됨
작업 스레드 종료 위해서는 작업 스레드의 interrupt() 호출해 ClosedByInterruptException 발생시키거나, DatagramChannel의 close() 호출시켜 AsynchronousCloseException 예외 발생 시킴