CompletableFuture [CompletionStage] (4/2)

세젤게으름뱅이·2025년 4월 7일

Spring Webflux

목록 보기
5/16

CompletionStage 인터페이스

public interface CompletionStage<T> {
  public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
  public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn);

  public CompletionStage<Void> thenAccept(Consumer<? super T> action);
  public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

  public CompletionStage<Void> thenRun(Runnable action);
  public CompletionStage<Void> thenRunAsync(Runnable action);

  public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
  public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);

  public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
}

CompletionStage 연산자 chaining

  • 50개에 가까운 연산자들을 활용하여 비동기 task들을 실행하고 값을 변형하는 등 chaining을 이용한 조합 가능
    • 비동기 non-Blocking 개발 지원에 큰 역할을 한다.
  • 에러를 처리하기 위한 콜백 제공
Helper.completionStage() 
		  .thenApplyAsync(value -> {
        log.info("thenApplyAsync: {}", value); 
return value + 1;
        }).thenAccept(value -> {
        log.info("thenAccept: {}", value); 
}).thenRunAsync(() -> {
        log.info("thenRun"); 
}).exceptionally(e -> {
        log.info("exceptionally: {}", e.getMessage());
        return null;
        });
Thread.sleep(100); 
// 위와 같이 콜백을 여러개 등록할 수 있다.

ForkJoinPool

thread pool

  • CompletableFuture는 내부적으로 비동기 함수들을 실행하기 위해, ForkJoinPool을 사용
  • ForkJoinPool의 기본 size = 할당된 CPU 코어 -1
  • 데몬 스레드. main 스레드가 종료되면 즉각 종료
    • 보통은 worker 스레드가 돌고 있으면, main 스레드가 종료없이 대기한다. 하지만 데몬스레드는 main이 종료되면 본인의 진행 여부를 따지지 않고 종료시킴.

fork & join

  • ForkJoinPool은 작업을 쪼개서 병렬 처리를 하고 합친다. 내부적으로 work stealing을 활용해 바쁜 Thread를 도울 것이다.
  • Task를 for를 통해 subtask로 나누고
  • Thread pool에서 steal work 알고리즘을 이용하여 균등하게 처리해서
    • work stealing은 작업이 비는 Thread가 바쁜 Thread의 일을 "훔쳐오는" 방식이다.
  • join을 통해서 결과를 생성

CompletionStage 연산자

함수형 인터페이스

  • 함수형 프로그래밍을 지원하기 위해서 Java 8부터 도입
  • 1개의 추상 메소드를 갖고 있는 인터페이스
  • 함수를 1급 객체로 사용할 수 있다.
    • 함수를 변수에 할당
    • 함수를 인자로 전달
    • 함수를 반환값으로 사용
  • Function, Consumer, Supplier, Runnable . . .
  • 함수형 인터페이스를 구현한 익명 클래스를 람다식으로 변경 가능
@FunctionalInterface 
public interface Function<T, R> { 
	R apply(T t); 
}
  • 함수형 인터페이스는 호출한 Thread에서 실행된다.
public static void main(String[] args) {
    var consumer = getConsumer();
    consumer.accept(1);
    var consumerAsLambda = getConsumerAsLambda();
    consumerAsLambda.accept(1);
    handleConsumer(consumer);
}
public static Consumer<Integer> getConsumer() {
    Consumer<Integer> returnValue = new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) {
            log.info("value in interface: {}", integer);
        }
    };
    return returnValue;
}
public static Consumer<Integer> getConsumerAsLambda() {
    return integer -> log.info("value in lambda: {}", integer);
}
public static void handleConsumer(Consumer<Integer> consumer) {
    log.info("handleConsumer");
    consumer.accept(1);
}

CompletionStage 연산자와 연결

  • 유추해볼 수 있는 건, 추상메소드에 따른 이름이라고 유추해볼 수 있겠다.
  • thenAccept(Consumer action)은 값이 들어오고, 리턴이 없을거라 추측
    • Consumer의 accept 메소드는 void를 반환함
  • thenApply(Function fn)은 값이 들어오고, 변환되어 나올거라 추측
  • thenRun(Runnable action)은 인자를 안 받고, 액션만 취할거라 추측
profile
🤦🏻‍♂️

0개의 댓글