이 포스팅의 코드 및 정보들은 강의를 들으며 정리한 내용을 토대로 작성한 것입니다.
Future만 가지고는 여러 작업들을 이어서 처리하는 게 힘들었다.
비동기적인 작업 여러개를 처리하는 거 자체가 콜백을 줄 수 없기 때문에 쉽지 않았다.
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("[Hello Thread] " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("[World Thread] " + Thread.currentThread().getName());
return "World";
});
이렇게 되면 Hello 끝난 다음에 World가 와야 한다. get()해서 기다려야 한다.
그 다음에 World의 get()을 해야 한다.
이렇게 get()을 2개 쓰는 것보다 thenCompose()를 사용하여 앞의 작업과 뒤의 작업을 이어서 작업을 추가적으로 사용할 수 있다.
world를 hello 다음에 바로 실행시키기 위해 hello의 결과를 참조할 수 있는 파라미터를 참고하도록 메서드를 따로 만든다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("[Hello Thread] " + Thread.currentThread().getName());
return "Hello";
});
// hello.thenCompose(s -> getWorld(s));
hello.thenCompose(Main::getWorld); // Method Reference
}
private static CompletableFuture<String> getWorld(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("[World Thread] " + Thread.currentThread().getName());
return message + " World";
});
}
이 둘을 연결한 하나의 Future가 나오고, Future를 get()하면 Hello World를 받을 수 있다.
이렇게 이어가는 방식은 두 Future간에 의존성이 있는 경우에 사용되고, 둘이 서로 연관관계가 없는 경우에 사용되는 메서드도 있다.
둘이 서로 연관관계는 없지만, 둘이 동시에 비동기적으로(따로) 실행하는 방법이 있다.
예를 들어, 주식의 경우에는 어느 한쪽이 먼저 와야지 다른 한쪽을 가져오는 구조가 아니라 따로 보내고 둘 다 결과가 왔을 때를 활용하고 싶은 경우이다.
그때는 thenCombine()을 쓰면 된다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("[Hello Thread] " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("[World Thread] " + Thread.currentThread().getName());
return "World";
});
CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
}
hello.thenCombine(world, 뭐시기)
에서 뭐시기
는 world랑 hello를 합친 것이고, hello의 결과랑 world의 결과가 둘 다 왔을 때 입력값은 2개, 결과값은 1개인 BiFunction 타입을 반환하는 것이다.
이렇게 하면 둘의 결과가 다 왔을 때 뒤에 있는 BiFunction에 해당하는 람다가 실행된다.
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("[World Thread] " + Thread.currentThread().getName());
return "World";
});
이런 sub task들이 두 개 이상일 때 여러 task를 다 합쳐서 실행할 수 있다. allOf()에 넘긴 모든 task가 다 끝났을 때 thenApply(), thenAccept() 등의 추가적인 callback을 실행할 수 있는 것이다.
하지만, 문제는 어떤 특정한 결과를 가져올 수 없다.
이 노란색 형광으로 칠한 모든 task의 결과가 동일한 type이라는 보장도 없고, 그 중에 어떤 것들은 에러가 났을 수도 있다.
이게 결과값이라는 것 자체가 무의미하다.
실행하면 thenAccept()의 print문에서 null을, future.get()의 print문에서 null이 출력될 것이다.
이거를 제대로(?) 받고 싶다면
예를 들어, 모든 task의 결과값을 Collection으로 만들어서 받고 싶으면
List<CompletableFuture> futures = Arrays.asList(hello, world);
이런 식으로 모든 task를 뭉쳐놓고
CompletableFuture<Void> future = CompletableFuture
.allOf(hello, world) // allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenAccept(System.out::println);
CompletableFuture<Void> future = CompletableFuture.allOf(futuresArray)
.thenApply(v -> {
return futures.stream()
.map(f -> f.get());
});
배열로 만든 futuresArray를 allOf()에 인수로 넘긴 후, 넘겨줬던 모든 future에 stream의 map()을 적용한다. thenApply()가 호출되는 이 시점에는 이미 모든 future의 작업이 다 끝났다. 즉, map()에서 get()을 호출해도 된다.
그런데, get()에는 Checked Exception이 발생할 수 있으며, try~catch문을 걸어놓으면 코드가 너무 길어진다. 이를 대신하며 join()을 사용하면 try~catch를 걸어놓을 필요는 없지만 Unchecked Exception이 발생한다.
.map(CompletableFuture::join)
일단 get() 대신에 join()을 사용할텐데, 이 join()을 사용하면 future에서 반환하는 최종 결과값이 나올 것이다. 그 결과값을 stream의 collect()로 모아서 List로 만든다.
CompletableFuture<List<Object>> results = CompletableFuture.allOf(futuresArray)
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
이렇게 하면 결과를 가져올 수 있다.
results.get()을 출력하면 배열안에 인덱스 목록처럼 나온다.
결과값 받아서 전부다 끝났을 때 그 결과값(results)으로 다른 작업을 처리할 수도 있다.
그리고 이때는 아무것도 blocking이 되지 않는다.
anyOf()는 여러 작업 중 빨리 끝나는 것의 결과를 받아서 작업을 하는 거니까, allOf()처럼 List가 오는 것이 아니고 결과 하나가 온다.
CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
future.get();
둘 중 아무거나 하나 먼저 오는 게 출력된다.
public static void main(String[] args) throws ExecutionException, InterruptedException {
boolean throwError = true;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
if(throwError) {
throw new IllegalArgumentException();
}
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
}
비동기적으로 실행한 어떤 task안에서 에러가 발생한다면 exceptionally()에서 에러 타입을 받아서 무언가를 반환한다. 이때 Function을 넘겨줄 수 있는데, 이 Function으로 어떤 default값을 넘겨주면 된다.
값을 넘겨주게 되면, hello의 get()으로 결과를 얻다가 에러로 던져지고
.exceptionally(ex -> {
System.out.println(ex); // ex는 Exception Type
return "Error!!!";
});
이 부분으로 들어오게 된다.
결과값을 Error로 반환했으니까 if(throwError) {}
if문으로 들어오게 되는 것이다.
에러가 없으면 에러를 던지지 않고 정상적으로 값을 출력한다.
정상적으로 종료됐을 때와 에러가 발생했을 때 두 경우 모두 사용할 수가 있으며, BiFunction이 들어온다.
.handle((result, ex) -> {
return result;
});
첫 번째 파라미터는 정상적인 경우의 결과값, 두 번째 파라미터는 Exception이 발생했을 때 error를 받는다.
정상적인 경우에는 result를 반환하면 되지만 에러가 있다면 "Error!!!"라고 반환하면 아까 전에 봤던 exceptionally()와 같은 결과가 나오게 된다.