Java 8 : CompletableFuture와 리액티브 프로그래밍 컨셉의 기초

600g (Kim Dong Geun)·2021년 4월 19일
0

👨‍💻이번장은 작성하는데 왜이렇게 힘이 드는지...👨‍💻

포스팅 하나 작성하는데 2주걸렸다는 슬픈 현실..

동시성

  • 하나의 쓰레드나 프로세서의 여러 Task를 관리함으로써, 마치 동시에 작업이 수행되고 있는 것처럼 보이는 것

병렬성

  • Task의 실행을 하드웨어 수준으로 실행되며, 각각의 작업들이 독립적임

동시성을 구현하는 자바의 진화

버전사용 방법
Java 5 이전Runnable과 Thread를 이용하여 구현
Java 5ExecutorService, Callable<T>, Future<T>
Java 7Fork/Join 그리고 RecursiveTask
Java 8Stream, CompletableFuture
Java 9분산 비동기 프로그래밍은 명시적으로 지원 (발행 구독 프로토콜 지원 Flow API)

쓰레드와 높은 추상화

  • 운영체제는 같은 주소공간을 공유 하는 프로세스의 쓰레드를 사용하여 동시에 또는 협력적으로 Task를 수행

Executor와 쓰레드 풀

  • Java 5에서는 Executor 프레임워크와 쓰레드풀을 통해 쓰레드의 힘을 높은 수준으로 끌어올리는 태스크 제출실행을 분리 할 수 있는 기능을 제공

쓰레드의 문제

  • Java 쓰레드는 직접 운영체제 쓰레드에 접근
  • 운영체제 쓰레드는 만들고 종료하는데 있어 비용이 비쌈
  • 또한 쓰레드 숫자는 제한되어 있으므로 운영체제가 지원하는 쓰레드 초과 사용시 크래시 발생 가능

쓰레드 풀, 쓰레드 풀이 더 좋은 이유

  • Java의 ExecutorService는 Task를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 수집
  • 프로그램은 newFiexedThreadPool 같은 팩토리 메소드 중 하나를 이용하여, 쓰레드 풀을 만들어 사용
  • 위 메소드는 워커 쓰레드라 불리는 nThreads 포함하는 Executor Service를 만들고, 이들을 쓰레드 풀에 저장한다.
  • 쓰레드 풀에서 사용되지 않은 쓰레드로 제출된 테스크를 먼저 온 순서대로 저장한다.
  • 해당 쓰레드 풀이 종료되면 쓰레드를 풀로 반환
  • 이 방식의 장점은 하드웨어에 맞는 태스크를 유지함과 동시에 수 천개의 Task를 아무 오버헤드 없이 제출 가능

쓰레드 풀, 그리고 쓰레드 풀이 나쁜 이유

  • 거의 모든 부분에서 쓰레드를 직접 사용하는 것보다 쓰레드 풀을 이용하는 것이 바람직하지만 두가지 사항을 주의해야한다.

    • K쓰레드를 가진 쓰레드 풀은 오직 k만큼의 쓰레드를 동시에 실행할 수 있다. 초과로 제출된 태스크는 Queue에 저장되며, 이전에 태스크 중 하나가 종료되기 전까지는 쓰레드에 할당하지 않는다.

      불필요하게 많은 쓰레드를 만드는 일을 피할 수 있으므로 보통 이 상황이 문제가 되지는 않지만, Sleep 상태나 I/O를 기다리거나 네트워크 연결을 기다리는 태스크가 있다면 주의 해야한다. 해당 작업이 발생하는 경우 태스크가 워커 쓰레드에 할당된 상태를 유지하지만 아무 작업도 하지 않게 된다. (Waste한 상태가 되고 최악의 경우에는 Deadlock 상황이 발생할 수 있다.)

    • 중요한 코드를 실행하는 쓰레드가 죽는일이 발생하지 않도록 보통 자바 프로그램은 Main을 반환하기 전에 모든 쓰레드의 작업을 끝나길 기다린다. 따라서 프로그램을 종료하기 전에 모든 쓰레드 풀을 종료하는 습괕을 갖는 것이 중요하다.

쓰레드의 다른 추상화 : 중첩되지 않은 메소드 호출

  • 쓰레드 생성과 join()이 한 쌍처럼 중첩된 메소드 호출에 추가된 것을 엄격한 포크/조인 이라고 한다.

  • 반대의 경우 여유로운 포크/조인

