Java 8 : 안정적 비동기 프로그래밍 Completable Future

600g (Kim Dong Geun)·2021년 5월 10일
1

Future의 단순활용

자바 5부터는 미래의 어느 기점에 결과를 얻는 모델에 활용할 수 있도록 Future 인터페이스를 제공하고 있다. 비동기 계산을 모델링하는데, Future를 이용할 수 있으며, Future는 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공한다. 시간이 걸릴 수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 수행할 수 있다.

  • Future 는 저수준의 스레드에 비해 직관적으로 이해하기 쉽다는 장점이 있다.
  • Future를 이용하려면 시간이 오래걸리는 작업을 Callable 객체 내부로 감산 다음에 ExecutorService에 제출해야 한다.
  • 다음 코드는 Java 8 이전의 예제 코드다.
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<double>() {
  public Double call() {
    return doSomeLongComputation();// 시간이 오래걸리는 Task
  }
});
doSomethingElse(); // 비동기 작업을 수행하는 동안 다른 작업을 수행.
try {
  Double result = future.get(1,TimeUnit.SECONDS);
} catch(ExecutionException ee){
  // 계산 중 예외 발생
} catch (InterruptedException ie){
  // 현재 스레드에서 대기 중 인터럽트 발생
} catch (TimeoutException te){
  // Future가 완료되기 전에 타임아웃 발생
}

위와 같은 유형의 프로그래밍에서는 ExecutorService 에서 제공하는 스레드가 시간이 오래 걸리는 작업을 처리하는 동안 우리 스레드로 다른 작업을 동시에 실행할 수 있다. 다른 작업을 처리하다가 시간이 오래 걸리는 작업의 결과가 필요한 시점이 되었을 때, Future의 get 메소드로 결과를 가져올 수 있다.

get 메소드를 호출했을 때 이미 계산이 완료되어 결과가 준비되었다면 즉시 결과를 반환하지만 결과가 준비되지 않았다면 작업이 완료될 때까지 쓰레드를 블록시킨다.

img

위 시나리오에 어떤 문제가 있는지 파악했는가?

오래 걸리는 작업이 영원히 끝나지 않는다면, 전체 작업이 영원히 block 되는 문제가 발생할 수 있다.

따라서 우리는 get 메소드를 오버로드해서 우리 스레드가 대기할 최대 타임아웃 시간을 설정하는 것이 좋다.

Future 제한

  • 첫번째로 살펴볼 간단한 예제는 Future인터페이스가 비동기 계산이 끝났는지 확인할 수 있는 isDone 메소드, 계산이 끝나길 기다리는 메소드, 결과 회수 메소드등을 제공함을 보여준다.

  • 하지만 이들 메소드만으로 간결한 동시 실행코드를 구현하기에는 충분하지 않다.

    • 예를들어 여러 Future의 결과가 있을 때 이들의 의존성을 표현하기가 어렵다.

    • 즉, 오래걸리는 A라는 계산이 끝나면 그 결과를 다른 오래 걸리는 계산 B로 전달하시오. 그리고 B의 결과가 나오면 다른 질의의 결과와 B의 결과를 조합하시오와 같은 요구사항을 쉽게 구현할 수 있어야 한다.

      그러나 Future로 위 작업을 구현하기란 쉽지 않다.

  • 다음과 같은 선언형 기능이 있다면 유용할 것이다.

    • 두 개의 비동기 계산 결과를 하나로 합친다. 두 가지 계산 결과는 서로 독립적일 수도 있으며 또는 두 번째 결과가 첫 번째 결과에 의존하는 상황일 수 있다.
    • Future 집합이 실행하는 모든 태스크의 완료를 기다린다.
    • Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻는다.
    • 프로그램적으로 Future를 완료시킨다
    • Future 완료 동작에 반응한다.(즉, 결과를 기다리면서 블록되지 않고 결과가 준비되었다는 알림을 받은 다음에 Future의 결과로 원하는 추가동작을 수행할 수 있음)

이번에는 지금까지 설명한 기능을 선언형으로 이용할 수 있도록 자바 8에서 새로 제공하는 CompletableFuture 클래스를 살펴본다. Stream과 CompletableFuture는 비슷한 패턴, 즉 람다 파이프라이닝을 활용한다. 따라서 FutureCompletableFuture의 관계를 CollectionStream의 관계에 비유할 수 있다.

CompletableFuture로 비동기 어플리케이션 만들기

