CompletableFuture [CompletableFuture 연산자] (4/5)

세젤게으름뱅이·2025년 4월 8일

Spring Webflux

목록 보기
7/16

supplyAsync vs runAsync

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {}
    public static CompletableFuture<Void> runAsync(Runnable runnable) {}
    public boolean complete(T value) {}
    public boolean isCompletedExceptionally() {}
}
public static CompletableFuture<Void> allOf(CompletableFuture<?> ... cfs) {}
public static CompletableFuture<Object> anyOf(CompletableFuture<?> ... cfs) {}

supplyAsync

  • Supplier를 제공하여 CompletableFuture를 생성 가능하게 함
    • Supplier<T'>는 아무런 인자 없이 값을 만들고 반환하는 인터페이스.
@FunctionalInterface 
public interface Supplier<T> { 
	T get(); 
}
  • Supplier의 반환값이 CompletableFuture의 결과로
    • supplyAsync는 비동기로 실행하여 값을 만들어주는 작업을 하고 CompletableFuture로 감싼다.
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {}

runAsync

  • Runnable을 제공하여 CompletableFuture를 생성 가능하게 함
@FunctionalInterface 
public interface Supplier<T> { 
	T get(); 
}
  • 값을 반환하지 않는다.
    • 받는 값도, 반환할 값도 X
  • 다음 task에 null이 전달된다.
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {}
  • Runnable의 작업이 종료되면, 그 다음을 처리할래? 뉘앙스의 사건전달

코드비교

var future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return 1;			//  CompletableFutute<Integer>
}); 
assert !future.isDone(); 
Thread.sleep(1000); 
assert future.isDone(); 
assert future.get() == 1; 
  1. supplyAsync의 리턴타입은 현재 CompletableFuture<Integer;>
  2. 100ms간 sleep 이기 때문에 최초 isDone == false
  3. 1000ms 대기 후 isDone == true
  4. 리턴된 값을 future에서 get()으로 꺼내어 비교
var future = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}); 
assert !future.isDone(); 
Thread.sleep(1000); 
assert future.isDone(); 
assert future.get() == null; 
  1. runAsync은 리턴 X
  2. 100ms간 sleep 이기 때문에 최초 isDone == false
  3. 1000ms 대기 후 isDone == true
  4. 리턴된 값을 future에서 get()으로 꺼내어 비교 (null)

complete

CompletableFuture<Integer> future = new CompletableFuture <>(); // 빈 future
assert !future.isDone();  				// 비어있는 future이기 때문에 isDone 판단 X 
  
var triggered = future.complete(1); 	// future에 1로 명시
assert future.isDone(); 				// true
assert triggered; 						// true, 완료되지 않았던 future가 완료되어서
assert future.get() == 1;				// true
  
triggered = future.complete(2); 		// false, 해당 future은 위에서 이미 완료됨
assert future.isDone(); 				// true, 위에서 이미 완료됨
assert !triggered; 						// false, future.complete(2) 실패 
assert future.get() == 1;				// true, future의 결과는 그대로
  • CompletableFuture가 완료되지 않았다면, 주어진 값으로 채울 수 있다.
  • complete에 의해 값이 바뀌면 true, 아니라면 false 반환

Future 상태 vs CompletableFuture 상태 비교

future

  • isDone == true 시점에서 completed일 때, complete 혹은 exception이기 때문에 무조건 complete라고만 간주할 수 없다.

CompletableFuture

  • isDone == true 시점에 completed, canceld 외에 exceptionally가 추가 되었다.
    • isDone() 시점에서 예외인지, 성공인지 구분이 가능해진다.
  • exceptionally의 결과는 isCompletedExceptionally()로 확인이 가능해졌다.
  • isCanceled도, isCompletedExceptionally도 아니라면 completed를 보장 가능해진다.

isCompletedExceptionally

  • Exception에 의해서 complete 되었는지 확인할 수 있다.
var futureWithException = CompletableFuture.supplyAsync(() -> { 
	return 1 / 0; 					// false, supplyAsync()에 의해 리턴은 CompletableFuture<Boolean>
}); 
Thread.sleep(100); 
assert futureWithException.isDone(); // true
assert futureWithException.isCompletedExceptionally(); // true, complete 되었으니



allOf