쓰레드 생성과 join()이 한쌍이라는게 무슨 말인지 이해가 안되서, 엄격한 포크/조인 (Strict fork/join)에 대한 설명이 부실해서 찾아봤는데, (링크 참조) 엄격한 포크 이란 다음 조건을 갖춘 Fork-Join Execution Model을 뜻한다

  1. 태스크는 하나 혹은 하나 이상의 자식 태스크를 Fork 할 수 있고, 각 Task들은 부모 Task혹은 서로간 병렬로 실행할 수 있다.
  2. 태스크는 그들의 자식들이 끝나기 까지 완전히 기다려야 한다. 만약 A 태스크가 B 태스크를 기다리지 않아도 된다면, A 태스크는 B태스크의 자식이 아니다.
  3. 모든 자식 태스크가 완료 될때까지 해당 태스크를 완료할 수 없다.
  • 아무튼 위 메소드들을 사용학게 된다면 어떤 위험성이 따를까?

    • 쓰레드 실행은 메소드를 호출한 다음 코드와 동시에 실행되므로 Data가 Race Condition에 빠지지 않도록 주의해야한다.
    • 기존 실행 중이던 쓰레드가 종료되지 않은 상황에서 자바의 main() 메소드가 반환되면 어떻게 해야할까? (두가지 방법이 존재하는데 두가지 방법 모두 안전하지 않은 방법.)
      • 어플리케이션을 종료하지 못하고 모든 쓰레드가 실행을 종료할 때까지 기다리는 것
      • 어플리케이션 종료를 방해하는 쓰레드를 강제종료시키고 어플리케이션을 종료하는 것.
    • Java Thread는 setDaemon() 메소드를 이용해, 데몬 혹은 비데몬으로 구분시킬 수 있다.
    • 데몬 쓰레드는 어플리케이션이 종료될 때 강제 종료되므로 데이터 일관성을 파괴하지 않는 동작을 수행할 때 유용하게 활용할 수 잇는 반면, main() 메소드는 모든 비데몬 쓰레드가 종료될 때까지 프로그램을 종료하지 않고 기다린다.

우리는 쓰레드에 무엇을 바라는가?

  • 모든 하드 웨어 쓰레드를 활용해 병렬성의 장점을 극대화하도록 프로그램 구조를 만드는 것

    즉, 프로그램을 작은 태스크 단위로 구조화하는 것이 목표다.

동기 API 와 비동기 API

  • 다음과 같은 메소드가 있다 가정하자
int y = f(x);
int z = g(x);
System.out.println(y+z);

//f와 g를 실행하는데 오랜 시간이 걸린다고 가정하자.
  • 또한 f와 g가 서로 상호작용하지 않는다고 가정하고 동시에 실행하면 시간을 단축할 수 있다.
class ThreadExample{
  public static void main(String[] args) throws InterruptedException {
    int x = 1337;
    Result result = new Result();
    
    Thread t1 = new Thread(()-> {result.left = f(x);});
    Thread t2 = new Thread(()-> {result.right = g(y);});
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println(result.left + result.right);
     
  }
  
  private static class Result {
    private int left;
    private int right;
  }
}
  • 해당 코드를 Runnable 대신 Future API 인터페이스를 이용해 코드를 단순화 할 수 도 있다.
public class ExecutorServiceExample {
  public static void main(String[] args) throws ExecutionException, InterruptedException{
    int x = 1337;
    ExecutorService executorService = Executors.newFiexedThreadPool(2);
		Future<Integer> y = executorService.submit(()->f(x));
    Future<Integer> z = executorService.submit(()->g(x));
    System.out.println(y.get()+z.get());
    
    executorService.shutdown();
  }
}

하지만 여전히 위 코드도 명시적인 submit 메소드 호출 같은 불필요한 코드로 오염되었다. 명시적 반복으로 병렬화를 수행하던 코드를 스트림을 이요ㅇ해 내부 반복으로 바꾼것처럼 비슷한 방법으로 이를 해결해야한다.

  • 문제의 해결방법은 비동기 API 라는 기능으로 API를 바꿔서 해결할 수 있다
    • 첫번째 방법인 자바의 futureCompletableFuture를이용하면 이 문제를 조금 개선할 수 있다.

Future API

대안을 이용하면 f,g의 시그니처가 다음처럼 바뀐다.