어떤 제품이나 서비스를 이용해야 하는 상황이라고 가정하자. 예산을 줄일 수 있도록 여러 온라인상점 중 가장 저렴한 가격을 제시하는 상점을 찾는 어플리케이션을 완성해가는 예제를 이용해서 Completable Future의 기능을 살펴보자. 이 어플리케이션을 만드는 동안 다음과 같은 기술을 배울 수 있다.

  1. 고객에게 비동기 API를 제공하는 방법을 배운다.
  2. 동기 API를 사용해야 할 때 코드를 비블록으로 만드는 방법을 배운다. 두 개의 비동기 동작을 파이프라인으로 만드는 방법과 두 개의 동작 결과를 하나의 비동기 계산으로 합치는 방법을 살펴본다.
  3. 비동기 동작의 완료에 대응하는 방법을 배운다. 즉 모든 상점에서 가격 정보를 얻을 때까지 기다리는 것이 아니라, 각 상점에서 가격 정보를 얻을 때마다 즉시 최저가격을 찾는 어플리케이션을 갱신하는 방법을 설명한다.

동기 API와 비동기 API

전통적인 동기 API 에서는 메소드를 호출한 다음에 메소드가 계산을 완료할 때까지 기다렸다가 메소드가 반환되면 호출자는 반환된 값으로 계속 다른 동작을 수행한다. 호출자와 피호출자가 각각 다른 스레드에서 실행되는 상황이었더라도, 호출자는 피호출자의 동작 완료를 기다렸을 것이다. 이처럼 동기 API를 사용하는 상황을 블록 호출 이라고 한다.

반면 비동기 API 에서는 메소드가 즉시 반환되며, 끝내지 못한 나머지 작업을 호출자 스레드와 동기적으로 실행될 수 있도록 다른 스레드에 할당한다. 이와같은 비동기 API를 사용하는 상황을 비블록 호출이라고 한다. 다른 스레드에 할당된 나머지 계산 결과는 콜백 메소드를 호출해서 전달하거나 호출자가 계산결과를 끝날때까지 기다림 메소드를 추가로 호출하면서 전달된다.

비동기 API 구현

최저 가격 어플리케이션을 구현하기 위해 먼저 각각의 상점에서 제공해야 하는 API부터 정의하자. 다음은 제품명에 해당하는 가격을 반환하는 메소드 정의 코드다.

public class Shop{
  public double getPrice(String product){
    // 구현해야할 것.
  }
}

getPrice 메소드는 상점의 데이터베이스를 이용해서 가격정보를 얻는 동시에 다른 외부서비스에도 접근할 것이다. 우리는 실제 호출할 서비스를 구현할 수 없으므로 Delay라는 메소드로 대출할 것이다. dealy는 인위적으로 1초를 지연하는 메소드다.

public static void delay(){
  try{
    Thread.sleep(1000L);
  } catch(InterruptedException e){
    throw new RuntimeException();
  }
}

위에서 구현한 delay를 이용해서 지연을 흉내 낸 다음에 임의의 계산값을 반환하도록 getPrice를 구현할 수 있다. 아무 계산값이나 반환하는 동작이 비정상적으로 보일 순 있다. 아래 코드에서 볼 수 있는 것처럼 제품명에 charAt을 적용해서 임의의 계산값을 반환한다.

public double getPrice(String product){
  return calculatePrice(product);
}

private double calculatePrice(String product){
  delay();
  return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

동기 메소드를 비동기 메소드로 변환

현재 동기 메소드 getPrice를 비동기 메소드로 변환하려면 다음 코드처럼 먼저 이름과 반환값을 바꿔야 한다.

public Future<Double> getPriceAsync(String product){
  ...
}

자바 5부터 비동기 계산의 결과를 표현할 수 있는 java.util.concurrent.Future인터페이스를 제공한다. (즉, 호출자 스레드가 블록되지 않고 다른 작업을 실행할 수 있다.) 간단히 말해, Future는 결과값의 핸들일 뿐이며 계산이 완료되면 get 메소드로 결과를 얻을 수 있다.

getPriceAsync메소드는 즉시 반환되므로 호출자 스레드는 다른 작업을 수행할 수 있다. 자바8의 새로운 CompletableFuture 클래스는 다음 예제에서 보여주는 것처럼 getPriceAsync를 쉽게 구현하는데 도움이 되는 기능을 제공한다.

public Future<Double> getPriceAsync(String product){
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  
  new Thread(()->{
    double price = calculatePrice(product); //다른 스레드에서 비동기적으로 계산 수행.
    futurePrice.complete(price); //오랜 시간이 걸리는 계산이 완료되면 Future에 값을 설정한다.
  }).start();
  
  return futurePrice;
}

위 코드에서 비동기 계산과 완료 결과를 포함하는 CompletableFuture 인스턴스를 만들었다. 그리고 실제 가격을 계산할 다른 스레드를 만든 다음에 오래 걸리는 계산 결과를 기다리지 않고 결과를 포함할 Future 인스턴스를 바로 반환했다. 요청한 제품의 가격 정보가 도착하면 complete 메소드를 이용해서 CompletableFuture를 종료할 수 있다. 다음 코드에서 보여주는것처럼 클라이언트는 getPriceAsync를 활용할 수 있다.

Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime()- start)/ 1_000_000);
System.out.println("Invocation returned after " + invocationTime + "msecs");

