CompletionStage 인터페이스
public interface CompletionStage<T> {
public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
}
CompletionStage 연산자 chaining
- 50개에 가까운 연산자들을 활용하여 비동기 task들을 실행하고 값을 변형하는 등 chaining을 이용한 조합 가능
- 비동기 non-Blocking 개발 지원에 큰 역할을 한다.
- 에러를 처리하기 위한 콜백 제공
Helper.completionStage()
.thenApplyAsync(value -> {
log.info("thenApplyAsync: {}", value);
return value + 1;
}).thenAccept(value -> {
log.info("thenAccept: {}", value);
}).thenRunAsync(() -> {
log.info("thenRun");
}).exceptionally(e -> {
log.info("exceptionally: {}", e.getMessage());
return null;
});
Thread.sleep(100);
ForkJoinPool
thread pool
- CompletableFuture는 내부적으로 비동기 함수들을 실행하기 위해, ForkJoinPool을 사용
- ForkJoinPool의 기본 size = 할당된 CPU 코어 -1
- 데몬 스레드. main 스레드가 종료되면 즉각 종료
- 보통은 worker 스레드가 돌고 있으면, main 스레드가 종료없이 대기한다. 하지만 데몬스레드는 main이 종료되면 본인의 진행 여부를 따지지 않고 종료시킴.
fork & join
- ForkJoinPool은 작업을 쪼개서 병렬 처리를 하고 합친다. 내부적으로 work stealing을 활용해 바쁜 Thread를 도울 것이다.
- Task를 for를 통해 subtask로 나누고
- Thread pool에서 steal work 알고리즘을 이용하여 균등하게 처리해서
- work stealing은 작업이 비는 Thread가 바쁜 Thread의 일을 "훔쳐오는" 방식이다.
- join을 통해서 결과를 생성

CompletionStage 연산자
함수형 인터페이스
- 함수형 프로그래밍을 지원하기 위해서 Java 8부터 도입
- 1개의 추상 메소드를 갖고 있는 인터페이스
- 함수를 1급 객체로 사용할 수 있다.
- 함수를 변수에 할당
- 함수를 인자로 전달
- 함수를 반환값으로 사용
- Function, Consumer, Supplier, Runnable . . .
- 함수형 인터페이스를 구현한 익명 클래스를 람다식으로 변경 가능
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}
- 함수형 인터페이스는 호출한 Thread에서 실행된다.
public static void main(String[] args) {
var consumer = getConsumer();
consumer.accept(1);
var consumerAsLambda = getConsumerAsLambda();
consumerAsLambda.accept(1);
handleConsumer(consumer);
}
public static Consumer<Integer> getConsumer() {
Consumer<Integer> returnValue = new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
log.info("value in interface: {}", integer);
}
};
return returnValue;
}
public static Consumer<Integer> getConsumerAsLambda() {
return integer -> log.info("value in lambda: {}", integer);
}
public static void handleConsumer(Consumer<Integer> consumer) {
log.info("handleConsumer");
consumer.accept(1);
}
CompletionStage 연산자와 연결
- 유추해볼 수 있는 건, 추상메소드에 따른 이름이라고 유추해볼 수 있겠다.
- thenAccept(Consumer action)은 값이 들어오고, 리턴이 없을거라 추측
- Consumer의 accept 메소드는 void를 반환함
- thenApply(Function fn)은 값이 들어오고, 변환되어 나올거라 추측
- thenRun(Runnable action)은 인자를 안 받고, 액션만 취할거라 추측