Future<Integer> f(int x);
Future<Integer> g(int x);

그리고 다음처럼 호출이 바뀐다.

Future<Integer> y = f(x);
Future<Integer> z = g(x);
System.out.println(y.get()+z.get());

메소드 f는 호출 즉시 자신의 원래 바디를 평가하는 태스크를 포함하는 Future를 반환한다. 마찬가지로 메소드 g도 Future를 반환하며 세 번째 코드는 get() 메소드를 이용해 두 Future가 완료되어 결과가 합쳐지기를 기다린다.

예제에서는 API는 그대로 유지하고 g를 그대로 호출하면서 f에만 Future를 적용할 수 있었다.

하지만 API는 그대로 유지하고 g를 그대로 호출하면서 f에만 Future를 적용할 수 있었다.

하지만 조금 더 큰 프로그램에서는 두 가지 이유로 이런 방식을 사용하지 않는다.

다른 상황에서도 g에도 Future 형식이 필요할 수 있으므로 API 형식을 통일하는 것이 바람직하다.

병렬 하드웨어로 프로그램 실행 속도를 극대화할며ㅕㄴ 여러 작은 하지만 합리적인 크기의 태스크로 나누는 것이 좋다.

리액티브 형식의 API

두 번째 대안에서 핵심은 f,g의 시그니처를 바꿔서 콜백 형식의 프로그래밍을 이용하는 것이다.

void f(int x, IntConsumer dealWithResult);

f가 값을 반환하지 않는데, 어떻게 프로그램이 동작할까 (콜백으로 겠지..?)

f에 추가 인수로 콜백(람다)를 전달해서 f의 바디에서는 return 문으로 반환하는 것이 아니라 결과가 준비되면 이를 람다로 호출하는 태스크로 만드는 것이 비결이다.

다시말해, f는 바디를 실행하면서 태스크를 만든 다음 즉시 반환하므로 코드 형식이 다음과 같이 바뀐다.

public class CallbackStyleExample{
  public static void main(String[] args){
    int x = 1337;
    Result result = new Result();
    
    f(x, (int y)-> {
      result.left = y;
      System.out.println((result.left + result.right));
    });
    
    g(x, (int z)-> {
      result.right = z;
      System.out.println((result.left + result.right));
    })
  }
}

하지만 결과가 달라졌다. f와 g의 호출 합계를 정확하게 출력하지 않고 상황에 따라 먼저 계산된 결과를 출력한다. 왜냐하면 락을 사용하지 않으므로 값을 두 번 출력할 수 있을 뿐 더러 때로는 +에 제공된 두 피연산자가 호출되기 전에 업데이트될 수도 있다. 다음처럼 두 가지 방법으로 이 문제를 해결할 수 있다.

  • if-then-else를 이용해 적절한 락을 이용해 두 콜백이 모두 호출되었는지 확인한 다음 println을 호출해 원하는 기능을 수행할 수 있다.

  • 리액티브 형식의 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로 Future를 이용하는 것이 더 적절하다.

    리액티브 형식의 프로그래밍으로 메소드 f와 g는 dealWithResult 콜백을 여러번 호출 할 수 있다. 원래의 f,g 함수는 오직 한 번만 return을 사용하도록 되어있다. 마찬가지로 Future도 한 번만 완료되면 그 결과는 get()으로 얻을 수 있다. 리액티브 형식의 비동기 API는 자연스럽게 일련의 값을, Future, 형식의 API는 일회성의 값을 처리하는데 적절하다.

즉 정리하자면 Future 형식의 API는 일회성의 값을 처리하는 데 적합 하고, 리액티브 형식의 비동기 API는 자연스럽게 일련의 값을 처리하는데 적합하다.

잠자기

  • 사람과 상호작용하거나 어떤 일이 일정속도로 제한되어 일어나는 상황의 어플리케이션을 만들 때 자연스럽게 sleep 메소드를 사용할 수 있다. 하지만 이 Sleep 메소드는 스레드는 잠들어도 여전히 시스템 자원을 점유한다. 이는 스레드가 많아지고 그 중 대부분이 잠을 잔다면 시스템 효율이 급격하게 떨어지는 결과를 보여진다.

  • 쓰레드 풀을 사용한다면 잠을 자는 태스크는 다른 태스크가 시작되지 못하게 막으므로 자원을 소비한다는 사실을 기억 (+ 모든 블록동작또한 마찬가지)

  • 이상황에서 태스크를 기다리는 일을 만들지 않거나 아니면 코드에서 예외를 일으키는 방법으로 처리할 수 있다.

  • 예를들어 다음 코드의 차이를 보자.

