CompletableFuture [ForkJoinPool 관점에서 CompletionStage] (4/4)

세젤게으름뱅이·2025년 4월 7일

Spring Webflux

목록 보기
6/16

CompletionStage 연산자 및 thenXXX vs thenXXXAsync의 차이를 thread 관점에서 다뤄보려 한다.

  • 간단한 Helper 클래스 코드
  • finishedStage : 1을 반환하는 완료된 CompletableFuture 반환
    • CompletionStage 연산자 호출시, 이미 결과가 있음. (isDone = true)
  • runningStage : 1초를 sleep한 후 1을 반환하는 completableFuture
    • CompletionStage 연산자 호출시, 아직 미완. 비동기 수행 중 (isDone = false)
@SneakyThrows
public static CompletionStage<Integer> finishedStage() {
    var future = CompletableFuture.supplyAsync(() -> {
        log.info("supplyAsync");
        return 1;
    });
    Thread.sleep(100);
    return future;
}
public static CompletionStage<Integer> runningStage() {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
            log.info("I'm running!");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return 1;
    });
} 

thenAccept

thenAccept[Async]

  • Conumser를 파라미터로 받는다.
  • 이전 task로부터 값을 받지만, 값을 넘기지는 않는다.
  • 때문에 다음 task에 null이 전달된다.
  • 값을 받아서 action만 수행하는 경우에 유용하다.
@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
CompletionStage<Void> thenAccept(Consumer<? super T> action); 
CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
}

thenAccept vs thenAcceptAsync (finishedStage)

  • thenAccept

      1. finishedStage()에서 done이 되면 thenAccept 콜백 실행
      1. 하지만 실행 스레드는 Main 스레드
      1. chaining을 했지만 결국 다음 task에게는 값이 전달되지 않음 (null)
log.info("start main");
CompletionStage<Integer> stage = Helper.finishedStage();
stage.thenAccept(i -> {
        log.info("{} in thenAccept", i);
}).thenAccept(i -> {
        log.info("{} in thenAccept2", i);
});
        log.info("after thenAccept");
Thread.sleep(100);
  • thenAcceptAsync

      1. finishedStage()에서 done이 되면 thenAcceptAsync 콜백 실행
      1. ForkJoinPool에서 할당받은 스레드 실행
      1. chaining을 했지만 결국 다음 task에게는 값이 전달되지 않음 (null)
log.info("start main");
CompletionStage<Integer> stage = Helper.finishedStage();
stage.thenAcceptAsync(i -> {
        log.info("{} in thenAcceptAsync", i);
}).thenAcceptAsync(i -> {
        log.info("{} in thenAcceptAsync2", i);
});
        log.info("after thenAccept");
Thread.sleep(100);

스레드 관점에서의 차이점?

  • done 상태에서 thenAccept는 caller (main)의 스레드에서 실행
  • done 상태의 completionStage에 thenAccept를 사용하는 경우, caller 스레드를 block 할 수 있음.
    • ex) finishedStage() 연산이 시간이 오래 걸렸다고 가정하면, 그 사이 main 스레드는 계속 block