//제품의 가격을계산하는 동안 다른작업을 수행
doSomethingElse();

//다른 상점 검색 등 다른 작업 수행
try{
  double price = futurePrice.get();
  System.out.printf("Price is %.2f%n",price);
} catch (Exception e){
  throw new RuntimeException(e);
}

long retreivalTime = ((System.nanoTime() - start)/ 1_000_000);
System.out,println("Price returned after " + retrevalTime+" msecs");

결과

Invocation returned after 43msecs
Price is 123.26
Price returned after 1045 msecs

에러처리 방법

지금까지 개발한 코드는 아무 문제없이 작동한다. 그런데 가격을 계산하는 동안 에러가 발생하면 어떻게 될까? 아마 이상한 일이 벌어질 것이다. 예외가 발생하면 해당 스레드에만 영향을 미친다. 즉 에러가 발생해도 가격 계산은 계속 진행되며 일의 순서가 꼬인다.결과적으로 클라이언트는 get 메소드가 반환될 때까지 영원히 기다리게될 수도 있다.

클라이언트는 타임아웃값을 받는 get 메소드의 오버로드 버전을 만들어 이 문제를 해결할 수 있다. 이처럼 블록 문제가 발생할 수 있는 상황에서는 타임아웃을 활용하는것이 좋다. 그래야 문제가 발생했을 때 클라이언트가 영원히 블록되지 않고, 타임아웃 시간이 지나면 TimeoutException을 받을 수 있다. 하지만 이때 제품가격 계산에 왜 에러가 발생했는지 알 수 있는 방법이 없다. 따라서 completableExceptionally 메소드를 이용하여 CompletableFuture 내부에서 발생한 예외를 클라이언트로 전달해야 한다.

public Future<Double> getPriceAsync(String product){  CompletableFuture<Double> futurePrice = new CompletableFuture<>();  new Thread(()->{    try{      double price = calculatePrice(product);      futurePrice.complete(price);    }catch (Exception ex){      futurePrice.completeExceptionally(ex);    }  }).start();  return futurePrice;}

이제 클라이언트는 가격 계산 메소드에서 발생한 예외 파라미터를 포함하는 ExecutionException 을 받게 된다. 예를들어 product not available이라는 RuntimeException 예외가 발생했다면 클라이언트는 다음과 같은 ExecutionException을 받을 것이다.

팩토리 메소드 supplyAsync로 Completable Future 만들기

지금까지 CompletableFuture를 직접 만들었다. 하지만 좀 더 간단하게 CompletableFuture를 만드는 방법도 있다. 예를들어 getPriceAsync 메소드를 다음처럼 간단하게 한행으로 구현할 수도 있다.

public Future<Double> getPriceAsync(String product){  return CompletableFuture.supplyAsync(()->calculatePrice(product));}

supplyAsync 메소드는 Supplier를 인수로 받아서 CompletableFuture를 반환한다. CompletableFuture는 Supplier를 실행해서 비동기적으로 결과를 생성한다. ForkJoinPool의 Executor 하나가 Supplier를 실행할 것이다. 하지만 두 번째 인수를 받는 오버로드 버전의 supplyAsync 메소드를 이용해서 다른 Executor를 지정할 수 있다.

결국 모든 다른 CompletableFuture의 팩토리 메소드에 Executor를 선택적으로 전달할 수 있다.

비블록 코드 만들기

우리는 동기 API를 이용해서 최저가격 검색 어플리케이션을 개발해야한다. 다음과 같은 상점 리스트가 있다고 가정하자

List<Shop> shops = Arrays.asList(new Shop("BestPrice"),                                new Shop("LetsSaveBig"),                                new Shop("MyFavoriteShop"),                                new Shop("BuyItAll"));

그리고 다음처럼 제품명을 입력하면 상점 이름과 제품가격 문자열 정보를 포함하는 List를 반환하는 메소드를 구현해야 한다.

public List<String> findPrices(String product);

이전에 배운 스트림 기능을 이용하면 원하는 동작을 구현할 수 있을 것 같다. 그래서 결국 다음과 같은 코드를 구현할 것이다. (아래 코드에 어떤 문제가 있는지 생각해보자.)

public List<String> findPrices(String product){  return shops.stream()    .map(shop-> String.format("%S price is %.2f",shop.getName(),shop.getPrice(product)))    .collect(toList());}

