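// Takes a Mono supplier as a parameter; on subscription, subscribes to the Mono it supplies and relays the emitted value (the value is captured and returned at subscription time)
// Moves the whole subscriber chain onto a separate thread as well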
package io.pivotal.literx;
//generic imports to help with simpler IDEs (ie tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;
import io.pivotal.literx.domain.User;
import io.pivotal.literx.repository.BlockingRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
* Learn how to call blocking code from Reactive one with adapted concurrency strategy for
* blocking code that produces or receives data.
*
* For those who know RxJava:
* - RxJava subscribeOn = Reactor subscribeOn
* - RxJava observeOn = Reactor publishOn
* - RxJava Schedulers.io <==> Reactor Schedulers.boundedElastic (successor of the deprecated Schedulers.elastic)
*
* @author Sebastien Deleuze
* @see Flux#subscribeOn(Scheduler)
* @see Flux#publishOn(Scheduler)
* @see Schedulers
*/
public class Part11BlockingToReactive {

    //========================================================================================

    /**
     * Exposes the content of a blocking repository as a {@link Flux}.
     *
     * <p>The call to {@code repository.findAll()} is wrapped in {@code Flux.defer}, so it is
     * not executed when this method is called but each time the returned flux is subscribed.
     * {@code subscribeOn} then makes that subscription happen on the bounded elastic
     * scheduler.
     *
     * @param repository blocking repository to read the users from
     * @return a flux emitting the users returned by {@code repository.findAll()}
     */
    Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
        // Kept as a lambda so that findAll() is only reached at subscription time.
        Supplier<Flux<User>> readAllUsers = () -> Flux.fromIterable(repository.findAll());
        Flux<User> deferredUsers = Flux.defer(readAllUsers);
        return deferredUsers.subscribeOn(Schedulers.boundedElastic());
    }

    //========================================================================================

    /**
     * Stores every user emitted by the given flux in a blocking repository.
     *
     * <p>{@code publishOn} switches the downstream signals to the bounded elastic scheduler
     * before {@code repository.save} is invoked from {@code doOnNext}.
     *
     * @param flux source of the users to insert
     * @param repository blocking repository receiving the users
     * @return a {@code Mono<Void>} that signals the end of the operation
     */
    Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
        Flux<User> usersOnBlockingScheduler = flux.publishOn(Schedulers.boundedElastic());
        Flux<User> savedUsers =
                usersOnBlockingScheduler.doOnNext(incoming -> repository.save(incoming));
        // then() drops the elements and only relays the terminal signal.
        return savedUsers.then();
    }
}