✏️ 기본적인 사용방법
- 아래 코드를 보면 Mono 에서 data 를 배포할 때
deferContextual()
를 호출해 context 를 조회해 data 를 생성하고 있다.
- 이후
subscribeOn()
과 publishOn
을 사용해 publisher 와 subscriber 의 Thread 를 각각 나누고,
parallel -1 에서 context 를 한번 더 조회해 data 를 수정했다.
- 마지막으로 사용할 context 를 정의하기 위해
contextWrite()
를 호출해 값을 push 했다.
Mono<String> mono = Mono.deferContextual(ctx ->
Mono
.just("Hello" + " " + ctx.get(key))
.doOnNext(Start::doOnNext)
)
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
.transformDeferredContextual((mono2, ctx) ->
mono2.map(data ->
data + " " + ctx.get(key)))
.contextWrite(context -> context.put(key, "Reactor"));
mono.subscribe(Start::onNext);
- 코드를 실행하면 doOnNext 와 onNext 로그의 Thread 가 다른 것을 확인할 수 있다.
- parallel - 1 에서 한번 더 context 를 더해줬기 때문에 onNext 로그에서 context 값이 한번 더 출력되는 것을 확인할 수 있다.
- 즉, context 는 Thread 가 다르더라도 공유될 수 있다는 의미이다.
[boundedElastic-1] INFO -- doOnNext() : Hello Reactor
[parallel-1] INFO -- onNext() : Hello Reactor Reactor
✏️ http 통신 예시
zip(Mono1, Mono2)
- 두개의 Mono 의 emit 된 값을 tuple 이라는 자료구조를 이용해 하나로 합쳐주는 Operator
- 아래 예제에서는 각각 body 와 header 값으로 합처 http message 로 사용할 수 있게됨
private String HEADER_TOKEN = "authToken";
public void http() {
Mono<String> mono = postBook(
Mono.just(
new Book("ab-11-11-11",
"Reactor"))
)
.contextWrite(context -> context.put(HEADER_TOKEN, "adsfadsf"));
mono.subscribe(Start::onNext);
}
private Mono<String> postBook(Mono<Book> book) {
return Mono.zip(book, Mono.deferContextual(ctx -> Mono.just(ctx.get(HEADER_TOKEN))))
.flatMap(tuple -> Mono.just(tuple))
.flatMap(tuple -> {
String response = "POST the book(" +
tuple.getT1().getIsbn() + ", " +
tuple.getT1().getName() +
"with token : " +
tuple.getT2() + ")";
return Mono.just(response);
});
}
[main] INFO -- onNext : POST the book(ab-11-11-11, Reactorwith token : adsfadsf)