work1();
Thread.sleep(1000);
work2();

이를 코드 B와 비교하자.

public class ScheduledExecutorServiceExample {
  public static void main(String[] args){
    ScheduledExecutorService scheduledExccutorService = Executors.newScheduledThreadPool(1);
    
    work1();
    scheduledExecutorService.schedule(ScheduledExecutorServiceExample::work2, 10, TimeUint.SECONDS);
    
    scheduledExecutorService.shutdown();
  }
  
  public static void work1(){
    System.out.println("Hello from Work1");
  }
  
  public static void work2(){
    System.out.println("HEllo from work2!");
  }
}

A와 B의 호출 결과는 동일하다. 하지만 내부적 동작에서 그 차이를 보인다.

A코드는 자는 동안 귀중한 스레드 자원을 점유하는 반면, B는 다른 작업이 실행될 수 있도록 허용한다는 점이다. (쓰레드를 사용할 필요가 없이 메모리만 조금 더 사용했다.)

태스크를 만들때 이런 특징을 잘 활용해야 한다. 태스크가 실행되면 귀중한 자원을 점유하므로 태스크가 끝나서 자원을 해체하기 전까지 태스크를 계속 실행해야 하는 것이 바람직하다. 태스크를 블록하는 것 보다는 다음 작업을 태스크로 제출하고 현재 태스크는 종료하는 것이 바람직하다.

현실성 확인

새로운 시스템을 설계할 때 시스템을 많고 작은 동시 실행되는 태스크로 설계해서 블록할 수 있는 모든 동작을 비동기 호출로 구현한다면 병렬 하드웨어를 최대한 활용할 수 있다. 하지만 모든 것은 비동기 라는 설계 원칙을 어겨야한다. 실제로 자바의 개선된 동시성 API를 이용해 효율을 얻을 수 있는 상황을 찾아보고 모든 API를 비동기로 만드는 것을 따지지 말고 동시성 API를 사용해보길 권장한다.

비동기 API에서 예외는 어떻게 처리하는가?

Future나 리액티브 형식의 비동기 API에서는 호출된 메서드의 실제 바디는 별도의 스레드에서 호출되며 이때 발생하는 어떤 에러는 이미 호출자의 실행범위와는 관계가 없는 상황이 된다. 예상치 못한 일이 일어나면 예외를 발생시켜 다른 동작이 실행되어야 한다. 어떻게 이를 실현할 수 있을까?

  • Future를 구현한 CompleatableFuture에서는 런타임 get() 메소드에 예외를 처리할 수 있는 기능을 제공하며 예외에서 회복할 수 있도록 exceptionally() 같은 메소드도 제공한다.
  • 리액티브 형식의 비동기 API 에서는 return 대신 기존 콜백이 호출되므로 예외가 발생했을 때 실행될 추가 콜백을 만들어 인터페이스를 바꿔야한다.
void f(int x, Consumer<Integer> dealWithResult, Consumer<Throwable> dealWithException);
  • f의 바디는 다음을 수행할 수 있다.

    dealWithException(e);

콜백이 여개면 따로 제공하는 것보다는 한 객체로 이 메소드를 감싸는 것이 좋다. 예를들어 자바 9 플로 API에서는 여러 콜백을 한 객체(네 개의 콜백을 각각 대표하는 네 메소드를 포함하는 Subscriber<T> 클래스)로 감싼다. 다음은 그 예제다.

void onComplete();
void onError(Throwable throwable);
void onNext(T Item);
void f(int x, Subscriber<Integer> s);
  • 값이 있을때 onNext()
  • 도중에 에러가 발생했을 때 onError()
  • 값을 다 소진했거나 에러가 발생해서 더이상 처리할 데이터가 없을때 onComplete()

f의 바디는 다음처럼 Throwable을 가리키는 t로 예외가 일어났음을 가리킨다.

s.onError(t);

여러 콜백을 포함하는 API를 파일이나 키보드 장치에서 숫자를 읽는 작업과 비교해보자. 이들 장치가 수동적인 데이터 구조체가 아니라 "여기 번호가 나왔어요"나 "숫자가 아니라 잘못된 형식의 아이템이 나왔어요" 같은 일련의 데이터를 만들어낸 다음 마지막으로 "더이상 처리할 데이터가 없어요(파일의 끝) " 알림을 만든다.

