CompletionStage 연산자 및 thenXXX vs thenXXXAsync의 차이를 thread 관점에서 다뤄보려 한다.
- 간단한 Helper 클래스 코드
- finishedStage : 1을 반환하는 완료된 CompletableFuture 반환
- CompletionStage 연산자 호출시, 이미 결과가 있음. (isDone = true)
- runningStage : 1초를 sleep한 후 1을 반환하는 completableFuture
- CompletionStage 연산자 호출시, 아직 미완. 비동기 수행 중 (isDone = false)
@SneakyThrows
public static CompletionStage<Integer> finishedStage() {
var future = CompletableFuture.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
});
Thread.sleep(100);
return future;
}
public static CompletionStage<Integer> runningStage() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
log.info("I'm running!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
});
}
thenAccept
thenAccept[Async]
- Conumser를 파라미터로 받는다.
- 이전 task로부터 값을 받지만, 값을 넘기지는 않는다.
- 때문에 다음 task에 null이 전달된다.
- 값을 받아서 action만 수행하는 경우에 유용하다.
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
CompletionStage<Void> thenAccept(Consumer<? super T> action);
CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
}
thenAccept vs thenAcceptAsync (finishedStage)
thenAccept
- finishedStage()에서 done이 되면 thenAccept 콜백 실행
- 하지만 실행 스레드는 Main 스레드
- chaining을 했지만 결국 다음 task에게는 값이 전달되지 않음 (null)
log.info("start main");
CompletionStage<Integer> stage = Helper.finishedStage();
stage.thenAccept(i -> {
log.info("{} in thenAccept", i);
}).thenAccept(i -> {
log.info("{} in thenAccept2", i);
});
log.info("after thenAccept");
Thread.sleep(100);
thenAcceptAsync
- finishedStage()에서 done이 되면 thenAcceptAsync 콜백 실행
- ForkJoinPool에서 할당받은 스레드 실행
- chaining을 했지만 결국 다음 task에게는 값이 전달되지 않음 (null)
log.info("start main");
CompletionStage<Integer> stage = Helper.finishedStage();
stage.thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync", i);
}).thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync2", i);
});
log.info("after thenAccept");
Thread.sleep(100);
스레드 관점에서의 차이점?
- done 상태에서 thenAccept는 caller (main)의 스레드에서 실행
- done 상태의 completionStage에 thenAccept를 사용하는 경우, caller 스레드를 block 할 수 있음.
- ex) finishedStage() 연산이 시간이 오래 걸렸다고 가정하면, 그 사이 main 스레드는 계속 block

결론
- CompletionStage 자체는 비동기처리 O
- finishedStage()는 이미 Done 상태
- 작업이 가벼우면 thenAccept, 무겁거나 비동기성을 살리려면 thenAcceptAsync
- thenAccept는 앞선 작업의 스레드에서 후속 작업 실행 (앞선 작업 = caller)
- 불필요한 스레드 전환 없이 빠르게 실행 가능
- 작은 후속 작업에 적합 (ex. 로그 출력, 단순 연산)
- thenAcceptAsync는 별도 스레드 (executor / ForkJoinPool에서 후속 작업 실행
- 작업이 무겁거나 blocking 가능성이 있을 때 유용
- 별도 스레드에서 처리하니까, 다른 작업에 영향 안 줌
thenAccept vs thenAcceptAsync (runningStage)
thenAccept
- runningStage()은 1초 후 리턴하기에, done 상태가 아니다.
- 하지만 실행 스레드는 callee의 스레드
- callee의 스레드 = supplyAsync에서 생성된 ForkJoinPool
- chaining을 했지만 결국 다음 task에게는 값이 전달되지 않음 (null)
log.info("start main");
CompletionStage<Integer> stage = Helper.runningStage();
stage.thenAccept(i -> {
log.info("{} in thenAccept", i);
}).thenAccept(i -> {
log.info("{} in thenAccept2", i);
});
Thread.sleep(2000);
thenAcceptAsync
- runningStage()은 1초 후 리턴하기에, done 상태가 아니다.
- ForkJoinPool에서 할당받은 스레드 실행
- chaining을 했지만 결국 다음 task에게는 값이 전달되지 않음 (null)
log.info("start main");
CompletionStage<Integer> stage = Helper.runningStage();
stage.thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync", i);
}).thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync", i);
});
Thread.sleep(2000);
스레드 관점에서의 차이점?
- done 상태가 아닐 때, thenAccept는 callee의 스레드에서 실행
- done 상태가 아닐 때, completionStage에 thenAccept를 사용하는 경우 callee 스레드를 block 할 수 있음.

