public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { … } public static CompletableFuture<Void> runAsync(Runnable runnable) { … } public boolean complete(T value) { … } public boolean isCompletedExceptionally() { … } } public static CompletableFuture<Void> allOf(CompletableFuture<?> ... cfs) { … } public static CompletableFuture<Object> anyOf(CompletableFuture<?> ... cfs) { … }
- Supplier를 제공하여 CompletableFuture를 생성 가능하게 함
- Supplier<T'>는 아무런 인자 없이 값을 만들고 반환하는 인터페이스.
@FunctionalInterface public interface Supplier<T> { T get(); }
- Supplier의 반환값이 CompletableFuture의 결과로
- supplyAsync는 비동기로 실행하여 값을 만들어주는 작업을 하고 CompletableFuture로 감싼다.
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { … }
- Runnable을 제공하여 CompletableFuture를 생성 가능하게 함
@FunctionalInterface public interface Supplier<T> { T get(); }
- 값을 반환하지 않는다.
- 받는 값도, 반환할 값도 X
- 다음 task에 null이 전달된다.
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { … }
- Runnable의 작업이 종료되면, 그 다음을 처리할래? 뉘앙스의 사건전달
var future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1; // CompletableFutute<Integer>
});
assert !future.isDone();
Thread.sleep(1000);
assert future.isDone();
assert future.get() == 1;
- supplyAsync의 리턴타입은 현재 CompletableFuture<Integer;>
- 100ms간 sleep 이기 때문에 최초 isDone == false
- 1000ms 대기 후 isDone == true
- 리턴된 값을 future에서 get()으로 꺼내어 비교
var future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
assert !future.isDone();
Thread.sleep(1000);
assert future.isDone();
assert future.get() == null;
- runAsync은 리턴 X
- 100ms간 sleep 이기 때문에 최초 isDone == false
- 1000ms 대기 후 isDone == true
- 리턴된 값을 future에서 get()으로 꺼내어 비교 (null)
CompletableFuture<Integer> future = new CompletableFuture <>(); // 빈 future
assert !future.isDone(); // 비어있는 future이기 때문에 isDone 판단 X
var triggered = future.complete(1); // future에 1로 명시
assert future.isDone(); // true
assert triggered; // true, 완료되지 않았던 future가 완료되어서
assert future.get() == 1; // true
triggered = future.complete(2); // false, 해당 future은 위에서 이미 완료됨
assert future.isDone(); // true, 위에서 이미 완료됨
assert !triggered; // false, future.complete(2) 실패
assert future.get() == 1; // true, future의 결과는 그대로
- CompletableFuture가 완료되지 않았다면, 주어진 값으로 채울 수 있다.
- complete에 의해 값이 바뀌면 true, 아니라면 false 반환
![]() | ![]() |
|---|
future
- isDone == true 시점에서 completed일 때, complete 혹은 exception이기 때문에 무조건 complete라고만 간주할 수 없다.
CompletableFuture
- isDone == true 시점에 completed, canceld 외에 exceptionally가 추가 되었다.
- isDone() 시점에서 예외인지, 성공인지 구분이 가능해진다.
- exceptionally의 결과는 isCompletedExceptionally()로 확인이 가능해졌다.
- isCanceled도, isCompletedExceptionally도 아니라면 completed를 보장 가능해진다.
- Exception에 의해서 complete 되었는지 확인할 수 있다.
var futureWithException = CompletableFuture.supplyAsync(() -> { return 1 / 0; // false, supplyAsync()에 의해 리턴은 CompletableFuture<Boolean> }); Thread.sleep(100); assert futureWithException.isDone(); // true assert futureWithException.isCompletedExceptionally(); // true, complete 되었으니
CompletableFuture는 전부 다른 Thread에서 실행됨.
- 여러 completableFuture를 모아서, 하나의 completableFuture로 변환할 수 있다.
- 모든 completableFuture가 완료된다면, 상태가 done으로 변경
- Void를 변환하므로, 각각의 값에 get()으로 접근해야 한다.
public static CompletableFuture<Void> allOf(CompletableFuture<?> ... cfs) { … }
- allOf를 통해, 모든 completableFuture가 done이 된 시점을 유추할 수 있다.
- "다 끝났음" 정도의 뉘앙스로 이해하자.
var startTime = System.currentTimeMillis();
var firstFuture = Helper.waitAndReturn(100, 1); // future 반환
var secondFuture = Helper.waitAndReturn(500, 2); // future 반환
var thirdFuture = Helper.waitAndReturn(1000, 3); // future 반환
CompletableFuture.allOf(firstFuture, secondFuture, thirdFuture) // allOf는 합쳐진 CompletableFuture를 반환
.thenAcceptAsync(v -> { // allOf는 CompletableFuture<Void>를 리턴하기에 v는 null
log.info("after allOf");
try {
// v에서 각 future를 가져올 수 없기에, 직접 future에서 get()으로 접근
log.info("first: {}", firstFuture.get());
log.info("second: {}", secondFuture.get());
log.info("third: {}", thirdFuture.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}).join();
var endTime = System.currentTimeMillis();
결과
- Thread 관점
- 1~3번 future는 각 다른 thread에서 작동을 했다.
- 그리고 allOf로 하나의 future에 합쳐짐
- allOf 실행 시점에는 뭉쳐진 한 future에서 실행이 될 것
- 실행소요 시간이 1014ms인 이유
- 100ms 완료 -> 500ms 완료 -> 1000ms 완료 -> 1600ms 소요 개념 X
- 병렬적으로 실행되기 때문에 1000ms 소요 예상
- 100ms, 500ms 소요시간의 future가 완료되어도, 가장 느린 1000ms의 작업 때문에 앞선 2개의 future는 대기를 하게 됨.
CompletableFuture는 전부 다른 Thread에서 실행됨.
- 여러 completableFuture를 모아서, 하나의 completableFuture로 변환할 수 있다.
- 주어진 future 중 하나라도 완료되면 상태가 done으로 변경
- 제일 먼저 done 상태가 되는 future의 값을 반환
- Object형으로 반환된다. allOf와는 달리 리턴값에 접근 가능
public static CompletableFuture<Object> anyOf(CompletableFuture<?> ... cfs) { … }
var startTime = System.currentTimeMillis();
var firstFuture = Helper.waitAndReturn(100, 1); // future 반환
var secondFuture = Helper.waitAndReturn(500, 2); // future 반환
var thirdFuture = Helper.waitAndReturn(1000, 3); // future 반환
CompletableFuture.anyOf(firstFuture, secondFuture, thirdFuture) // any
.thenAcceptAsync(v -> {
log.info("after anyOf");
log.info("first value: {}", v);
}).join();
var endTime = System.currentTimeMillis();
log.info("elapsed: {}ms", endTime - startTime);
결과
- Thread 관점
- 1~3번 future는 각 다른 thread에서 작동을 할 것이다.
- anyOf 실행 시점은 가장 빨리 끝난 작업이 반환될 시점
- 실행소요 시간이 114ms인 이유
- 가장 빠른 작업인 1번 future가 바로 done이 되어 반환됨
- 지연 로딩을 제공하지 않는다.
- CompletableFuture를 반환하는 함수를 호출시, 즉시 작업이 실행된다.
- 별도의 Thread에서 sleep()하면 되지 않을까? --> 이것 또한 별도의 Thread에서 작업되기 때문에 지연로딩으로 보기 힘들다.
- 지속적으로 생성되는 데이터를 처리하기 어렵다.
- CompletableFuture에서 데이터를 반환하고 나면, 다시 다른 값을 전달하기 어려움
- ex) 데이터를 한번에 몰아서 주는 게 아니라, 받고 -> 처리, 받고 -> 처리 이런식이라고 가정했을 때. CompletableFuture에서 값을 한번에 모아서 complete를 시키던가 해야하는데 그러지 못 하는 제약이 있음.