오늘은 스케쥴러에 대해 알아보겠습니다.
reactive stream 에서의 스케쥴러는 operator 를 이용해 별도의 스레드에서 Pub/Sub 을 동작하게 하는 기능을 말합니다.
(webflux 에서는 PublishOn / SubscribeOn 오퍼레이터가 스케쥴러 기능 담당 합니다.)
먼저 Operator 를 이용해 Subscribe 를 별도의 스레드에서 동작 케 해보겠습니다.
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.info("request()");
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
};
위와 같은 간단한 Publisher 가 있을 때, Subscribe 하기 전에 Operator 를 중간에 끼워 보겠습니다.
Publisher<Integer> subOnPub = sub -> {
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(()-> pub.subscribe(sub));
};
#2장에서 봤었던 평범한 Operator 인데 ExecutorService 를 이용해 별도의 스레드가 pub을 subscribe 하도록 하였습니다.
마지막엔 아래처럼 Subscriber 를 구현 하여 Publisher 메시지를 받을 수 있게 됩니다.
subOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.info("onNext:{}" , integer);
}
@Override
public void onError(Throwable t) {
log.info("onError:{}", t);
}
@Override
public void onComplete() {
log.info("onComplete");
}
});
이렇게 되면 결국, 오퍼레이터를 통해 별도의 스레드에서 메시지를 subscribe 하게 됩니다.
exit 를 찍은 뒤 main 메서드가 종료되고 별도의 스레드로 pub과 sub 이 진행 되는 모습을 보실 수 있습니다.
SubscribeOn : Publising을 별도의 스레드에서 실행 하게하여 subscribe 하게 해달라!
Publish 가 Subscribe 보다 느릴 경우 사용 합니다.
이번엔 반대로 Publish 를 별도의 스레드에서 실행 해 보겠습니다.
위와 마찬가지로 아래 평범한? publisher 가 있습니다.
Publisher<Integer> pub = sub -> {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.info("request()");
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}
@Override
public void cancel() {
}
});
};
오퍼레이터를 구현하여 pub 을 subscribe 합니다. 여기서 1번과 다른 점은
subscribe 한 뒤 다시 publising 하는겁니다. (오퍼레이터니까 얼마든지 가능합니다)
마찬가지로 executorService 를 활용하여 별도의 스레드를 사용 하였습니다.
Publisher<Integer> pubOnPub = sub -> {
pub.subscribe(new Subscriber<Integer>() {
//실제 구현체들도 Publish 할 때도 single thread 로 처리 함
ExecutorService es = Executors.newSingleThreadExecutor();
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}
@Override
public void onNext(Integer integer) {
es.execute(()->sub.onNext(integer));
}
@Override
public void onError(Throwable t) {
es.execute(()->sub.onError(t));
}
@Override
public void onComplete() {
es.execute(()->sub.onComplete());
}
});
};
마지막으로는 메시지를 구독할 기본 Subscriber 를 만들어 줍니다.
(오퍼레이터를 subscribe 해줘야겠죠?)
pubOnPub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.info("onNext:{}" , integer);
}
@Override
public void onError(Throwable t) {
log.info("onError:{}", t);
}
@Override
public void onComplete() {
log.info("onComplete");
}
});
아래 결과는 subscribe 하는 부분만 별도의 스레드를 사용했음을 알 수 있습니다.
PublishOn : Pub 은 너무빠르니 메인에서 돌아가게끔하고, Sub 만 별도의 스레드로 처리하겠음!
Publish 가 Subscribe 보다 빠를 때 사용 합니다.
잠깐!!! PublishOn 의 경우 Publish 가 Subscribe 보다 빠를 때 사용한다고 하는데,
그럼 Publish를 너무 빠르게 해서 Subscribe 할때 순서 보장이 안되는거 아님?
위 주석내용 처럼 실제 Reactor 구현체도 singlethread 로 구현하기 떄문에 순서가 보장됨
executorService 의 경우 대기걸리면 블로킹 큐에 담겨서 기다려 집니다.
네, 가능합니다. 위 예제들처럼 직접 구현을 한다면 오퍼레이터가 하나 더 추가 하면 됩니다.
이번엔, 실제 reactor 구현체에서 둘 다 쓰는 예제를 보겠습니다.
public static void main(String[] args) {
Flux.range(1,10)
.publishOn(Schedulers.newSingle("pub"))
.log()
.subscribeOn(Schedulers.newSingle("sub"))
.subscribe(System.out::println);
System.out.println("exit");
}
이런 식으로 둘 다 사용할 수도 있습니다.
Flux 의 경우 was 나 netty 같은 컨테이너를 사용했으면 그 안에서 관리가 되었을텐데 main 함수에서 돌리면 별도의 스레드가 자동으로 꺼지지 않는다 이럴경우엔 pub이 끝나면 강제로 종료시켜줘야 할텐데 어떻게 할 수 있을까?
이 문제를 해결하기 위해 위에서 구현했던 연습이 빛을 발합니다...
방법은 스케쥴러를 변수처리 한 뒤, subscirber 구현체를 구현 해 주면 됩니다.
@OnComplete 될 때 스케쥴러들을 dispose (shutdown 임) 시켜 주면 됩니다!
Scheduler pub = Schedulers.newSingle("pub");
Scheduler sub = Schedulers.newSingle("sub");
Flux.range(1,10)
.publishOn(pub)
.log()
.subscribeOn(sub)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
log.info("onNext :" , integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
pub.dispose();
sub.dispose();
}
});
이상으로 스케쥴러에 대해 알아봤습니다.