CompletableFuture [Executor, Future] (4/1)

세젤게으름뱅이·2025년 4월 4일

Spring Webflux

목록 보기
4/16

CompletableFuture

  • 2014년에 발표된 Java 8에서 처음 도입
  • 비동기 프로그래밍 지원
  • Lambda, Method reference 등 Java 8의 새로운 기능 지원

Method reference

  • :: 연산자를 이용해서 함수에 대한 참조를 간결하게 표현
  • method reference
    • "특정" 객체 자체 메소드에 간단한 참조
  • static method reference
    • static 메소드 참조atic 메소드 참조
  • instance method reference
    • "특쟁 객체 X", 클래스 자체가 가진 인스턴스 메소드를 참조
  • constructor method reference
@RequiredArgsConstructor
public static class Person {
    @Getter
    private final String name;
    public Boolean compareTo(Person o) {
        return o.name.compareTo(name) > 0;
    }
}
public static void print(String name) {
    System.out.println(name);
} 
public static void main(String[] args) {
    var target = new Person("f");
    Consumer<String> staticPrint = MethodReferenceExample::print;
    Stream.of("a", "b", "g", "h")
            .map(Person::new) // constructor reference 
            .filter(target::compareTo) // method reference 
            .map(Person::getName) // instance method reference 
            .forEach(staticPrint); // static method reference
}

CompletableFuture 클래스

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>   

Future

  • 비동기적인 작업을 수행
  • 해당 작업이 완료되면 결과를 반환하는 인터페이스

CompletionStage

  • 비동기적인 작업을 수행행
  • 해당 작업이 완료되면 결과를 처리하거나, 다른 CompletionStage를 연결하는 인터페이스



ExecutorService

  • Thread pool을 이용하여 비동기적으로 작업을 실행하고 관리
  • 별도의 Thread를 생성하고 관리하지 않아도 되므로, 코드를 간결하게 유지 가능
  • pool을 이용하여 자원을 효율적으로 관리

ExecutorService 메소드

  • execute : Runnable 인터페이스를 구현한 작업을 Thread pool에서 비동기적으로 실행
    • Runnable은 따로 반환값이 없기에, 단지 비동기적으로 실행만 수행.
  • submit : Callable 인터페이스를 구현한 작업을 Thread pool에서 비동기적으로 실행하고, 해당 작업 결과를 Future 객체로 반환
    • execute와 달리 반환값 존재
  • shutdown : ExecuteService를 종료. 더 이상 task를 받지 않음.
    • shutdown하지 않으면 작업이 있다고 간주하고 대기하는 문제가 생긴다. (작업진행중)
    • 명시적 종료를 하지 않으면서, Thread들이 메모리 & 리소스 점유하게 된다.
    • 이는 결국 메모리(리소스) leak까지 이어질 수 있다. GC가 해당 pool을 수거할 수 없을 것

Executors - ExecutorService 생성

  • newSingleThreadPoolExecutor : 단일 Thread로 구성된 ThreadPool 생성. 한 번에 하나의 작업만 실행.
  • newFixedThreadPool : 고정된 크기의 ThreadPool을 생성. 크기는 인자로 주어진 n과 동일
    • 적절한 size를 구하게 된다면 ThreadPool의 최적화가 가능하다.
  • newScheduledThreadPool : 스케줄링 기능을 갖춘 고정 크기의 ThreadPool을 생성. 주기적이거나 지연이 발생하는 작업을 실행
  • newWorkStealingPool : work steal 알고리즘을 사용하는 ForkJoinPool 생성
    • work stealing은 작업이 비는 Thread가 바쁜 Thread의 일을 "훔쳐오는" 방식이다.
    • ForkJoinPool은 작업을 쪼개서 병렬 처리를 하고 합친다. 내부적으로 work stealing을 활용해 바쁜 Thread를 도울 것이다.



Future 인터페이스

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}
  • isCancelled()와 isDone()으로 비동기 작업에 대해 확인을 할 수 있어보인다. 후에 CompletableFuture에서 위 두가지로만 관리했을 시에 단점을 어떻게 보완하는지 알 수 있다.
  • 위의 Executors와 간단한 FutureHelper 클래스로 예시를 풀어나갈 것이다.
    public static Future<Integer> getFuture() {
      var executor = Executors.newSingleThreadExecutor();
      try {
          return executor.submit(() -> {
              return 1;
          });
      } finally {
          executor.shutdown();
      }
    } 
    }
    public static Future<Integer> getFutureCompleteAfter1s() {
      var executor = Executors.newSingleThreadExecutor();
      try {
          return executor.submit(() -> {
              Thread.sleep(1000);
              return 1;
          });
      } finally {
          executor.shutdown();
      }
    }
  • getFuture : 싱글 스레드를 생성하여, "1" 반환
    • submit()은 인자로 Callable 구현체가 필요하다. 그리고 리턴은 Future가 될거고.
  • getFutureCompleteAfter1S : 싱글스레드를 생성하여, 1초 대기 후에 "1"반환