결론

  • CompletionStage 자체는 비동기처리 O
  • finishedStage()는 이미 Done 상태
  • 작업이 가벼우면 thenAccept, 무겁거나 비동기성을 살리려면 thenAcceptAsync
  • thenAccept는 앞선 작업의 스레드에서 후속 작업 실행 (앞선 작업 = caller)
    • 불필요한 스레드 전환 없이 빠르게 실행 가능
    • 작은 후속 작업에 적합 (ex. 로그 출력, 단순 연산)
  • thenAcceptAsync는 별도 스레드 (executor / ForkJoinPool에서 후속 작업 실행
    • 작업이 무겁거나 blocking 가능성이 있을 때 유용
    • 별도 스레드에서 처리하니까, 다른 작업에 영향 안 줌

thenAccept vs thenAcceptAsync (runningStage)

  • thenAccept

      1. runningStage()은 1초 후 리턴하기에, done 상태가 아니다.
      1. 하지만 실행 스레드는 callee의 스레드
      • callee의 스레드 = supplyAsync에서 생성된 ForkJoinPool
      1. chaining을 했지만 결국 다음 task에게는 값이 전달되지 않음 (null)
log.info("start main");
CompletionStage<Integer> stage = Helper.runningStage(); 
stage.thenAccept(i -> {
        log.info("{} in thenAccept", i); 
}).thenAccept(i -> {
        log.info("{} in thenAccept2", i); 
});
Thread.sleep(2000); 
  • thenAcceptAsync

      1. runningStage()은 1초 후 리턴하기에, done 상태가 아니다.
      1. ForkJoinPool에서 할당받은 스레드 실행
      1. chaining을 했지만 결국 다음 task에게는 값이 전달되지 않음 (null)
log.info("start main");
CompletionStage<Integer> stage = Helper.runningStage(); 
stage.thenAcceptAsync(i -> {
        log.info("{} in thenAcceptAsync", i); 
}).thenAcceptAsync(i -> {
        log.info("{} in thenAcceptAsync", i); 
});
Thread.sleep(2000); 

스레드 관점에서의 차이점?

  • done 상태가 아닐 때, thenAccept는 callee의 스레드에서 실행
  • done 상태가 아닐 때, completionStage에 thenAccept를 사용하는 경우 callee 스레드를 block 할 수 있음.

결론

  • CompletionStage 자체는 비동기처리 O
  • 작업이 가벼우면 thenAccept, 무겁거나 비동기성을 살리려면 thenAcceptAsync
  • thenAccept는 앞선 작업의 스레드에서 후속 작업 실행 (현재 앞선 작업 = callee)
    • 불필요한 스레드 전환 없이 빠르게 실행 가능
    • 작은 후속 작업에 적합 (ex. 로그 출력, 단순 연산)
  • thenAcceptAsync는 별도 스레드 (executor / ForkJoinPool에서 후속 작업 실행
    • 작업이 무겁거나 blocking 가능성이 있을 때 유용
    • 별도 스레드에서 처리하니까, 다른 작업에 영향 안 줌

thenXXX[Async]의 실행 스레드

  • 가장 큰 문제점은, thenXXX는 stage의 상태에 따라 caller 스레드인지, callee 스레드인지 나뉘게 되는데, 코드 상에서는 어느 스레드에서 콜백이 동작할지 유추하기 어려움
    • 추가적으로 caller와 callee에 대한 blocking까지 생길 수 있음.
  • 그에 반면 async들은 스레드 풀에서 할당받기에 유추가 가능함.

thenXXXAsync의 스레드풀 변경

  • 모든 thenXXXAsync 연산자는 두번째 인자로 executor를 추가로 받는다.
  • 이를 통해서 다른 스레드풀로 task를 실행할 수 있다.
  • 특정한 콜백을 다른 스레드에서 쓰고 싶을 때 사용 가능
var single = Executors.newSingleThreadExecutor();
var fixed = Executors.newFixedThreadPool(10); 
log.info("start main");
CompletionStage<Integer> stage = Helper.completionStage(); 
stage.thenAcceptAsync(i -> {
        log.info("{} in thenAcceptAsync", i); 
}, fixed).thenAcceptAsync(i -> {
        log.info("{} in thenAcceptAsync2", i); 
}, single);
        log.info("after thenAccept"); 
Thread.sleep(200); 
single.shutdown(); 
fixed.shutdown();

thenApply

thenApply[Async]

  • Function을 파라미터로 받는다.
  • 이전 task로부터 T 타입의 값을 받고, 가공하여 U 타입의 값을 반환한다.
  • 다음 task에게 반환했던 값이 전달됨.
  • 값을 변형해서 전달해야 하는 경우 유용.
@FunctionalInterface
public interface Function<T, R> { 
		R apply(T t);
<U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); 
<U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
}
  • ? super T : 더 넓은 범위의 입력 허용 (유연하게)_
  • ? extends U : 리턴값은 최소한 U를 보장

thenCompose

thenCompose[Async]

  • Function을 파라미터로 받는다.
  • T 타입의 값을 받고, 가공하여 U 타입의 CompletionStage를 반환한다.
  • 반환한 CompletionStage가 done 상태가 되면 값을 다음 task에 전달한다.
  • 다른 future를 반환해야 하는 경우 유용
@FunctionalInterface 
public interface Function<T, R> { 
    default <V> Function<V, R> compose(Function<? super V, ? extends T> before) 
        Objects.requireNonNull(before); 
        return (V v) -> apply(before.apply(v)); 
    } 
}
<U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U >> fn); 
<U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U >> fn);

thenCompose가 필요한 경우 (vs thenApply)

thenApply 구조의 문제점

CompletionStage<Integer> stage = CompletableFuture.completedFuture(10);
CompletionStage<CompletionStage<String>> nested = stage.thenApply(i -> {
    return CompletableFuture.completedFuture("value: " + i);
});
  1. CompletableFuture.completedFuture의 리턴타입은 현재 CompletionStage <Integer'>
  2. stage의 상태가 isDone() == true 일 시점에, 해당 결과 값으로(i, Integer) thenApply의 람다 실행
  3. thenApply(fn)는 원래 CompletionStage<U'>을 리턴한다.
    3-1. 즉, 입력값을 가공한 결과를 CompletionStage로 감싸서 리턴한다.
  4. 그런데 fn 내부에서 return CompletableFuture.completedFuture('String 값')처럼
    이미 CompletionStage<String'>를 반환하면,
    이게 다시 CompletionStage<U'>로 감싸지기 때문에
    결과는 CompletionStage<CompletionStage<String'>> 같은 중첩 구조가 된다.
  5. 람다 안에서의 결과인 CompletionStage<String'>에 thenApply의 리턴타입인 CompletionStage<U'>가 한번 더 감싸짐

thenCompose로 해결

CompletionStage<Integer> stage = CompletableFuture.completedFuture(3);
CompletionStage<String> result = stage.thenCompose(i -> {
    return CompletableFuture.completedFuture("Value is " + i);
});
  1. thenApply : T 입력, U 리턴
  2. thenCompose : T 입력, CompletionStage<U'> 리턴
  3. thenCompose는 람다가 CompletionStage를 리턴할 경우, 다시 감싸지 않고, 이어서 실행되도록 연결하는 기능

thenRun

thenRun[Async]

  • Runnable을 파라미터로 받는다.
  • 이전 task로부터 값을 받지 않고, 값을 반환하지 않음.
  • 다음 task에게 null이 전달됨.
  • future가 완료되었다는 이벤트를 기록할 때 유용.
@FunctionalInterface 
public interface Runnable { 
	public abstract void run(); 
} 
public CompletionStage<Void> thenRun(Runnable action); 
public CompletionStage<Void> thenRunAsync(Runnable action); 

exceptionally

exceptionally

  • Function을 파라미터로 받는다.
  • 이전 task로부터 exception을 받아서 처리하고 값을 반환한다.
  • 다음 task에게 반환된 값을 전달한다.
  • future 파이프라인에서 발생한 에러를 처리할 때 유용
@FunctionalInterface 
CompletionStage<T> exceptionally( 
Function<Throwable, ? extends T> fn);
profile
🤦🏻‍♂️

0개의 댓글