상당히 간단한 코드다. 이제 findPrices 메소드로 원하는 제품의 가격을 검색할 수 있다.(아래 코드에서 볼 수 있는 것처럼 저자는 myPhone27S 라는 미래형 최신폰의 가격을 검색했다.) 또한 나중에 프로그램을 고치면서 성능이 얼마나 개선되었는지 확인할 수 있도록 가격을 찾는데 소요된 시간도 측정했다.

long start = System.nanoTime();System.out.println(findPrices("myPhone27S"));long duration = (System.nanoTime() - start) / 1_000_000;System.out.println("Done in " + duration + " msecs");

다음은 예제 실행 결과다

[BestPrice price is 123.26, LetsSaveBig prive is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74]Done in 4032 msecs

4개의 상점에서 가격을 검색하는 동안 각각 1초의 대기시간이 있으므로 전체 가격 검색 결과는 4초가 조금 더 걸린다. 이제 어떻게 성능을 개선시킬 수 있을까?

병렬 스트림으로 요청 병렬화하기

병렬 스트림을 이용해서 순차 계산을 병렬로 처리해서 성능을 개선할 수 있다.

public List<String> findPrices(String product){  return shops.parallelStream()    .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))    .collect(toList());}

이제 다시 예제 코드를 실행해서 성능을 확인해보자

[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13,BuyItAll price is 184.74]Done in 1180 msecs

간단하게 성능을 4초대에서 1초로 줄일 수 있었다.

이를 더 개선할 수 없을까? CompletableFuture 기능을 활용해서 findPrices 메소드의 동기호출을 비동기 호출로 변경하여 보자.

CompletableFuture로 비동기 호출 구현하기

팩토리 메소드 supplyAsyncCompletableFuture를 만들 수 있음을 배웠다. 배운 지식을 활용하자.

List<CompletableFuture<String>> priceFutures = shops.stream()  .map(shop -> CompletableFuture.supplyAsync(  ()-> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product))))	.collect(toList());

위 코드로 CompleatableFuture를 포함하는 List<CompletableFuture<String>>를 얻을 수 있다. 하지만 우리가 재구현하는 findPrices 메소드의 반환 형식은 List<String> 이므로 모든 CompletableFuture의 동작이 완료되고 결과를 추출한 다음에 리스트를 반환해야한다.

두번째 map 연산을 List<<CompletableFuture<String>>에 적용할 수 있다. 즉, 리스트의 모든 CompletableFutureJoin연산을 호출해서 모든 동작이 끝나기를 기다린다. CompletableFuture 클래스의 join 메소드는 Future인터페이스의 get 메소드와 같은 의미를 갖는다. 다만 join은 아무 예외를 발생시키지 않는점이 다르다. 따라서 두 번째 map의 람다 표현식을 try/catch로 감쌀 필요가 없다.

public List<String> findPrices(String product){  List<CompletableFuture<String>> priceFutures =     shops.stream()    .map(shop -> CompletableFuture.supplyAsync(()-> shop.getName() + " price is "+                                               shop.getPrice(product)))    .collect(Collectors.toList());    return priceFutures.stream()    .map(CompletableFuture::join)    .collect(toList());}

결과는 다음과 같다.

[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13,BuyItAll price is 184.74]Done in 2005 msecs

2초 정도로 순차 스트림보다 빠르고, 병렬 스트림을 사용했을 때 보다는 2배나 느리다. 또한 순차 스트림 버전의 코드를 조금만 고쳐서 병렬 스트림을 만들 수 있다는 사실을 감안하면 실망스러운 결과이다.

CompletableFuture를 사용한 코드는 시간낭비였던 것일까? 이 코드를 실행하는 기기가 4개의 스레드를 병렬로 실행할 수 있는 기기라는 점을 착안 해서 문제를 해결해 나가보자.

더 확장성이 좋은 해결방법

병렬 스트림 버전의 코드는 정확히 4개의 상점에 하나의 스레드를 할당해서 4개의 작업을 병렬로 수행하면서 검색 시간을 최소화할 수 있었다. 만약 검색해야할 다섯 번째 상점이 추가 됐다면 어떻게 될까? 다음 출력 결과에서 보여주는 것처럼 순차 버전에서는 시간이 1초정도 늘어났다.

  • 순차 스트림 버전
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13,BuyItAll price is 184.74]Done in 5025 msecs
  • 병렬 스트림 버전
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13,BuyItAll price is 184.74]Done in 2177 msecs
  • CompletableFuture
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13,BuyItAll price is 184.74]Done in 2006 msecs