보통 이런 종류의 호출을 메시지 또는 이벤트 라고 부른다. 예를들어 파일 리더가 3, 7, 42,를 읽은 다음 잘못된 형식의 숫자 이벤트를 내보내고 이어서 2, 파일의 끝 이벤트를 차례로 생성했다고 가정하자.

이런 이벤트를 API의 일부로 보자면 API는 이벤트의 순서(채널 프로토콜이라 불리는) 에는 전혀 개의치 않는다. 실제 부속 문서에서는 "onComplete 이벤트 다음에는 아무 이벤트도 일어나지 않음" 같은 구문을 사용해 프로토콜을 정의한다.

박스와 채널 모델

동시성 모델을 가장 잘 설계하고 개념화하려면 그림이 필요하다. 우리는 이 기법을 박스와 채널 모델이라고 부른다. 이전 예제인 f(x) + g(x)의 계산을 일반화해서 정수와 관련된 간단한 상황이 있다고 가정하자

f나 g를 호출하거나 p 함수에 인수 x를 이용해 호출하고 그결과를 q1과 q2에 전달하며 다시 이 두 호출의 결과로 함수r을 호출한 다음 결과를 출력한다.

자바로 두가지 방법으로 구현해 어떤 문제가 있는지 확인하자. 다음은 첫 번째 구현방법이다.

int t = p(x);
System.out.println( r(q1(t),q2(t)));

겉보기엔 깔끔해 보이는 코드지만 자바가 q1,q2를 차례로 호출하는데 이는 하드웨어 병렬성의 활용과 거리가 멀다.

Future를 이용해 f,g를 병렬로 평가하는 방법도 있다.

int t = p(x);
Future<Integer> a1 = executorService.submit(()-> q1(t));
Future<Integer> a2 = executorService.submit(()-> q2(t));
System.out.println( r(a1.get(), a2.get()));

