Project Reactor
- Pivotal에서 개발
- Spring Reactor에서 사용
- Mono와 Flux Publisher 제공
- Reactive Streams를 매우 잘 준수함
Project Reactor - Flux
- 0..n개의 Item을 전달
- 에러가 발생하면 error signal을 전달하고 종료
- backpressure 지원
Flux - Example
@RequiredArgsConstructor
public class SimpleSubscriber<T> implements Subscriber<T> {
private final Integer count;
@Override
public void onSubscribe(Subscription s) {
log.info("subscribe");
s.request(count);
log.info("request: {}", count);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("item: {}", t);
Thread.sleep(100);
}
@Override
public void onError(Throwable t) {
log.error("error: {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
@Slf4j
public class FluxSimpleExample {
public static void main(String[] args) {
log.info("start main");
getItems()
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
output
09:32:48 [main] - start main
09:32:48 [main] - subscribe
09:32:48 [main] - request: 2147483647
09:32:48 [main] - item: 1
09:32:48 [main] - item: 2
09:32:48 [main] - item: 3
09:32:49 [main] - item: 4
09:32:49 [main] - item: 5
09:32:49 [main] - complete
09:32:49 [main] - end main
- 고정된 5개의 수를 Subscripber개를 전달함
Flux - subscribeOn
- Subscripber 다른 스레드에서 실행하고 싶을 때 사용
@Slf4j
public class FluxSimpleSubscribeOnExample {
@SneakyThrows
public static void main(String[] args) {
log.info("start main");
getItems()
.map(i -> {
log.info("map {}", i);
return i;
})
.subscribeOn(Schedulers.single())
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
Thread.sleep(1000);
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
output
09:57:21 [main] - start main
09:57:21 [main] - subscribe
09:57:21 [main] - request: 2147483647
09:57:21 [main] - end main
09:57:21 [single-1] - map 1
09:57:21 [single-1] - item: 1
09:57:21 [single-1] - map 2
09:57:21 [single-1] - item: 2
09:57:22 [single-1] - map 3
09:57:22 [single-1] - item: 3
09:57:22 [single-1] - map 4
09:57:22 [single-1] - item: 4
09:57:22 [single-1] - map 5
09:57:22 [single-1] - item: 5
09:57:22 [single-1] - complete
- single-1 스레드에서 Subscripbe가 실행됨.
Flux - subscribe
@Slf4j
public class FluxNoSubscribeExample {
public static void main(String[] args) {
log.info("start main");
getItems();
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.create(fluxSink -> {
log.info("start getItems");
for (int i = 0; i < 5; i++) {
fluxSink.next(i);
}
fluxSink.complete();
log.info("end getItems");
});
}
output
09:21:03 [main] - start main
09:21:03 [main] - end main
- CompletionFuture의 문제점 중 하나는 반환하는 함수를 실행하는 순간 지연 로딩을 할 수 없는 채로 함수가 실행된다는 것.
- 반면 Flux는 subscribe하지 않으면 아무 일도 일어나지 않는다
- getItems()를 호출헤서 Flux를 반환받긴 했지만, subscribe 하지 않아 아무일도 일어나지 않았음.
Flux - backPressure
@Slf4j
@RequiredArgsConstructor
public class SimpleSubscriber<T> implements Subscriber<T> {
private final Integer count;
@Override
public void onSubscribe(Subscription s) {
log.info("subscribe");
s.request(count);
log.info("request: {}", count);
}
}
@Slf4j
public class FluxSimpleRequestThreeExample {
public static void main(String[] args) {
getItems().subscribe(new SimpleSubscriber<>(3));
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
output
09:02:41 [main] - subscribe
09:02:41 [main] - request: 3
09:02:41 [main] - item: 1
09:02:41 [main] - item: 2
09:02:41 [main] - item: 3
- backpressure 개수인 3개만 요청 받고 종료됨
Flux - backPressure (Continuous Request)
@Slf4j
public class ContinuousRequestSubscriber<T>
implements Subscriber<T> {
private final Integer count = 1;
private Subscription subscription = null;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
log.info("subscribe");
s.request(count);
log.info("request: {}", count);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("item: {}", t);
Thread.sleep(1000);
subscription.request(1);
log.info("request: {}", count);
}
}
@Slf4j
public class FluxContinuousRequestSubscriberExample {
public static void main(String[] args) {
getItems().subscribe(new ContinuousRequestSubscriber<>());
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
output
09:27:29 [main] - subscribe
09:27:29 [main] - request: 1
09:27:29 [main] - item: 1
09:27:30 [main] - request: 1
09:27:30 [main] - item: 2
09:27:31 [main] - request: 1
09:27:31 [main] - item: 3
09:27:32 [main] - request: 1
09:27:32 [main] - item: 4
09:27:33 [main] - request: 1
09:27:33 [main] - item: 5
09:27:34 [main] - request: 1
09:27:34 [main] - complete
Flux - error
@Slf4j
public class FluxErrorExample {
public static void main(String[] args) {
log.info("start main");
getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.create(fluxSink -> {
fluxSink.next(0);
fluxSink.next(1);
var error = new RuntimeException("error in flux");
fluxSink.error(error);
});
}
}
output
10:08:09 [main] - start main
10:08:09 [main] - subscribe
10:08:09 [main] - request: 2147483647
10:08:09 [main] - item: 0
10:08:09 [main] - item: 1
10:08:10 [main] - error: error in flux
10:08:10 [main] - end main
Flux - complete
@Slf4j
public class FluxCompleteExample {
public static void main(String[] args) {
log.info("start main");
getItems().subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.create(fluxSink -> {
fluxSink.complete();
});
}
}
output
10:08:54 [main] - start main
10:08:54 [main] - subscribe
10:08:54 [main] - request: 2147483647
10:08:54 [main] - complete
10:08:54 [main] - end main
Project reactor - Mono
- 0..1개의 item을 전달
- 에러가 발생하면 error signal 전달하고 종료
- 모든 item을 전달했다면 complete signal 전달하고 종료
- onNext를 호출 이후 자동으로 complete를 호출
Flux에서 하나의 값만 넘겨주면 되는데 왜 Mono를 사용할까?
Mono
@Slf4j
public class MonoSimpleExample {
@SneakyThrows
public static void main(String[] args) {
log.info("start main");
getItems()
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
Thread.sleep(1000);
}
private static Mono<Integer> getItems() {
return Mono.create(monoSink -> {
monoSink.success(1);
});
}
}
- 1개의 item만 전달하기 때문에
next
하나만 실행하면 complete
가 보장된다.
- 혹은 전달하지 않고 complete를 하면 값이 없다는 것을 의미함
- 하나의 값이 있거나 없다. (
Optinal
)
Mono와 Flux
Mono<T>: Optional<T>
- 없거나 혹은 하나의 값
Mono<Void>
로 특정 사건이 완료되는 시점을 가리킬 수도 있다
Flux<T>: List<T>
Flux를 Mono로
@Slf4j
public class FluxToMonoExample {
public static void main(String[] args) {
log.info("start main");
Mono.from(getItems()).subscribe(
new SimpleSubscriber<>(Integer.MAX_VALUE)
);
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
output
10:46:05 [main] - start main
10:46:06 [main] - subscribe
10:46:06 [main] - request: 2147483647
10:46:06 [main] - item: 1
10:46:06 [main] - complete
10:46:06 [main] - end main
- Mono.from으로 Flux를 Mono로 변경하면 첫 번쨰 값만 전달된다.
- 그러면 저 리스트를 통째로 받는 방법이 있을까?
Flux를 Mono로 (collectList)
@Slf4j
public class FluxToListMonoExample {
public static void main(String[] args) {
log.info("start main");
getItems()
.collectList()
.subscribe(
new SimpleSubscriber<>(Integer.MAX_VALUE)
);
log.info("end main");
}
private static Flux<Integer> getItems() {
return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
}
output
10:41:41 [main] - start main
10:41:42 [main] - subscribe
10:41:42 [main] - request: 2147483647
10:41:42 [main] - item: [1, 2, 3, 4, 5]
10:41:42 [main] - complete
10:41:42 [main] - end main
- Flux의 값들을 collect 하고 complete 이벤트가 발생하는 시점에 모은 값들을 전달
Mono를 Flux로 (flux()
)
@Slf4j
public class MonoToFluxExample {
public static void main(String[] args) {
log.info("start main");
getItems().flux()
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Mono<List<Integer>> getItems() {
return Mono.just(List.of(1, 2, 3, 4, 5));
}
}
output
10:52:25 [main] - start main
10:52:25 [main] - subscribe
10:52:25 [main] - request: 2147483647
10:52:25 [main] - item: [1, 2, 3, 4, 5]
10:52:25 [main] - complete
10:52:25 [main] - end main
flux()
함수로 Mono를 next 한 번 호출하고 onComplete를 호출하는 Flux로 변환했다.
Mono를 Flux로 (flatMapMany()
)
@Slf4j
public class ListMonoToFluxExample {
public static void main(String[] args) {
log.info("start main");
getItems()
.flatMapMany(value -> Flux.fromIterable(value))
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("end main");
}
private static Mono<List<Integer>> getItems() {
return Mono.just(List.of(1, 2, 3, 4, 5));
}
}
output
10:54:52 [main] - start main
10:54:52 [main] - Using Slf4j logging framework
10:54:52 [main] - subscribe
10:54:52 [main] - request: 2147483647
10:54:52 [main] - item: 1
10:54:52 [main] - item: 2
10:54:52 [main] - item: 3
10:54:52 [main] - item: 4
10:54:53 [main] - item: 5
10:54:53 [main] - complete
10:54:53 [main] - end main
flatMapMany()
함수로 Mono의 값으로 여러 개의 값을 전달하는 Flux를 만들고 연결함.
많은 도움이 되었습니다, 감사합니다.