비동기를 학습하면서 pubsub 구조를 들어보게 됐다. 머릿속으로 대략적인 그림은 그려지지만 어떻게 구독자가 정보를 받게되는지 궁금해서 토비님의 강의를 통해 자바에서 Reactive Programming이 왜 나왔고 어떤 개념으로 구현되어있는지 가볍게 알아봤다.
링크를 참고하면 더 자세한 정보를 얻을 수 있다.
Reactive Streams가 나오게 된 이유는 비동기-논블로킹 쓰레드를 JVM을 통해서 구현하기 위해서 나왔다. 비동기 서블릿이 나온건 2009년이고 Node.js 보다 먼저 나왔지만 사람들이 관심이 없었다고 한다.
1부터 10까지 순회하면서 값을 출력하는 로직을 구현해보자.
import java.util.Iterator;
public class Ob {
public static void main(String[] args) {
Iterable<Integer> iter = () ->
new Iterator<>() {
int i = 0;
final static intMAX= 10;
public boolean hasNext() {
return i <MAX;
}
public Integer next() {
return ++i;
}
};
for (Integer i : iter) {
System.out.println("i = " + i);
}
}
}
Java에서 제공하는 Collection은 Iterable을 상속하고 있기 때문에 forEach문을 수행할 수 있다. Iterable 내부에는 Iterator가 존재하는데 hasNext를 통해 체크하고 다음 값이 존재하면 가져와서 사용하면 된다.
정확한 정의는 아닐수도 있을 것 같다. 토비님이 인용하시는 쌍대성이란 개념을 에릭 마이어는 궁극적인 기능은 같으나 반대 방향으로 표현하는 것을 말한다고 한다. 정의로 이해하기는 추상적인 개념인 것 같아서 Iterable과 Observable의 관계를 통해서 추론하면 이해하기 수월할 것이다.
Iterable과 쌍대성(Duality) 관계를 갖는 개념으로 Observable이 있다.
Iterable을 이용할 때 우리는 pull 방식을 이용한다. 값은 Iterator가 가지고 있고, mainThread가 Iterable에 접근해서 값을 끌어와서 출력하는 방식이다.
Observable은 Iterable과 마찬가지로 값을 제공하는 역할을 하지만, mainThread가 값을 끌어와서 사용하는 것이 아니라 push해서 값을 사용한다. Observable은 Source고 신호를 던진다. 이를 통해서 받은 값을 출력하는 방식으로 구성된다.
Source -> Event/Data -> Observer 순서대로 처리된다.
public class ObservableExample {
static class IntObservable extends Observable implements Runnable {
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
setChanged();
notifyObservers(i); //push
}
}
}
public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println("arg = " + arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob);
io.run();
}
}
결과적으로는 Iterator와 같은 기능을 수행하지만 방법이 다르다. Observable은 관심이 있는 Observer들에게 값을 뿌릴 수 있다.
public class ObservableExample {
static class IntObservable extends Observable implements Runnable {
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
setChanged();
notifyObservers(i); //push
}
}
}
public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(Thread.currentThread().getName() + " = " + arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob);
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(io);
System.out.println(Thread.currentThread().getName() + " = " + "EXIT");
es.shutdown();
}
}
------
<콘솔 출력>
main = EXIT
pool-1-thread-1 = 1
pool-1-thread-1 = 2
pool-1-thread-1 = 3
pool-1-thread-1 = 4
pool-1-thread-1 = 5
pool-1-thread-1 = 6
pool-1-thread-1 = 7
pool-1-thread-1 = 8
pool-1-thread-1 = 9
pool-1-thread-1 = 10
push 방식을 이용하면 별개의 쓰레드에게 작업을 손쉽게 할당할 수 있다. 하지만 Iterable을 통해 구현하면 복잡하다.
이런 방식을 GoF 디자인 패턴의 옵저버 패턴이라한다. 하지만 이 구조에는 2가지 문제점이 존재한다.
이 문제를 해결하기 위해서 확장하여 옵저버 패턴을 개선했다. 개선된 옵저버 패턴이 리액티브 프로그래밍의 하나의 축이 된다.
https://www.reactive-streams.org/
링크를 참고하면 더 자세히 볼 수 있다.
onSubscribe onNext* (onError | onComplete)?
프로토콜의 핵심이다. onSubscribe은 필수적으로 실행되어야 한다. 원하는 만큼의 값을 전달해주면 onNext를 통해 그만큼 수행할 수 있는지 확인한다. 그리고 onError, onComplete를 상황에 맞게 반환해준다.
기본적으로 Publisher가 Subscriber한테 이벤트를 전달하는데 Subscription을 통해서 Subscriber는 원하는 개수를 조절할 수 있다.
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
public class PubSub {
public static void main(String[] args) {
//Publisher <-- Observable
//Subscriber <-- Observer
Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
Publisher p = new Publisher() {
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = itr.iterator();
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
while (n-- > 0) {
if (it.hasNext()) {
subscriber.onNext(it.next());
} else {
subscriber.onComplete();
break;
}
}
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> s = new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("PubSub.onSubscribe");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("PubSub.onNext " + item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("PubSub.onError");
}
@Override
public void onComplete() {
System.out.println("PubSub.onComplete");
}
};
p.subscribe(s);
}
}
-------------------------------------------------
<콘솔 출력>
PubSub.onSubscribe
PubSub.onNext 1
PubSub.onNext 2
PubSub.onNext 3
PubSub.onNext 4
PubSub.onNext 5
PubSub.onComplete
n값을 통해서 원하는 값의 수량만큼 출력할 수 있다. batch를 하듯이 값이 존재할 때 까지 1개씩 값을 계속 요청한다.
추가적으로 spec에 나온 내용중 publisher가 Thread를 여러개 만들어서 병렬적으로 데이터를 보내줄 수 있는가?에 대한 질문에 안된다고 한다. Subscriber는 한 순간에 한 쓰레드에서 받고 연속적으로 데이터가 온다고 가정하고 받는다.
Publisher는 병렬적으로 요청을 보내지 않는다. 이렇게 구현하면 너무 복잡해져서 장점을 잃어버린다.
Future는 원시적인 Java 비동기 방법이다.
gRPC도 구글에서 만든 오픈소스 라이브러리인데, reactive 의 observable과 subscriber의 컨셉으로 구현되어있다.
https://github.com/reactive-streams/reactive-streams-jvm/
https://youtu.be/8fenTR3KOJo
https://techblog.woowahan.com/2619/