이 포스팅의 코드 및 정보들은 강의를 들으며 정리한 내용을 토대로 작성한 것입니다.
이전 Future를 쓰던 방식의 가장 큰 문제점들이 여럿 있었다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> future = executorService.submit(() -> "Hello World");
// get을 하기 전까지 어떤 것도 할 수 없다.
future.get();
}
그런데 get()이 블로킹 콜이기 때문에 get() 전에 많은 작업을 하면 되지만, 보통 프로그래밍을 할 때 "Hello World"작업이 완료된 뒤의 작업도 같이 구현시킨다. get()을 통해서 "Hello World"가 완료되면 그때 뒤에 써준 작업이 호출되도록 하는 것이 일반적으로 봐왔던 비동기적인 프로그래밍의 코딩 패턴이다.
하지만, 이런 여러 작업들은 Java 5의 Future 만으로는 어려운 작업이 된다.
자바에서 비동기(Asynchronous) 프로그래밍을 가능하게 하는 인터페이스
CompletableFuture를 쓰면 더이상 명시적으로 Executor를 만들어서 쓸 필요가 없다. CompletableFuture만 가지고 비동기적으로 어떤 작업들을 실행할 수 있다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("Foo");
}
complete()를 통해 future의 기본 값을 Foo
라고 정해주면서 Future의 작업 자체를 끝내게 되는 것이다.
단, get()이 없어지지는 않는다.
future.complete("Foo")
이거는 명시적으로 값을 준 거고,
static factory method를 활용해서 값을 받을 수도 있다.
만약, 실제로 어떤 작업을 실행하고 싶으면 반환이 있는/없는 작업으로 나눌 수 있다.
반환이 없는 작업은 runAsync()를 쓰면 된다.
runAsync()가 반환 타입이 없기 때문에, 제네릭 타입도 Void로 나온다. 이거는 Future만 정의한 것이기 때문에 아무 일이 벌어지지 않으므로, get()또는 join()을 해야 어떤 일이 벌어진다.
join()은 안에서 Exception이 벌어지거든, UncatchedException으로 던져지므로 굳이 에러 처리를 명시적으로 할 필요는 없다.
위 실습에서
future.get()
만 하면[Thread-Example] ForkJoinPool.commonPool-worker-3
만 출력된다. 반환 값이 있으므로 콘솔 출력을 통해 반환값까지 받아 출력하면 되는 것에 유의할 것
그냥 ExecutorService에다가 Callable 넘겨준 후 get()호출해서 blocking한 다음에 결과 가져와서 출력하는 거랑 같다.
이 포스팅 맨 위에서 서술한 바와 같이 뭔가 결과가 왔을 때 비동기적으로 이에 해당하는 callback을 실행해야 한다. Future로는 한계가 있고 get()도 여전히 호출해야 하지만, 적합한 방법들이 있다.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("[Thread-Example] " + Thread.currentThread().getName());
return "The sentence for example";
}).thenApply();
callback을 주는 방법은 thenApply()를 뒤에 붙이는 것이다. 우리가 받은 그 결과값을 다른 타입으로 변경할 수 있으며, thenApply() 자리에 function이 들어가는 것이다.
Java5의 Future를 썼을 때와는 달리, callback을 get() 호출하기 전에 정의하는 게 가능한 걸 확인할 수 있다.
thenAccept(Consumer) callback은 반환이 없고, 받아서 쓰기만 하면 될 때 활용할 수 있다.
thenAccept()에는 Consumer가 들어오며, 반환이 없기 때문에 타입이 달라지는 건 자명하다.
supplyAsync(() -> {
System.out.println("[Thread-Example] " + Thread.currentThread().getName());
return "The sentence for example";
})
여기서 콘솔 출력하는 작업까지 끝나면(종료되면)
thenAccept((s) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(s.toUpperCase());
});
supplyAsync()에서 작업하고 나서의 결과값을 받아서 부가적인 처리를 한다.
반환 받을 필요 없이 뭔가 동작을 하기만 하면 될 때 활용할 수 있는 callback이다. thenApply()
나 thenAccept()
와는 달리 결과값을 참고하지 않고 안에는 Runnable이 온다.
3개의 callback을 스레드 풀을 만들지 않고 사용하고 있었는데, 이는 ForkJoinPool 덕분에 그런 것이다.
ForkJoinPool은 Java7에 들어온 건데, 이것도 역시 Executor를 구현한 구현체 중 하나이다.
조금 다른 점은 작업을 Work Stealing 알고리즘을 사용한다는 것이며, 이는 다른 Deque(덱, Dequeue)을 쓴다는 것이다.
Queue는 먼저 들어온 게 먼저 나가지만 Deque(Dequeue)는 맨 마지막에 들어온 게 먼저 나간다.
그래서 Deque를 써서 자기 스레드가 할 일이 없으면 스레드가 직접 Deque에서 자기가 할 일을 가져와서 처리하는 방식의 Framework이다.
작업 단위를 자기가 파생시키는 세부적인 sub task가 있다면, 그 sub task를 잘게 쪼개서 다른 스레드에 분산시켜서 작업을 처리하고 모아서(joining) 그 결과값을 도출해내는 ForkJoin Framework이다.
그러므로, 우리가 별다른 Executor를 사용하지 않아도 내부적으로 ForkJoin Pool에 있는 commonPool()이라는 걸 쓰게 된다.
다만, 원한다면 얼마든지 직접 스레드 풀을 만들어서 스레드를 이용할 수 있다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("[Thread-Example] " + Thread.currentThread().getName());
return "The sentence for example";
}).thenRun(() -> {
System.out.println(Thread.currentThread().getName());
});
System.out.println(future.get());
}
이렇게 만든 거를 supplyAsync()
를 호출할 때 두 번째 인자로 줄 수 있다. supplyAsync()
나 runAsync()
, 이 두 개의 function을 호출하는 작업을 여기서 (직접 제공하려는) 스레드 풀을 사용해서 처리한다.
이렇게 하면 이름이 다른 Pool이 출력될 것이다.
thenApply()
, thenAccept()
, thenRun()
이 셋도 마찬가지로 callback을 실행할 어떤 풀을 다른 곳에서 실행하고 싶다면 thenRunAsync()
를 쓰면 된다.
thenRunAsync()를 쓰니 스레드의 이름이 다르게 출력되는 것을 확인할 수 있다.
ExecutorService를 쓸 때는 shutdown()을 해줘야 한다.
이것도 마찬가지로 스레드의 이름이 다르게 출력된다.
자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스
Future로는 하기 어려웠던 작업들
CompletableFuture
비동기로 작업 실행하기
콜백 제공하기