System.out.println(r(q1(t),q2(t)+ s(x));

위 코드에서 병렬성을 극대화하려면 모든 다섯 함수(p, q1,q2,r,s)를 Future로 감싸야하기 때문이다. 시스템에서 많은 작업이 동시에 실행되고 있지 않다면 이 방법도 잘 동작할 수 있다. 하지만 시스템이 커지고 각각의 많은 박스와 채널 다이어그램이 등장하고 각각의 박스는 내부적으로 자신만의 박스와 채널을 사용한다면 문제가 달라진다.

많은 태스크가 get() 메소드를 호출해 Future가 끝나기를 기다리는 상태에 놓을 수 있다. 결과적으로 하드웨어의 병렬성을 제대로 활용하지 못하거나 심지어 데드락에 걸릴 수 있다. 또한 이런 대규모 시스템 구조가 얼마나 많은 수의 get()을 감당할 수 있는지 이해하기 어렵다.

자바 8에서는 CompletableFuture와 Combinator 를 이용해 문제를 해결한다. 두 Function이 있을때 compose, andThen() 등을 이용해 다른 Function을 얻을 수 있다는 사실을 확인했다.

add1은 정수 1을 더하고 dble은 정수를 두배로 만든다고 가정하면 인수를 두배로 만들고 결과에 2를 더하는 Function을 다음처럼 구할 수 있다.

Function<Integer,Integer> myfun = add1.andThen(dble);

하지만 박스와 채널 다이어그램은 콤비네이터로도 직접 멋지게 코딩할 수 있다. Function p, q1, q2, BiFunction r로 간단하게 구현할 수 있다.

p.thenBoth(q1,q2).thenCombie(r)

안타깝게도 thenBoththenCombie는 자바 Function과 BiFunction 클래스의 일부가 아니다. 앞으로 콤비네이터CompletableFuture의 사상이 얼마나 비슷하며 get()을 이용해 태스크가 기다리게 만드는 일을 피할 수 있는지 설명한다고 한다...

CompletableFuture와 콤비네이터를 이용한 동시성

동시 코딩 작업을 Future 인터페이스로 생각하도록 유도한다는 점이 Future 인터페이스의 문제다. 하지만 역사적으로 주어진 연산으로 Future를 만들고, 이를 실행하고 종료되길 기다리는 등 Future는 FutureTask 구현을 뛰어 넘는 몇가지 동작을 제공한다. 이후 버전의 자바에서는 앞서 설명한 RecursiveTask 같은 더 구조화된 지원을 제공했다.

자바 8에서는 Future 인터페이스의 구현인 CompletableFuture를 이용해 Futre를 조합할 수 있는 기능을 추가 했다. 그럼 CompletableFuture라고 부르는 이유는 뭘까? 일반적으로 Future는 실행해서 get()으로 결과를 얻을 수 있는 Callable로 만들어진다. 하지만 CompletableFuture는 실행할 코드 없이 Future를 만들 수 있도록 허용하며 complete()메소드를 이용해 나중에 어떤 값을 이용해 다른 스레드가 이를 완료할 수 있고 get()으로 값을 얻을 수 있도록 허용한다. (그래서 Completable Future라고 불린다. )

  • f(x)와 g(x)를 동시에 실행해 합계를 구하는 코드를 다음처럼 구현할 수 있다.
public class CFComplete {
  public static void main(String[] args) throws ExecutionException, InterruptedException{
    ExecutorService executorService = Executors.newFiexedthreadPool(10);
    int x = 1337;
    
    CompletableFuture<Integer> a = new CompletableFuture<>();
    executorService.submit(()-> a.complete(f(x)));
    int b = g(x);
    System.out.println(a.get()+b);
    
    executorService.shutdown();
  }
}
  • 또는 다음처럼 구현할 수 있다.
public class CFComplete{
  public static void main(String[] args) throws ExecutionException, InterruptedException{
		ExecutorService executorService = Executors.newFiexedThreadPool(10);
    int x = 1337;
    
    CompletableFuture<Integer> a = new completableFuture<>();
    executorService.submit(()-> b.complete(g(x)));
    int a = f(x);
    System.out.println(a + b.get());
    
    executorService.shutdown();
  }
}

위 두 코드는 f(x)의 실행이 끝나지 않거나 아니면 g(x)의 실행이 끝나지 않는 상황에서 get()을 기다려야 하므로 프로세싱 자원을 낭비할 수 있다. 자바 8의 CompletableFuture를 사용하면 이 상황을 해결할 수 있다.

CompletableFuture<T>에 thenCombine 메소드를 사용함으로써 두 연산 결과를 더 효과적으로 더할 수 있다. thenCombine메소드는 다음과 같은 시그니처를 갖고 있다.

CompletableFuture<V> thenCombine(CompletableFuture<U> other,
                                BiFunction<T,U,V> fn);

이 메소드는 두개의 CompletableFuture 값을 받아 한개의 새값을 만든다. 처음 두 작업이 끝나면 두 결과 모두에 fn을 적용하고 블록하지 않은 상태로 결과 Future를 반환한다.

public class CFCombine {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    int x = 1337;
    
    CompletableFuture<Integer> a = new CompletableFuture<>();
    CompletableFuture<Integer> b = new CompletableFuture<>();
    CompletableFuture<Integer> c = a.thenCombine(b, (y,z) -> y + z);
    executorService.submit(()-> a.complete(f(x)));
    executorService.submit(()-> b.complete(g(x)));
    
    System.out.println(c.get());
    executorService.shutdown();
  }
}

thenCombine 행이 핵심이다. Future a와 Futureb의 결과를 알지 못한 상태에서 thenCombine은 두 연산이 끝났을 때 스레드 풀에서 실행된 연산을 만든다. 결과를 추가하는 세번째 연산c는 다른 두 작업이 끝날 때까지는 스레드에서 실행되지 않는다. 따라서 기존의 두 가지 버전의 코드에서 발생했던 블록 문제가 어디서도 일어나지 않는다. Future의 연산이 두 번째로 종료되는 상황에서 실제 필요한 스레드는 한 개지만 스레드 풀의 두 스레드가 여전히 활성화된 상태다. 이전의 두 버전에서 y+z 연산은 f(x) 또는 g(x)를 실행한 같은 스레드에서 수행했다. 반면 thenCombine을 이용하면 f(x)와 g(x)가 끝난 다음에야 덧셈계산이 실행된다.

발행-구독 그리고 리액티브 프로그래밍

FutureCompletableFuture은 독립적 실행과 병렬성이라는 정식적 모델에 기반한다. 연산이 끝나면 get()으로 Future의 결과를 얻을 수 있다. 따라서 Future는 한 번만 실행해 결과를 제공한다.