CompletableFuture는 전부 다른 Thread에서 실행됨.

  • 여러 completableFuture를 모아서, 하나의 completableFuture로 변환할 수 있다.
  • 모든 completableFuture가 완료된다면, 상태가 done으로 변경
  • Void를 변환하므로, 각각의 값에 get()으로 접근해야 한다.
   public static CompletableFuture<Void> allOf(CompletableFuture<?> ... cfs) {}
  • allOf를 통해, 모든 completableFuture가 done이 된 시점을 유추할 수 있다.
    • "다 끝났음" 정도의 뉘앙스로 이해하자.
var startTime = System.currentTimeMillis();
var firstFuture = Helper.waitAndReturn(100, 1);		// future  반환
var secondFuture = Helper.waitAndReturn(500, 2);	// future  반환
var thirdFuture = Helper.waitAndReturn(1000, 3);	// future  반환
  
CompletableFuture.allOf(firstFuture, secondFuture, thirdFuture) 	// allOf는  합쳐진 CompletableFuture를 반환
.thenAcceptAsync(v -> {								// allOf는 CompletableFuture<Void>를 리턴하기에 v는 null
        log.info("after allOf"); 	
try {
		// v에서 각 future를 가져올 수 없기에, 직접 future에서 get()으로 접근
        log.info("first: {}", firstFuture.get());	
        log.info("second: {}", secondFuture.get());
        log.info("third: {}", thirdFuture.get());
        } catch (Exception e) {
        throw new RuntimeException(e); 
}
        }).join();
var endTime = System.currentTimeMillis(); 

결과

  • Thread 관점
    • 1~3번 future는 각 다른 thread에서 작동을 했다.
    • 그리고 allOf로 하나의 future에 합쳐짐
    • allOf 실행 시점에는 뭉쳐진 한 future에서 실행이 될 것
  • 실행소요 시간이 1014ms인 이유
    • 100ms 완료 -> 500ms 완료 -> 1000ms 완료 -> 1600ms 소요 개념 X
    • 병렬적으로 실행되기 때문에 1000ms 소요 예상
    • 100ms, 500ms 소요시간의 future가 완료되어도, 가장 느린 1000ms의 작업 때문에 앞선 2개의 future는 대기를 하게 됨.



anyOf

CompletableFuture는 전부 다른 Thread에서 실행됨.

  • 여러 completableFuture를 모아서, 하나의 completableFuture로 변환할 수 있다.
  • 주어진 future 중 하나라도 완료되면 상태가 done으로 변경
  • 제일 먼저 done 상태가 되는 future의 값을 반환
  • Object형으로 반환된다. allOf와는 달리 리턴값에 접근 가능
   public static CompletableFuture<Object> anyOf(CompletableFuture<?> ... cfs) {}
var startTime = System.currentTimeMillis();
var firstFuture = Helper.waitAndReturn(100, 1);		// future  반환
var secondFuture = Helper.waitAndReturn(500, 2);	// future  반환
var thirdFuture = Helper.waitAndReturn(1000, 3); 	// future  반환
CompletableFuture.anyOf(firstFuture, secondFuture, thirdFuture) // any
.thenAcceptAsync(v -> {
        log.info("after anyOf"); 
log.info("first value: {}", v); 
}).join();
var endTime = System.currentTimeMillis(); 
log.info("elapsed: {}ms", endTime - startTime); 

결과

  • Thread 관점
    • 1~3번 future는 각 다른 thread에서 작동을 할 것이다.
    • anyOf 실행 시점은 가장 빨리 끝난 작업이 반환될 시점
  • 실행소요 시간이 114ms인 이유
    • 가장 빠른 작업인 1번 future가 바로 done이 되어 반환됨

CompletableFuture의 한계

  • 지연 로딩을 제공하지 않는다.
    • CompletableFuture를 반환하는 함수를 호출시, 즉시 작업이 실행된다.
    • 별도의 Thread에서 sleep()하면 되지 않을까? --> 이것 또한 별도의 Thread에서 작업되기 때문에 지연로딩으로 보기 힘들다.
  • 지속적으로 생성되는 데이터를 처리하기 어렵다.
    • CompletableFuture에서 데이터를 반환하고 나면, 다시 다른 값을 전달하기 어려움
    • ex) 데이터를 한번에 몰아서 주는 게 아니라, 받고 -> 처리, 받고 -> 처리 이런식이라고 가정했을 때. CompletableFuture에서 값을 한번에 모아서 complete를 시키던가 해야하는데 그러지 못 하는 제약이 있음.
profile
🤦🏻‍♂️

0개의 댓글