[Reactor Practice] Blocking to Reactive

kimchj·2024년 3월 12일

Reactor

목록 보기
11/11

1. Flux.defer(() -> Publisher)

Mono supplier 파라미터로 받아 구독 시 해당 supplier 구독하여 반환되는 값 전달 (구독될 때, 값이 캡처되어 반환)

2. subscribeOn(Scheduler)

subscriber chain을 함께 별도의 쓰레드로 분리

Blocking to Reactive Reactor Practice

package io.pivotal.literx;

//generic imports to help with simpler IDEs (ie tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;

import io.pivotal.literx.domain.User;
import io.pivotal.literx.repository.BlockingRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Learn how to call blocking code from Reactive one with adapted concurrency strategy for
 * blocking code that produces or receives data.
 *
 * For those who know RxJava:
 *  - RxJava subscribeOn = Reactor subscribeOn
 *  - RxJava observeOn = Reactor publishOn
 *  - RxJava Schedulers.io <==> Reactor Schedulers.elastic
 *
 * @author Sebastien Deleuze
 * @see Flux#subscribeOn(Scheduler)
 * @see Flux#publishOn(Scheduler)
 * @see Schedulers
 */
public class Part11BlockingToReactive {

//========================================================================================

	// TODO Create a Flux for reading all users from the blocking repository deferred until the flux is subscribed, and run it with a bounded elastic scheduler
	Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
		return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.boundedElastic());
	}

//========================================================================================

	// TODO Insert users contained in the Flux parameter in the blocking repository using a bounded elastic scheduler and return a Mono<Void> that signal the end of the operation
	Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
		return flux.publishOn(Schedulers.boundedElastic()).doOnNext(user -> repository.save(user)).then();
	}

}
profile
ㅎㅇ

0개의 댓글