병렬 스트림 버전보다 아주 조금 빠르다. 하지만 여전히 만족할 수 있는 수준은 아니다. 예를들어 아홉개의 상점이 있다고 가정하면 병렬 스트림 버전은 3143밀리초, CompletableFiuture 버전은 3009 밀리초가 소요된다.
두가지 버전 모두 내부적으로 Runtime.getRuntime().availableProcessors()가 반환하는 스레드 수를 사용하면서 비슷한 결과가 된다. 결과적으로 비슷하지만 CompletableFuture는 병렬 스트림 버전에 비해 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다는 장점이 있다. 따라서 Executor로 스레드 풀의 크기를 조절하는 등 어플리케이션에 맞는 최적화된 설정을 만들 수 있다. 이 기능으로 어플리케이션의 성능을 향상 시킬수 있는지 보자.

커스텀 Executor 사용하기

우리는 실제로 필요한 작업량을 고려한 풀에서 관리하는 스레드 수에 맞게 Executor를 만들 수 있으면 좋을 것이다. 풀에서 관리하는 스레드 수를 어떻게 결정할 수 있을까?

스레드 풀 크기 조절

자바 병렬 프로그래밍에서는 스레드 풀의 최적값을 찾는 방법을 제안한다. 스레드 풀이 너무크면 CPU와 메모리 자원을 서로 경쟁하느라 시간을 낭비할 수 있다. 반면 스레드 풀이 너무 작으면 CPU의 일부 코어는 활용되지 않을수도 있다. 게츠는 다음 공식으로 대략적인 CPU 활용 비율을 계산할 수 있다
N(threads)=N(cpu)U(cpu)(1+W/C)N_(threads_)=N_(cpu_) * U_(cpu)(1+W/C)
NcpuNcpu Runtime.getRuntime().availableProcessors()가 반환하는 코어의수
UcpuUcpu는 0과 1사이의 값을 갖는 CPU 활용 비율
W/CW/C는 대기시간과 계산시간의 비율

우리 어플리케이션은 상점의 응답을 대략 99퍼센트의 시간만큼 기다리므로 W/C 비율을 100으로 간주할 수 있다. 즉, 대상 CPU활용률이 100퍼센트라면 400스레드를 갖는 풀을 만들어야함을 의미한다. 하지만 상점수보다 많은 스레드를 가지고 있어봐야 사용할 가능성이 전혀 없으므로 상점 수보다 많은 스레드를 갖는 것은 낭비일뿐이다. 따라서 한 상점에 하나의 스레드가 할당될 수 있도록, 가격정보를 검색하려는 상점 수 만큼 스레드를 갖도록 Executor를 설정한다. 스레드 수가 너무 많으면 오히려 서버가 크래시될 수 있으므로 하나의 Executor에서 사용할 스레드의 최대 개수는 100 이하로 설정하는 것이 바람직하다.

private final Executor executor =	Executors.newFiexedThreadPool(Math.min(shops.size(),100), new ThreadFactory(){    public Thread newThread(Runnable r) {    Thread t = new Thread(r);    t.setDaemon(true);    return t;    }});

우리가 만드는 풀은 데몬 쓰레드를 포함한다. 자바에서 일반 스레드가 실행 중이면 자바 프로그램은 종료되지 않는다. 따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는 일반 스레드가 있으면 문제가 될 수 있다. 반면 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있다. 두 스레드의 성능은 같다. 이제 새로운 Executor를 팩토리 메소드 supplyAsync의 두 번째 인수로 전달할 수 있다. 예를들어 다음 코드처럼 제품가격 정보를 얻는 CompletableFuture를 만들 수 있다.

CompletableFuture.supplyAsync(()-> shop.getName() + "price is " + shop.getPrice(product, executor);

CompletableFuture의 결과를 확인하니 다섯 개의 상점을 검색할 때는 1021밀리초, 아홉개의 상점을 검색할 때는 1022밀리초가 소요된다. 이전에 계산한 것처럼 400개의 상점까지 이 같은 성능을 유지할 수 있다. 따라서 비동기 동작을 많이 사용하는 상황에서는 지금 살펴본 기법이 가장효과적일수도 있음을 기억하자.

스트림 병렬화와 CompletableFuture 병렬화

지금까지 컬렉션 계산을 병렬화하는 두 가지 방법을 살펴봤다. 하나는 병렬스트림으로 변환해서 컬렉션을 처리하는 방법이고 다른 하나는 컬렉션을 반복하면서 CompletableFuture 내부의 연산으로 만드는 것이다. CompletableFuture를 이용하면 전체적인 계산이 블록되지 않도록 스레드 풀의 크기를 조절할 수 있다.
다음을 참고하면 어떤 병렬화 기법을 사용할 것인지 선택하는데 도움이 된다.

  • I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 가장 구현하기 간단하며 효율적일 수 있다. (모든 스레드가 계산작업을 수행하는 상황에서는 프로세서 코어수 이상의 스레드를 가질 필요가 없다.)
  • 반면 작업이 I/O를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture가 더많은 유연성을 제공하며 대기/계산(W/C) 비율에 적합한 스레드 수를 설정할 수 있다. 특히 스트림의 Lazy Option때문에 스트림에서 I/O를 시렞로 언제 처리할지 예측하기 어려운 문제도 있다.

비동기 작업 파이프라인 만들기.

우리와 계약을 맺은 모든 상점이 하나의 할인 서비스를 사용하기로 했다고 가정하자. 할인 서비스에서는 서로 다른 할인율을 제공하는 다섯 가지 코드를 제공한다. 이를 다음처럼 Discount.Code로 표현할 수 있다.

public class Discount {    public enum Code{        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);                private final int precentage;                Code(int percentage){            this.percentage = percentage;        }    }}

