CompletableFuture
CompletableFuture란?
- 자바에서 비동기(Asynchronous) 프로그래밍을 가능하게 하는 인터페이스이다.
- 기존의
Future
를 사용해서도 어느정도 가능했으나, 제약이 많았다.
Future로 어떤 제약이 있었을까?
Future
를 외부에서 완료시킬 수 없다. (단, 취소하거나, get()에 타임아웃을 설정할 수는 있다.)
- 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
Future
에서는 return되는 결과값을 가지고 무언가를 하려면 get()이후에만 가능하다.
- 여러
Future
조합을 사용할 수 없다. (ex. Event 정보를 가져온 다음 Event에 참석하는 회원 목록을 가져오기)
- 예외 처리용 API를 제공하지 않는다.
CompletableFuture의 장점
Future
와 달리 외부에서 명시적으로 Complete를 시켜버릴 수 있다.
- (ex. 몇 초 이내에 응답이 안오면, 특정 값을 리턴)
CompletableFuture
를 사용하면 명시적으로 Executors
를 사용할 필요가 없어진다.
- 그냥
CompletableFuture
만 가지고도 비동기적으로 어떤 작업들을 실행할 수가 있다.
CompletableFuture<String> completableFuture = new CompletableFuture<>();
CompletableFuture<String> completableFuture2 = CompletableFuture.completedFuture("nathan2");
completableFuture.complete("nathan");
System.out.println(completableFuture.get());
System.out.println(completableFuture2.get());
비동기로 작업 실행하기
- 리턴값이 없는 경우: runAsync()
- 리턴값이 있는 경우: supplyAsync()
- 원하는
Executor
(스레드 풀)를 사용해서 실행할 수도 있다.
- 기본은
ForkJoinPool.commonPool()
return type이 없는 작업을 하고 싶다면? - runAsync()
- return이 없기 때문에
get()
또는 join()
을 해야 원하는 작업을 얻을 수 있다.
CompletableFuture<Void> tmp1 = CompletableFuture.runAsync(() -> {
System.out.println("runAsync "+Thread.currentThread().getName());
});
tmp1.get();
>>>
runAsync ForkJoinPool.commonPool-worker-9
return type이 있는 작업을 하고 싶다면? - supplyAsync()
CompletableFuture<String> tmp2 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync " + Thread.currentThread().getName());
return "supplyAsync";
});
System.out.println(tmp2.get( ));
>>>
supplyAsync ForkJoinPool.commonPool-worker-9
supplyAsync
callback을 이용한 작업들
- 지금까지는 코드만 달라졌을 뿐
Future
를 사용했을 때와 거의 같다.
- 비동기적(Asynchronous)으로 callback을 주고 작업을 해보자.
- 여전히
get()
은 호출해야 함에 유의하자.
- 호출하지 않으면, 아무일도 일어나지 않는다.
- 콜백 자체를 또 다른 스레드에서 실행할 수 있다.
callback에 return을 붙이는 경우 - thenApply()
Future
일 때는 이런 작업이 불가능 했음.
- callback을
get()
호출하기 전에 작업하는 것이 불가능.
Function
이 들어감
CompletableFuture<String> tmp3 = CompletableFuture.supplyAsync(()->{
System.out.println("callback " + Thread.currentThread().getName());
return "CallBack";
}).thenApply((s) -> {
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
});
System.out.println(tmp3.get());
>>>
callback ForkJoinPool.commonPool-worker-9
main
CALLBACK
callback에 return을 붙이지 않는 경우 - thenAccept()
Consumer
가 들어감
- return 값이 없으므로 type은
Void
(최종적으로 실행되는 것에 focus를 맞춘다.)
CompletableFuture<Void> tmp4 = CompletableFuture.supplyAsync(() -> {
System.out.println("callback2 " + Thread.currentThread().getName());
return "callback2";
}).thenAccept((s) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(s.toUpperCase());
});
tmp4.get();
>>>
callback2 ForkJoinPool.commonPool-worker-9
main
CALLBACK2
return 없이 callback으로 넘어오는 경우 - thenRun()
- return 없이 넘어온 callback에 return이 붙지 않는 경우에 쓰인다.
CompletableFuture<Void> tmp5 = CompletableFuture.supplyAsync(() -> {
System.out.println("callback3 " + Thread.currentThread().getName());
return "callback3";
}).thenRun(()->{
System.out.println("thenRun");
});
>>>
callback3 ForkJoinPool.commonPool-worker-9
thenRun
ForkJoinPool
- 어떻게 스레드 풀을 만들지 않고 별도의 스레드들로 동작을 하는걸까?
ForkJoinPool
이란 Executor 구현체 중 하나인데, Deque를 통하여 자기 스레드가 할 일이 없으면, Deque에서 가져와서 처리하는 방식의 프레임 워크이다.
- 자기가 파생시킨 서브 태스크들을 다른 스레드들에 분산시켜 처리하고 모아서 Join하는 식으로 작업단위를 구성한다.
스레드 풀을 직접 만들어 진행해보기
- 원하면, 언제든 스레드 풀을 직접 생성하여 진행할 수 있다.
supplyAsync
호출 할 때 두 번째 인자로 줄 수 있음(runAsync
도 마찬가지)
ExecutorService executorService2 = Executors.newFixedThreadPool(4);
CompletableFuture<String> tmp6 = CompletableFuture.supplyAsync(()->{
System.out.println("ThreadPool " + Thread.currentThread().getName());
return "ThreadPool";
}, executorService2);
System.out.println(tmp6.get());
executorService2.shutdown();
>>>
ThreadPool pool-2-thread-1
ThreadPool
작업들을 조합하기
두 작업을 서로 이어서 실행하려면? - thenCompose()
public class CompletableFutureEx2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(()->{
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world = hello.thenCompose(CompletableFutureEx2::getWorld);
System.out.println(world.get());
private static CompletableFuture<String> getWorld(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println(message + Thread.currentThread().getName());
return message+ " World";
});
}
}
>>>
Hello ForkJoinPool.commonPool-worker-9
HelloForkJoinPool.commonPool-worker-9
Hello World
두 작업을 독립적으로 실행하고, 둘 다 종료 했을 때 콜백을 실행하려면? - thenCombine()
- 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백을 실행한다.
- 두 작업이 서로 연관관계(의존성)는 없지만, 동시에 따로따로 실행하는 방법 :
thenCombine + biFunction
public class CompletableFutureEx2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(()->{
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world = hello.thenCompose(CompletableFutureEx2::getWorld);
CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> {
return h+" "+w;
});
System.out.println(future.get());
private static CompletableFuture<String> getWorld(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println(message + Thread.currentThread().getName());
return message+ " World";
});
}
}
>>>
Hello Hello World
여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행하려면? - allOf()
- 여러 작업을 모두 실행하고 모든 작업 결과에 콜백을 실행한다.
- 두 개 이상의 서브 태스크들을 모두 합쳐서 실행하는 방법
List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray)
.thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
results.get().forEach(System.out::println);
>>>
Hello
Hello World
여러 작업 중 가장 빨리 끝난 하나의 결과에 콜백을 실행하려면? - anyOf()
- 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백을 실행한다.
CompletableFuture<Void> future3 = CompletableFuture.anyOf(world, hello).thenAccept(System.out::println);
future3.get();
>>>
Hello World
예외 처리
exceptionally
- 비동기적으로 진행되는 task들에서 에러가 발생한다면,
exceptionally
를 통해 에러 타입을 받아서 뭔가를 return하는 Function을 넘겨줄 수 있다.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureEx3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
boolean throwError = true;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
if (throwError){
throw new IllegalArgumentException();
}
System.out.println("Hello "+Thread.currentThread().getName());
return "Hello";
}).exceptionally(ex -> {
System.out.println(ex);
return "Error!";
});
System.out.println(hello.get());
}
}
>>>
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
Error!
handle
exceptionally
보다 조금 더 일반적인 사용이 가능하다.
- BiFunction을 넘겨준다.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureEx3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
boolean throwError = false;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
if (throwError){
throw new IllegalArgumentException();
}
System.out.println("Hello "+Thread.currentThread().getName());
return "Hello";
}).handle((result, ex) ->{
if (ex != null){
System.out.println(ex);
return "Error!";
}
return result;
});
System.out.println(hello.get());
}
}
>>>
Hello ForkJoinPool.commonPool-worker-9
Hello
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
Error!
함께 보면 좋을 자료들
Reference