Future : isDone(), isCancelled()

  • future의 상태를 반환
  • isDone : task가 완료되었다면, 원인과 상관없이 true 반환
    • "원인과 상관없다" -> Exeception도 포함이기 때문에, 정확한 진단을 내리기가 모호하다.
    • complete여도 true, cancel시 canceled 되어도 true를 반환.
  • isCancelled : task가 명시적으로 취소된 경우, true 반환
    • 확인이 필요하다면, 명시적으로 취소가 되었는가를 확인해야 한다.

Future : get() - 매우 중요- 매우 중요

  • 결과를 구할 때까지 Thread가 계속 block
    • future의 상태값이 isDone()일 때까지 대기
  • future에서 무한 루프나 오랜 시간이 걸린다면 thread가 blocking 상태 유지
Future future = FutureHelper.getFuture(); 	// Future 반환
assert !future.isDone(); 					// 즉시 반환을 판단하기 힘들기 때문에, false 
assert !future.isCancelled();				// 명시적으로 cancel을 하지 않아서 false	
  --
var result = future.get(); 					// get()은 future의 상태값이 isDone() true일 때까지 대기
assert result.equals(1); 					// 정상 리턴 후 equals() true 
assert future.isDone(); 					// get()으로 값을 가져온 이상, isDone은 완료된 시점
assert !future.isCancelled(); 				// 명시적으로 cancel을 하지 않아서 false
  • 문제점
    • 무한 while loop나 Thread sleep이 무한이면, future는 계속 동작하므로 (무한대기), future를 호출했던 Thread는 blocking 상태가 된다.

Future : get(long timeout, TimeUnit unit)

  • 결과를 구할 때까지 timeout동안 thread가 block
  • timeout이 넘어가도, future가 반환되지 않는다면 TimeoutException 발생
Future future = FutureHelper.getFutureCompleteAfter1s();  // 1초 후 Future 반환
var result = future.get(1500, TimeUnit.MILLISECONDS);     // 1.5초 block 상태 후  get()
assert result.equals(1);								  // true 반환
  Future futureToTimeout = FutureHelper.getFutureCompleteAfter1s(); // 1초 후 Future 반환
Exception exception = null; 
try {
        futureToTimeout.get(500, TimeUnit.MILLISECONDS); 	// 0.5초 후 Future 반환
} catch (TimeoutException e) {							// 예외발생				
exception = e; 
}
        assert exception != null; 
  • 문제점
    • get()은 최악의 경우 무한 blocking이 가능하므로, timeout을 끼고 사용하는 게 안전하다.

Future : calcel(boolean mayInterruptIfRunning)

  • future의 작업 실행을 취소
  • 취소할 수 없는 상황이라면 false를 반환
  • mayInterruptIfRunning가 false라면 시작하지 않은 작업에 대해서만 취소
 Future future = FutureHelper.getFuture();		// Future 반환
var successToCancel = future.cancel(true); 		// 명시적 cancel
assert future.isCancelled();					// 명시적으로 cancel했기 때문에 true 
assert future.isDone(); 						// 명시적으로 cancel하여 canceled 상태기 때문에 true
assert successToCancel;							// true
  --
successToCancel = future.cancel(true); 
assert future.isCancelled(); 
assert future.isDone(); 
assert !successToCancel;

Future 인터페이스의 한계

  • cancel을 제외하고 외부에서 future를 컨트롤할 수 없다.
  • 반환된 결과를 get()해서 접근하기 때문에 비동기 처리가 어렵다.
    • 반환된 결과 = future. future를 반환 받아야만 다음 작업이 가능
  • 완료되거나 에러가 발생했는지 구분하기 어렵다.
    • 위에서 기술한 것처럼, 중간에 예외가 발생하더라도 Exception처리는 isDone()에서 true이기 때문에 직접 catch 하는 수 밖에 없어보인다.
profile
🤦🏻‍♂️

0개의 댓글