반면 리액티브 프로그래밍은 시간이 흐르면서 여러 Future 같은 객체를 통해 여러 결과를 제공한다. 반면 온도계 객채를 예로 생각해보자. 이 객체는 매 초마다 온도 값을 반복적으로 제공한다. 또 다른 예로 웹 서버 컴포넌트 응답을 기다리는 리스너 객체를 생각할 수 있다. 이 객체는 네트워크에 HTTP 요청이 발생하길 기다렸다가 이후에 결과 데이터를 생산한다. 그리고 다른 코드에서 온도 값 또는 네트워크 결과를 처리한다.

두 플로를 합치는 예제

두 정보 소스로부터 발생하는 이벤트를 합쳐서 다른 구독자가 볼 수 있도록 발행하는 예를 통해 발행-구독의 특징을 간단하게 확인할 수 있다. 사실 이 기능은 수식을 포함하는 스프레드시트의 셀에서 흔히 제공하는 동작이다. "=C1 + C2"라는 공식을 포함하는 스프레드시트 셀 C3를 만들자. C1이나 C2의 값이 갱신되면 C3에도 새로운 값이 반영된다. 다음 코드는 셀의 값을 더할 수만 있다고 가정하자.

private class SimpleCell {
  private int value = 0;
  private String name;
  public SimpleCell(String name){
    this.name = name;
  }
}

아직은 코드가 단순한 편이며 다음처럼 몇 개의 셀을 초기화할 수 있다.

SimpleCell C2 = new SimpleCell("C2");
SimpleCell C1 = new SimpleCell("C1");

c1이나 c2의 값이 바뀌었을 때 c3가 두 값을 더하도록 어떻게 지정할 수 있을까? c1과 c2에 이벤트가 발생했을 때 c3를 구독하도록 만들어야 한다. 그러려면 다음과 같은 인터페이스 Publisher<T> 가 필요하다.

interface Publisher<T>{
  void subscribe(Subscriber<? super T> subscriber);
}

이 인터페이스는 통신할 구독자를 인수로 받는다. Subscriber<T>인터페이스는 onNext라는 정보를 전달할 단순 메소드를 포함하며 구현자가 필요한대로 이 메소드를 구현할 수 있다.

interface Subscriber<T>{
  void onNext(T t);
}

이 두 개념을 어떻게 합칠 수 있을까? 사실 Cell은 Publisher(셀의 이벤트에 구독할 수 있음)이며 동시에 Subscriber(다른 셀의 이벤트에 반응함)임을 알 수 있다.

private class SimpleCell implements Publisher<Integer>, Subscriber<Integer> {
  private int value=0;
  private String name="";
  private List<Subscriber> subscribers = new ArrayList<>();
  
  public SimpleCell(String name){
    this.name = name;
  }
  
  @Override
  public void subscribe(Subscriber<? super Integer> subscriber){
    subscribers.add(subscriber);
  }
  
  private void notifyAllSubscribers() {
    subscribers.forEach(subscriber -> subscriber.onNext(this.value));
  }
  
  @Override
  public void onNext(Integer newValue){
    this.value = newValue;
    System.out.println(this.name + ":" + this.value);
    notifyAllSubscribers();
  }
}

다음 간단한 예제를 시도해보자.

SimpleCell C3 = new SimpleCell("C3");
SimpleCell C2 = new SimpleCell("C2");
SimpleCell C1 = new SimpleCell("C1");

C1.subscribe(c3);

c1.onNext(10); //C1의 값을 10으로 갱신
c2.onNext(20); //C2의 값을 20으로 갱신

C3는 직접 C1을 구독하므로 다음과 같은 결과가 출력된다.

C1: 10

C3: 10

C2: 20

그렇다면 C3 = C1 + C2를 어떻게 구현할까? 왼쪽과 오른쪽의 연산결과를 저장할 수 있는 별도의 클래스가 필요하다.da

public class ArithmeticCell extends SimpleCell {
  private int left;
  private int right;
  
 	public ArithmeticCell(String name){
    super(name);
  }
  public void setLeft(int left){
    this.left = left;
    onNext(left + this.right);
  }
  
  public void setRight(int right){
    this.right = right;
    onNext(right + this.left);
  }
  
}

다음처럼 조금 더 실용적인 예제를 시도할 수 있다.

ArithmeticCell c3 = new ArithmeticCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");

