CompletableFuture는 Java8에서 도입된 비동기 프로그래밍 API다.
Future의 단점을 보완해서 나왔다.
Async Non-blocking 구조로 개발할 수 있다.
처음으로 알아볼 메서드는 runAsync()
와 supplyAsync()
이다.
비동기 프로그래밍을 시작할 때 반환 값의 유무에 따라 둘 중에 하나를 선택할 수 있다.
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
Util.logging(msg: "runAsync()");
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
[ForkJoinPool.commonPool-worker-1] runAsync()
runAsync()
는 위의 예제와 같이 반환 값 없이 비동기 프로그래밍을 수행한다.
Util.logging()
은 스레드 이름을 함께 출력하도록 했다.
CompletableFuture은 어떤 스레드풀을 사용할 지 지정해줄 수 있는데, 지정하지 않은 경우 위처럼 ForkJoinPool
이 디폴트이다.
String flowResult = "START";
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Util.logging(msg: "Async task 1");
return flowResult + " -> supplyAsync()";
}).thenApplyAsync((res) -> {
Util.logging(msg: "Async task 2");
return res + " -> thenApplyAsync()";
});
future.join();
try {
String result = future.get();
Util.logging(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
[ForkJoinPool.commonPool-worker-1] Async task 1
[ForkJoinPool.commonPool-worker-1] Async task 2
[main] START -> supplyAsync() -> thenApplyAsync()
supplyAsync()
는 위 예제를 보면 flowResult + " -> supplyAsync()"
을 반환하고 있다.
이 반환 값은 thenApplyAsync()
의 매개변수로 이어받아서 작업할 수 있다.
비동기 작업이 끝났을 때, 호출하는 후행 작업을 '콜백'이라고 부른다.
CompletableFuture에서 콜백 작업을 수행하는 방법을 몇 가지 알아보자.
매개변수와 반환 값의 유무에 따라 3가지 콜백 메서드를 살펴보자.
thenRun은 매개변수(결과 값)이 없고 반환 값이 없는 콜백 작업을 수행할 때 사용한다.
thenAccept은 매개변수가 있고 반환 값이 없는 콜백 작업을 수행할 때 사용한다.
thenApply는 매개변수가 있고 반환 값이 있는 콜백 작업을 수행할 때 사용한다.
그리고 각 콜백 함수는 기본 메서드와 Async 메서드가 존재한다.
둘의 차이를 자세히 알아보자.
String flowResult = "START";
ExecutorService es = Executors.newFixedThreadPool(nThreads: 1);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Util.logging(msg: "supplyAsync()");
return flowResult + " -> supplyAsync()";
}, es).thenApply((res) -> {
Util.logging(msg: "thenApply()");
return res + " -> thenApply()";
});
[pool-1-thread-1] supplyAsync()
[pool-1-thread-1] thenApply()
[main] START -> supplyAsync() -> thenApply()
위 코드는 thenApply()
메서드 사용 예제와 수행 결과다.
ExecutorService를 지정해서 supplyAsync()
가 지정한 스레드풀을 활용해서 동작하게 했다.
어떤 스레드에서 수행됐는 지 유심히 살펴보자.
supplyAsync()
가 사용한 스레드를 넘겨받아서 사용하고 있다.
만약 thenApplyAsync()
메서드였다면 어떻게 동작했을까?
String flowResult = "START";
ExecutorService es = Executors.newFixedThreadPool(nThreads: 1);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Util.logging(msg: "supplyAsync()");
return flowResult + " -> supplyAsync()";
}, es).thenApplyAsync((res) -> {
Util.logging(msg: "thenApplyAsync()");
return res + " -> thenApplyAsync()";
});
[pool-1-thread-1] supplyAsync()
[ForkJoinPool.commonPool-worker-1] thenApplyAsync()
[main] START -> supplyAsync() -> thenApplyAsync()
위 코드는 thenApplyAsync()
메서드 사용 예제와 수행 결과다.
수행 순서나 다른 동작은 동일하지만 작업을 수행한 스레드가 달라졌다.
ForkJoinPool이라는 CompletableFuture에서 디폴트로 정의된 스레드풀을 사용했다.
이처럼 Async가 붙지 않은 콜백 메서드의 경우, 현재 사용중인 스레드 중에서 하나를 이어받아서 사용한다.
하지만 Async가 붙은 콜백 메서드의 경우, 지정된 스레드풀을 사용한다.
만약 지정된 스레드풀이 없는 경우 CompletableFuture의 디폴트 스레드풀인 ForkJoinPool을 사용한다.
비동기 작업을 수행하는 스레드에서 예외가 발생하면 어떻게 컨트롤할 수 있을까?
exceptionally()
메서드를 활용할 수 있다.
예외 처리 메서드 역시 콜백 함수와 동일하게 기본 메서드와 Async 메서드가 존재한다.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Util.logging(msg: "Async task 1");
return flowResult + " -> supplyAsync()";
}).thenApplyAsync((res) -> {
Util.logging(msg: "Async task 2");
int num = 12 / 0; // OCCUR ERROR!!
return res + " -> thenApplyAsync()";
}).exceptionallyAsync((e) -> {
e.printStackTrace();
return null;
});
[ForkJoinPool.commonPool-worker-1] Async task 1
[ForkJoinPool.commonPool-worker-1] Async task 2
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
at java.base.java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
at java.base.java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1491)
at java.base.java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:2073)
at java.base.java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2035)
at java.base.java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187)
위 예제에서는 예외를 발생시키기 위해 0으로 나누는 연산을 수행했다.
exceptionallyAsync()
메서드에서 스택 트레이스를 출력해주고 있고, 콘솔에서 실제로 확인되고 있다.