Publisher 와 Subscriber 사이에는 JAVA 의 stream 처럼 Operator 를 둘 수가 있는데요.
Operator 는 Pub /Sub 간 데이터 흐름에서 데이터를 중간에 가공할 수 있도록 해주는 역할을 하고 있습니다. JAVA 의 stream 과 같은 개념 입니다.
예제를 보면 이해가 훨씬 쉽기에 토비님 유뷰브에 있는 예제를 살짝 가공 하여 가져왔습니다.
@DisplayName("지난 챕터 내용 + 확장")
@Test
void operatorTest() {
Publisher<Integer> pub = iterPub(Stream.iterate(1,a->a+1).limit(10).collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, s->s*10); //이러한 걸 Operator 라고 한다.
Publisher<Integer> map2Pub = mapPub(mapPub, s->-s); //이러한 걸 Operator 라고 한다.
map2Pub.subscribe(logSub());
}
private Publisher<Integer> iterPub(Iterable<Integer> iter) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iter.forEach(sub::onNext);
sub.onComplete();
}catch(Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
}
private <T> Subscriber<T> logSub() {
return new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe:");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(T integer) {
log.info("onNext:{}",integer);
}
@Override
public void onError(Throwable t) {
log.info("onError:{}",t);
}
@Override
public void onComplete() {
log.info("onComplete");
}
};
}
위 테스트코드인 operatorTest() 내용을 보시면 mapPub 이라는 메서드 안에서 최초 선언한 Publisher 인 pub 와 function 인터페이스 구현체를 받아 Publisher 로 다시 리턴 하는 모습을 볼 수 있는데요.
그 다음 바로 map2Pub 메서드 안에서 다시 mapPub 과 function 인터페이스 구현체를 받아 다시 Publisher로 리턴하는 모습을 볼 수 있습니다.
이렇듯 Publisher 와 Subscriber 사이에는 Operator 가 여러 개 생길 수가 있습니다.
그리고 마지막 Publisher 에서 subscribe() 메서드를 통해 logSub 이라는 Subscriber 를 통해 Publisher 를 Subscribe 하는 형태 입니다.
전체 해석을 해본다면, IterPub() 를 통해 Publisher 를 생성 한 뒤
[ logPub() 내용을 보면 s.request(Long.MAX_VALUE);
라는 구문을 통해 Subscriber 는 Publisher 에게 가지고 있는 데이터를 그냥 몽땅 다 달라는 요청을 하는 것이며,
Publiser 는 iter.forEach(sub::onNext); 구문을 통해 iterator 를 모두 순회 한 뒤
sub.onComplete() 메서드를 통해 pub/sub 을 완료 처리 합니다. ]
mapPub operator 와 map2Pub operator 2개를 거친 뒤 subscriber 에게 데이터가 전달 되
는 형태 입니다.
결국 자바 스트림에 있는 Operator와 마찬가지로 Publisher 와 Subscriber 간 Operator 도 이와 같은 형태의 모습을 취하고 있다고 볼 수 있겠습니다.
Operator 개념 자체는 자바 스트림을 알고 있다면 쉽게 이해할 수 있는 내용입니다. 하지만 위 예제를 통해 Publisher 와 Subscriber 간 Operator 가 어떻게 작동 하는지에 대한 내용을 알 수 있기에 내용을 기록 해 봅니다.