결론
- CompletionStage 자체는 비동기처리 O
- 작업이 가벼우면 thenAccept, 무겁거나 비동기성을 살리려면 thenAcceptAsync
- thenAccept는 앞선 작업의 스레드에서 후속 작업 실행 (현재 앞선 작업 = callee)
- 불필요한 스레드 전환 없이 빠르게 실행 가능
- 작은 후속 작업에 적합 (ex. 로그 출력, 단순 연산)
- thenAcceptAsync는 별도 스레드 (executor / ForkJoinPool에서 후속 작업 실행
- 작업이 무겁거나 blocking 가능성이 있을 때 유용
- 별도 스레드에서 처리하니까, 다른 작업에 영향 안 줌
thenXXX[Async]의 실행 스레드

- 가장 큰 문제점은, thenXXX는 stage의 상태에 따라 caller 스레드인지, callee 스레드인지 나뉘게 되는데, 코드 상에서는 어느 스레드에서 콜백이 동작할지 유추하기 어려움
- 추가적으로 caller와 callee에 대한 blocking까지 생길 수 있음.
- 그에 반면 async들은 스레드 풀에서 할당받기에 유추가 가능함.
thenXXXAsync의 스레드풀 변경
- 모든 thenXXXAsync 연산자는 두번째 인자로 executor를 추가로 받는다.
- 이를 통해서 다른 스레드풀로 task를 실행할 수 있다.
- 특정한 콜백을 다른 스레드에서 쓰고 싶을 때 사용 가능
var single = Executors.newSingleThreadExecutor();
var fixed = Executors.newFixedThreadPool(10);
log.info("start main");
CompletionStage<Integer> stage = Helper.completionStage();
stage.thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync", i);
}, fixed).thenAcceptAsync(i -> {
log.info("{} in thenAcceptAsync2", i);
}, single);
log.info("after thenAccept");
Thread.sleep(200);
single.shutdown();
fixed.shutdown();
thenApply
thenApply[Async]
- Function을 파라미터로 받는다.
- 이전 task로부터 T 타입의 값을 받고, 가공하여 U 타입의 값을 반환한다.
- 다음 task에게 반환했던 값이 전달됨.
- 값을 변형해서 전달해야 하는 경우 유용.
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
<U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
}
- ? super T : 더 넓은 범위의 입력 허용 (유연하게)_
- ? extends U : 리턴값은 최소한 U를 보장
thenCompose
thenCompose[Async]
- Function을 파라미터로 받는다.
- T 타입의 값을 받고, 가공하여 U 타입의 CompletionStage를 반환한다.
- 반환한 CompletionStage가 done 상태가 되면 값을 다음 task에 전달한다.
- 다른 future를 반환해야 하는 경우 유용
@FunctionalInterface
public interface Function<T, R> {
default <V> Function<V, R> compose(Function<? super V, ? extends T> before)
Objects.requireNonNull(before);
return (V v) -> apply(before.apply(v));
}
}
<U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U >> fn);
<U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U >> fn);
thenCompose가 필요한 경우 (vs thenApply)
thenApply 구조의 문제점
CompletionStage<Integer> stage = CompletableFuture.completedFuture(10);
CompletionStage<CompletionStage<String>> nested = stage.thenApply(i -> {
return CompletableFuture.completedFuture("value: " + i);
});
- CompletableFuture.completedFuture의 리턴타입은 현재 CompletionStage <Integer'>
- stage의 상태가 isDone() == true 일 시점에, 해당 결과 값으로(i, Integer) thenApply의 람다 실행
- thenApply(fn)는 원래 CompletionStage<U'>을 리턴한다.
3-1. 즉, 입력값을 가공한 결과를 CompletionStage로 감싸서 리턴한다.
- 그런데 fn 내부에서 return CompletableFuture.completedFuture('String 값')처럼
이미 CompletionStage<String'>를 반환하면,
이게 다시 CompletionStage<U'>로 감싸지기 때문에
결과는 CompletionStage<CompletionStage<String'>> 같은 중첩 구조가 된다.
- 람다 안에서의 결과인 CompletionStage<String'>에 thenApply의 리턴타입인 CompletionStage<U'>가 한번 더 감싸짐
thenCompose로 해결
CompletionStage<Integer> stage = CompletableFuture.completedFuture(3);
CompletionStage<String> result = stage.thenCompose(i -> {
return CompletableFuture.completedFuture("Value is " + i);
});
- thenApply : T 입력, U 리턴
- thenCompose : T 입력, CompletionStage<U'> 리턴
- thenCompose는 람다가 CompletionStage를 리턴할 경우, 다시 감싸지 않고, 이어서 실행되도록 연결하는 기능
thenRun
thenRun[Async]
- Runnable을 파라미터로 받는다.
- 이전 task로부터 값을 받지 않고, 값을 반환하지 않음.
- 다음 task에게 null이 전달됨.
- future가 완료되었다는 이벤트를 기록할 때 유용.
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
exceptionally
exceptionally
- Function을 파라미터로 받는다.
- 이전 task로부터 exception을 받아서 처리하고 값을 반환한다.
- 다음 task에게 반환된 값을 전달한다.
- future 파이프라인에서 발생한 에러를 처리할 때 유용
@FunctionalInterface
CompletionStage<T> exceptionally(
Function<Throwable, ? extends T> fn);