또한 getPrice도 다음과 같이 바뀐다.

public String getPrice(String product){    double price = calculatePrice(product);    Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];    return String.format("%s:%.2f:%s", name, price, code);    }    private  double calculatePrice(String product) {    delay();    return random.nextDouble() * product.charAt(0) + product.charAt(1);    

할인서비스 구현

우리의 최저가격 검색 어플리케이션은 여러 상점에서 가격 정보를 얻어오고, 결과 문자열을 파싱하고, 할인 서버에 질의를 보낼 준비가 되었다. 할인 서버에서 할인율을 확인해서 최종 가격을 계산할 수 있는데 이를 Quote 클래스로 캡슐화 했다.

public class Quote{
    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;
    
    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = code;
    }
    
    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }
    
    public String getShopName(){
        return shopName;
    )
    
    public double getPrice(){
        return price;
    }
    
    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}

상점에서 얻은 문자열을 정적 팩토리 메소드 parse로 넘겨주면 상점이름, 할인전 가격, 할인된 가격 정보를 포함하는 Quote 클래스 인스턴스가 생성된다.
다음코드에서 보여주는 것처럼 Discount 서비스에서 Quote 객체를 인수로 받아 할인된 가격 문자열을 반환하는 applyDiscount 메소드도 제공한다.

public class Discount{
    public enum Code{
        //소스코드 생략
    }
    
    public static String applyDiscount(Quote quote){
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }
    
    private static double apply(double price, Code code) {
        delay();
        return format(price * (100 - code.percentage) / 100 );
    }
}

할인 서비스 사용

Discount는 원격 서비스이므로 다음코드에서 보여주는 것처럼 1초의 지연을 추가한다. 일단 가장 쉬운방법(순차적 동기방식)으로 findPrices 메소드를 구현한다.

public List<String> findPrices(String product) {
    return shops.stream()
        .map(shop-> shop.getPrice(product))
        .map(Quote::parse)
        .map(Discount::applyDiscount)
        .collect(toList());
}

위 코드는 다음과 같이 작동한다.

  1. 첫 번째 연산에서는 각 상점을 요청한 제품의 가격과 할인 코드로 변환한다.
  2. 이들 문자열을 파싱해서 Quote 객체를 만든다.
  3. 원격 Discount 서비스에 접근해서 최종 할인가격을 계산하고 가격에 대응하는 상점 이름을 포함하는 문자열을 반환한다.

위 코드는 성능 최적화와는 거리가 멀다. 실제 벤치 마크로 실제 성능은 다음과 같다.

[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop price is 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]
Done in 10028 msecs]

예상햇듯이 순차적으로 다섯 상점에 가격 정보를 요청하느라 5초가 소요 됐고, 다섯 상점에서 반환한 가격 정보에 할인 코드를 적용할 수 있도록 할인 서비스에 5초가 소요되었다. 하지만 병렬 스트림을 이용하면 성능을 쉽게 개선할 수 있다는 사실은 이미 확인했다. 하지만 병렬 스트림에서는 스트림이 사용하는 스레드 풀의 크기가 고정되어 있어서 상점 수가 늘어났을 때처럼 검색 대상이 확장되었을 때 유연하게 대응할 수 없다는 사실도 확인했다. 따라서 CompletableFuture에서 수행하는 태스크를 설정할 수 있는 커스텀 Executor를 정의함으로써 CPU 사용을 극대화할 수 있다.

동기 작업과 비동기 작업 조합하기

이제 CompletableFuture에서 제공하는 기능으로 findPrice 메소드를 비동기적으로 재구현하자.

public List<String> findPrices(string product){
    List<CompletableFuture<String>> priceFutures = 
        shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(
                () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote -> 
                CompletableFuture.supplyAsync(
                    ()-> Discount.applyDiscount(quote), executor)))
                    .collect(toList());
                    
    return priceFutures.stream()
        .map(CompletableFuture::join)
        .collect(toList());
}

