Publisher의 경우 구현 라이브러리에 따라 다르다.
- Project reactor는 Spring Webflux에서 기반이 되는 라이브러리
- RxJava는 Netflix에서 만들고, Rx를 Java 형태로 포팅해서 제공하는 라이브러리
- 처음 시작이 C#이였지만 RxJs, RxRuby 등으로 지원
- Mutiny는 Hibernate Reactive로서 만들어짐
- Pivotal 사에서 개발
- Spring reactor에서 사용
- Mono와 Flux라는 publisher 제공
- Mono, Flux는 Publisher를 상속받는 CorePublisher를 구현
- 0 .. n개의 item 전달
- 에러 발생시, error signal 전달 후 종료
- 모든 item을 전달했다면 complete signal 전달 후 종료
- backPressure 지원
-- SimpleSubscriber Class
@RequiredArgsConstructor
public class SimpleSubscriber<T> implements Subscriber<T> {
private final Integer count;
@Override
public void onSubscribe(Subscription s) {
log.info("subscribe");
s.request(count);
log.info("request: {}", count);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("item: {}", t);
Thread.sleep(100);
}
@Override
public void onError(Throwable t) {
log.error("error: {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
-- FluxSimpleExample Class
@Slf4j
public class FluxSimpleExample {
public static void main(String[] args) {
log.info("start main");
getItems()
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
- getItems()를 통해 Publisher인 Flux를 반환
- 해당 Publisher에 subscribe()
- onSubscribe() 호출시 Subscriber는 Subscription에 원하는 n개만큼의 request() 요청 가능
@Slf4j
public class FluxSimpleSubscribeOnExample {
@SneakyThrows
public static void main(String[] args) {
log.info("start main");
getItems()
.map(i -> {
log.info("map {}", i);
return i;
})
.subscribeOn(Schedulers.single()) // executor의 Single thread
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
Thread.sleep(1000);
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
- 다른 Thread에서 따로 실행하고 싶다면, subscribeOn() 호출
- Schedulers.single()은 executor의 SingleThread와 유사
- end main이 먼저 출력된 이유
- subscribeOn()은 Flux의 실행을 다른 Thread에서 수행하도록 예약
- 예약만 할 뿐, 그 실행은 비동기적으로 나중에 수행
- 때문에 main Thread에서는 end main을 출력
@Slf4j
public class FluxNoSubscribeExample {
public static void main(String[] args) {
log.info("start main");
getItems(); // subscribe() X
log.info("end main");
}
}
private static Flux<Integer> getItems() { // Flux를 반환했지만, subscribe하지 않으면 아무 일도 X
return Flux.create(fluxSink -> {
log.info("start getItems");
for (int i = 0; i < 5; i++) {
fluxSink.next(i);
}
fluxSink.complete();
log.info("end getItems");
});
}
lazy loading
- subscribe하지 않으면, 아무 일도 일어나지 않는다.
- CompletableFuture는 future가 반환되는 순간 지연로딩 없이 바로 실행되는 문제가 있었음.
- getItems()는 Flux를 반환하였지만, subscribe() 하지 않아서 아무 일도 X
- 위 결과는 start main과 end main만 출력
@Slf4j
public class FluxErrorExample {
public static void main(String[] args) {
log.info("start main");
getItems().subscribe(new SimpleSubscriber <>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.create(fluxSink -> {
fluxSink.next(0);
fluxSink.next(1);
var error = new RuntimeException("error in flux");
fluxSink.error(error);
});
}
}
- Flux는 error 한번 검출시, Flux 진행 X
@Slf4j
public class FluxCompleteExample {
public static void main(String[] args) {
log.info("start main");
getItems().subscribe(new SimpleSubscriber <>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.create(fluxSink -> {
fluxSink.complete();
});
}
}
- complete() 호출시 즉각종료
- 0 .. 1개의 item 전달
- 에러가 발생하면 error signal 전달 후 종료
- 모든 item을 전달헀다면 complete signal 전달 후 종료
- 1개의 item만 전달하기 때문에 next 하나만 실행하면 complete가 보장됨
- 혹은 전달하지 않고, complete를 하면 값이 없다는 것을 의미
- 하나의 값이 있거나 없다
Mono와 Flux
- Mono는 Optional, 없거나 혹은 하나의 값
- Mono<Void'>로 특정 사건이 완료되는 시점을 가리킬 수도 있다.
- Flux는 List, 무한하거나 유한한 여러개의 값을 의미
@Slf4j
public class FluxToMonoExample {
public static void main(String[] args) {
log.info("start main");
Mono.from(getItems()).subscribe(
new SimpleSubscriber<>(Integer.MAX_VALUE)
);
log.info("end main");
}
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
- Mono.from으로 Flux를 Mono로
- 첫 번째 값만 전달.
- 1, 2, 3, 4, 5 중 1만 반환 후 complete
@Slf4j
public class FluxToListMonoExample {
public static void main(String[] args) {
log.info("start main");
getItems()
.collectList()
.subscribe(
new SimpleSubscriber<>(Integer.MAX_VALUE)
);
log.info("end main");
}
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
- collectList()
- Publisher인 Flux의 값들을 내부 배열로 저장 후 한번에 전달
- Flux의 값들을 collect 하다가 complete 이벤트가 발생하는 시점에 모은 값들을 전달
- 결과 - item : [1,2,3,4,5]
@Slf4j
public class MonoToFluxExample {
public static void main(String[] args) {
log.info("start main");
getItems().flux()
.subscribe(new SimpleSubscriber <>(Integer.MAX_VALUE));
log.info("end main");
}
private static Mono<List<Integer >> getItems() {
return Mono.just(List.of(1, 2, 3, 4, 5));
}
}
- flux()
- Mono<List<Integer'>>를 Flux<List<Integer'>>로 변환
- Mono.just(x)
- x 하나만 emit하는 Mono
- Reactor에서 단일 값을 감싸는 Publisher를 만드는 가장 기본적인 방법
- Mono.just(List.of(1, 2, 3, 4, 5))는 List 하나를 값으로 가지는 Mono 객체 생성
- 단 한 번만 onNext()로 emit(방출) 하고 complete
- Mono를 next 한 번 호출하고, onComplete를 호출하는 Flux로 변환
@Slf4j
public class ListMonoToFluxExample {
public static void main(String[] args) {
log.info("start main");
getItems() // Mono 상태
.flatMapMany(value -> Flux.fromIterable(value)) // flux 생성. flux로 flux를 생성? thenCompose와 유사
.subscribe(new SimpleSubscriber <>(Integer.MAX_VALUE));
log.info("end main");
}
private static Mono<List<Integer>> getItems() {
return Mono.just(List.of(1, 2, 3, 4, 5));
}
}
- flatMapMany() + Flux.fromIterable
- Mono의 값으로 여러 개의 값을 전달하는 Flux를 만들고 연결