c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight); //subscribe는 Subscribe(? suber Integer)를 인수로 가지는 함수를 받을수 있음.

c1.onNext(10);
c2.onNext(20);
c1.onNext(15);

즉 클래스 그자체가 Subscribe를 상속받기 때문에, 인스턴스 그자체도 넣을 수 있고
Subscriber는 Integer를 인수로가지고 있는 메소드 인터페이스를 인수로 가질수 있다는 말로 이해할 수 있겠다.

데이터가 발행자(생산자)에서 구독자(소비자)로 흐름에 착안해 개발자는 이를 업스트림 또는 다운스트림 이라 부른다. 위 예제에서 데이터 new Value는 업스트림 onNext() 메소드로 전달되고 notifyAllSubscribers()호출을 통해 다운스트림 onNext() 호출로 전달된다.

지금까지 발행-구독 핵심 개념을 확인했다. 실제로 Java9의 Flow API를 이용하려면 onNext() 이벤트 외에 onError()onComplete같은 메소드를 통해 데이터 흐름에서 예외가 발생하거나 데이터 흐름이 종료되었음을 알 수 있어야 한다.

간단하지만 플로 인터페이스의 개념을 복잡하게 만든 두가지 기능은 바로 압력역압력이다.

처음에는 압력과 역압력의 기능이 중요해보일수 있지 않지만, 스레드 활용에서 이들 기능은 필수이다.

  • 기존의 온도계 예제에서 온도계가 매 초 마다 온도를 보고했는데, 기능이 업그레이드 되면서 매 밀리초마다 온도계를 보고한다고 가정하자
    • 우리 프로그램은 이렇게 빠른 속도로 발생하는 이벤트를 아무 문제없이 처리할 수 있을까?
    • 매 초 마다 수천개의 메시지가 onNext 로 전달된다면 어떤일이 일어날까.
    • 이러한 상황을 압력(pressure)라고 부른다.

역압력

Subscriber 객체 (onNext, onError, onComplete 메소드를 포함) 를 어떻게 Publisher에게 전달해 발행자가 필요한 메소드를 호출할 수 있는지 살펴봤다.

이 객체는 Publisher -> Subscriber로 정보를 전달한다.

정보의 흐름 속도를 역압력으로 제어, 즉 Subscriber 에서 Publisher로 정보를 요청해야 할 필요가 있을 수 있다.

Publisher는 여러 Subscriber를 갖고 있으므로 역압력 요청이 한 연결에만 영향을 미쳐야 한다는 것이 문제가 될 수 있다.

  • 따라서 Java9 플로 API의 Subscriber 인터페이스는 네 번째 메소드를 포함한다.
void onSubscribe(Subscription subscription);

Publisher와 Subscriber 사이의 채널이 연결되면 첫 이벤트로 이 메소드가 호출된다. Subscription 객체는 다음처럼 Subscriber와 Publisher와 통신할 수 있는 메소드를 포함한다.

interface Subscription {
  void cancel();
  void request(long n);
}

Publisher는 Subscription 객체를 만들어 Subscriber로 전달하면 Subscriber는 이를 이용해 Publihser로 정보를 보낼 수 있다.

실제 역압력의 간단한 형태

  • 한번에 한 개의 이벤트를 처리하도록 발행-구독 연결을 구성하려면 다음과 같은 작업이 필요하다.
    • Subscriber가 OnSubscribe로 전달된 Subscription 객체를 subscription 같은 필드에 로컬로 저장한다.
    • Subscriber가 수많은 이벤트를 받지 않도록 onSubscribe, onNext, onError의 마지막 동작에 channel.request(1)을 추가해 오직 한 이벤트만 요청한다.
    • 요청을 보낸 채널에만 onNext, onError 이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 바꾼다.

구현이 간단해 보일 수 있지만 역압력을 구현하려면 여러가지 장단점을 생각해야 한다.

  • 여러 Subscriber가 있을 때 이벤트를 가장 느린 속도로 보낼 것인가? 아니면 각 Subscriber에게 보내지 않은 데이터를 저장할 별도의 큐를 가질 것인가?
  • 큐가 너무 커지면 어떻게 해야 할까?
  • Subscriber가 준비가 안되었다면 큐의 데이터를 폐기할 것인가?
profile
수동적인 과신과 행운이 아닌, 능동적인 노력과 치열함

0개의 댓글