각 map 과정에 대해 설명을 하자면,

가격 정보 얻기

팩토리 메소드 supplyAsync에 람다 표현식을 전달해서 비동기적으로 상점에서 정보를 조회했다. 첫 번째 변환의 결과에 람다 표현식을 전달해서 비동기적으로 상점에서 정보를 조회했다. 첫 번째 변환의 결과는 Stream<CompletableFuture<String>> 이다. 각 CompletableFuture는 작업이 끝났을 때 해당 상점에서 반환하는 문자열 정보를 포함한다.

Quote 파싱하기

두 번째 변환 과정에서는 첫 번째 결과 문자열을 Quote로 변환한다. 파싱 동작에서는 원격 서비스나 I/O 작업이 없으므로 원하는 즉시 지연 없는 동작을 수행할 수 있다. 따라서 첫 번째 과정에서 생성된 CompletableFuture에 thenApply 메소드를 호출한다음에 문자열을 Quote 인스턴스로 변환하는 Function으로 전달한다.

thenApply 메소드는 CompletableFuture가 끝날때까지 블록하지 않는다는 점을 주의해야 한다. 즉, CompletableFuture가 동작을 완전히 완료한 다음ㅇ테 thenApply 메소드로 전달된 람다 표현식을 적용할 수 있다.

따라서 CompletableFuture<String> 을 CompletableFuture<Quote> 로 변환할 것이다. 이는 마치 CompletableFuture의 결과물로 무엇을 할지 지정하는 것과 같다. 즉 스트림 파이프라인과 비슷한 기능을했다.

CompletableFuture를 조합해서 할인된 가격 계산하기

세번째 map 연산에서는 상점에서 받은 할인전 가격에 원격 Discount 서비스에서 제공하는 할인율을 적용해야 한다. 이번에는 원격 실행이 포함되므로 이전 두 변환과 다르며 동기적으로 작업을 수행해야 한다.

람다 표현식으로 이 동작을 팩토리 메소드 supplySync 에 전달할 수 있다. 그러면 다른 CompletableFuture가 반환된다. 결국 두 가지 CompletableFuture로 이루어진 연쇄적으로 수행되는 두 개의 동작을 만들 수 있다.

  • 상점에서 가격정보를 얻어와서 Quote로 변환하기
  • 변환된 Quote를 Discount 서비스로 전달해서 할인된 최종가격 획득하기

자바8의 CompletableFuture API는 이와 같이 두 비동기 연산을 파이프라인으로 만들 수 있도록 thenCompse 메소드를 제공한다. thenCompose 메소드는 첫 번째 연산의 결과를 두 번째 연산으로 전달한다. 즉, 첫 번째 CompletableFurture에 thenCompse 메소드를 호출하고 Function에 넘겨주는 식으로 두 CompletableFuture를 조합할 수 있다. Function은 첫 번재 CompletableFuture 반환결과를 인수로 받고 두 번째 CompletableFuture를 반환하는데, 두 번재 CompletableFuture는 첫 번째 CompletableFuture의 결괄르 계산의 입력으로 사용한다.

thenCompose를 비동기로 수행할 수 있는 thenComposeAsync 또한 지원한다.

독립 CompletableFuture와 비독립 CompletableFuture

위에서는 첫번째 CompletableFuture에 thenCompose 메소드를 실행한 다음에 실행 결과를 첫 번째 실행 결과를 입력으로 받는 두번째 CompletableFuture로 전달했다. 실전에서는 독립적으로 실행된 두 개의 CompletableFuture 결과를 합쳐야 하는 상황이 종종 발생한다. 물론 첫 번째 CompletableFuture의 동작 완료와 관계없이 두 번째 CompletableFuture를 실행할 수 있어야 한다.

이런 상황에서는 thenCombine 메소드를 사용한다. thenCombine 메소드는 BiFunction 을 두번째 인수로 받는다. BiFunction은 두 개의 CompletableFuture 결과를 어떻게 합칠지 정의한다. thenCompose와 마찬가지로 thenCombine 메소드에도 Async버전이 존재한다.

thenCombineAsync 메소드에서는 BiFunction이 정의하는 조합 동작이 스레드 풀로 제출되면서 별도의 태스크에서 비동기적으로 수행된다.

Future<Double> futurePriceInUSD =
  CompletableFuture.supplyAsync(()-> shop.getPrice(product))
  .thenCombine(
		CompletableFuture.supplyAsync(
    	()-> exchangeService.getRate(Money.EUR, Money.USD)),
		(price,rate)-> price*rate
	));

Future의 리플렉션과 CompletableFuture의 리플렉션

