Reactor는 Reactive Streams 명세를 기반으로한 JVM에서 Reactive Non-Blocking 애플리케이션을 생성하기 위한 Java 라이브러리 입니다.
앞에서는 Mono와 Flux를 생성하는 방법과 그들이 가지고 있는 데이터를 변환하는 방법에 대하여 알아보았습니다. 이번에는 Mono와 Flux 두 클래스가 어떻게 동작하는지 살펴보겠습니다.
정의상, 모든 스트림은 Lazy 합니다. 이 뜻은 사용자가 스트림을 소비하기 전까지 스트림은 어떠한 동작도 실행하지 않는다는 것을 말합니다. Mono와 Flux는 subscribe() 라는 메서드를 통하여 스트림을 소비할 수 있습니다.
이러한 접근방식의 최대 강점은 명령형 방식으로 변환을 수행하는 것보다 메모리를 적게 사용하고 모든 작업이 Thread-safe 하다는 것입니다.
public void fluxAreLazy() {
Instant beginning = Instant.now();
Flux<Integer> flux = Flux.range(1, 5)
.flatMap(n -> {
try { Thread.sleep(100); return Mono.just(n * n); }
catch (InterruptedException e) { return Mono.error(e); }
});
System.out.println("After step1, program runs for: " +
Utils.timeDifference(beginning, Instant.now()));
flux.subscribe(System.out::println);
System.out.println("The whole test last: " + Utils.timePastFrom(beginning));
}
After step1, program runs for: 516ms
1
4
9
16
25
The whole test last: 1059ms
Flux와 Mono는 변경할 수 없습니다. 이는 그들중 어떤 것의 인스턴스도 전혀 수정할 수 없다는것을 의미합니다. 어떤 메서드를 호출하면 Flux 또는 Mono의 새 인스턴스가 반환됩니다.
Flux<Integer> flux = Flux.range(1, 5);
flux.flatMap(n -> {
try { Thread.sleep(100); return Mono.just(n * n); }
catch(InterruptedException e) { return Mono.error(e); }
});
flux.subscribe(System.out::println);
1
2
3
4
5
수정한 Flux를 사용하기 위해서는 아래와 같이 데이터를 가공한 Flux를 받는 변수를 선언해서 사용해야 한다.
Flux<Integer> flux2 = Flux.range(1, 5);
Flux<Integer> flux3 = flux2.flatMap(n -> {
try { Thread.sleep(100); return Mono.just(n * n); }
catch (InterruptedException e) { return Mono.error(e); }
});
flux3.subscribe(System.out::println);
1
4
9
16
26
여기서 무한하다는 것은 종료되지 않는것을 의미합니다.
//이를 증명하기 위해서는 시계와 같이 flux ticking을 생성합니다.
Flux.interval(Duration.ofMillis(100))
.map(i -> "Tick : " + i)
.subscribe(System.out::println);
//여기서 Flux는 절대로 값을 사출하지 않으며 종료되지 않습니다.
위의 무한한 Flux는 종료될 수 있는데 그 사용방법은 아래와 같다.
Disposable disposable = Flux.interval(Duration.ofMillis(100))
.map(i -> "Tick : " + i)
.subscribe(System.out::println);
try { Thread.sleep(1000); }
catch (InterruptedException e) { e.printStackTrace(); }
disposable.dispose();
System.out.println("Stopped flux");
Flux.range(1, 3)
.flatMap(n -> {
System.out.println("In flatMap n = " + n + " --- Thread is : " +
Thread.currentThread.getName());
try {
Thread.sleep(100);
System.out.println("After Thread.sleep n = " + n);
return Mono.just(n):
} catch (InterruptedException e) {
return Mono.error(e);
}
})
.map(n -> { System.out.println("In map n = " + n); return n; })
.subscribe(System.out::println);