자바 8 이전의 Future에 비해 CompletableFuture가 어떤 큰 이점을 제공하는지 명확히 보여준다. CompletableFuture는 람다표현식을 사용한다. 이미 살펴본 것처럼 람다 덕분에 다양한 동기 태스크, 비동기 태스크를 활용해서 복잡한 연산 수행방법을 효과적으로 쉽게 정의할 수 잇는 선언형 API를 만들 수 있다. 자바7로 구현하면서 실질적으로 CompletableFuture를 이용했을 때 얻을 수 있는 코드 가독성의 이점이 무엇인지 확인할 수 있다.

ExecutorService executor = Executors.newCachedThreadPool();
final Future<Double> futureRate = executor.submit(new Callable<Double>(){
  public Double call(){
    return exchangeService.getRate(Money.EUR, Money.USD);
  }
});
Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
  public Double call(){
    double priceInEUR = shop.getPrice(product);
    return priceInEUR * future.get();
  }
});

타임아웃 효과적으로 사용하기

Future의 계산결과를 읽을 떄는 무한정 기다리는 상황이 발생할 수 있으므로 블록을 하지 않는 것이 좋다. 자바9에서는 CompletableFuture에서 제공하는 몇 가지 기능을 이용해 이런 문제를 해결할 수 있다. orTimeout 메소드는 지정된 시간이 지난 후에 CompletableFuture를 TimeoutException 으로 완료하면서 또 다른 CompletableFuture를 반환할 수 있도록 내부적으로 ScheduledThreadExecutor를 활용한다.

이 메소드를 이용하면 계산 파이프라인을 연결하고 여기서 TimeoutException 이 발생했을 때 사용자가 쉽게 이해할 수 있는 메시지를 제공할 수 있다. 다음 코드에서 보여주는 것처럼 Future가 3초후에 작업을 끝내지 못할 경우 TimeoutException이 발생하도록 메서드 체인 끝에 orTimeout 메소드를 추가할 수 있다.

Future<Double> futurePriceInUSD = 
  CompletableFuture.supplyAsync(()->shop.getPrice(product))
  .thenCombine(
		CompletableFuture.supplyAsync() -> exchangeService.getRate(Money.EUR, Money.USD)),
		(price, rate) -> price * rate
  )
  .orTimeout(3, TImeUnit.SECONDS);
Future<Double> futurePriceInUSD =
  CompletableFuture.supplyAsync(()-> shop.getPrice(product))
  .thenCombine(
		CompletableFuture.supplyAsync(
    ()->exchangeService.getrate(Money.EUR,Money.USD),
	(price, rate) -> price * rate
)
  .orTimeout(3, TimeUnit.SECONDS);

orTimeout 메소드처럼 completeOnTimeout 메소드는 CompletableFuture를 반환하므로 이 결과를 다른 CompletableFuture 메소드와 연결할 수 있다. 지금까지 두 가지 타임아웃을 설정했다. 한 개는 3초 이내에 연산을 마치지 못하는 상황에서 발생하고 다른 타임아웃은 1초 이후에 환율을 얻지 못했을 때 발생한다.

CompletableFuture의 종료에 대응하는 방법

실제 상황에서는 네트워크 지연이 1초로 딱 떨어지지 않는다. 다양한 네트워크 부하가 발생하며 지연시간은 그에 따라 천차만별 다르다. 우리는 0.5초에서 2.5초 사이의 임의의 지연으로 이를 시뮬레이션 해보도록 하자.

private static final Random random = new Random();
public static void randomDelay(){
  int delay = 500 + random.nextInt(2000);
  try {
    Thread.sleep(delay);
  } catch (InterruptedException e){
    throw new RuntimeException(e);
  }
}

최저가격 검색 어플리케이션 리팩터링

먼저 모든 가격정보를 포함할 때까지 리스트 생성을 기다리지 않도록 프로그램을 고쳐야 한다. 그러려면 상점에 필요한 일련의 연산 실행 정보를 포함하는 CompletableFuture의 스트림을 직접 제어해야한다.

public Stream<CompletableFuture<String>> findPricesStream(String product){
  return shops.stream()
    .map(shop -> CompletableFuture.supplyAsnyc(
	    ()->shop.getPrice(product), executor
    ))
    .map(future -> future.thenApply(Quote::parse))
    .map(future -> future.thenCompose(quote -> CompletableFuture.spplyAsync(
    () -> discount.applyDiscount(quote),exector
    )));
}
profile
수동적인 과신과 행운이 아닌, 능동적인 노력과 치열함

1개의 댓글

comment-user-thumbnail
2022년 6월 4일

덕분에 많은 자바 비동기에 대해서 많은 이해가 되었습니다!!

답글 달기