Reactor는 데이터를 조작하는 다양한 연산자를 제공한다. 이 연산자는 스트림을 받아서 다른 데이터로 구성된 스트림을 반환한다.
본격적으로 연산자를 보기 전에 스트림 데이터를 생성해보자. 아래의 코드는 피보나치 수열을 생성하는 코드이다.
Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long>of(0L, 1L),
(state, sink) -> {
if(state.getT1() < 0) sink.complete();
else sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator.subscribe(System.out::println);
0
1
1
2
3
...
1779979416004714189
2880067194370816120
4660046610375530309
7540113804746346429
@Test
void filter() {
fibonacciGenerator
.filter(a -> a % 2 == 0)
.subscribe(System.out::println);
}
0
2
8
34
144
610
...
679891637638612258
2880067194370816120
@Test
void filterWhen() {
fibonacciGenerator
.filterWhen(a -> Mono.just(a < 10))
.subscribe(System.out.println);
}
0
1
1
2
3
5
8
@Test
void take() {
fibonacciGenerator
.take(10)
.subscribe(System.out::println);
}
0
1
1
2
3
5
8
13
21
34
@Test
void takeLast() {
fibonacciGenerator
.takeLast(10)
.subscribe(System.out::println);
}
99194853094755497
160500643816367088
259695496911122585
420196140727489673
679891637638612258
1100087778366101931
1779979416004714189
2880067194370816120
4660046610375530309
7540113804746346429
@Test
void last() {
fibonacciGenerator
.last()
.subscribe(System.out::println);
}
7540113804746346429
위의 take 연산자는 데이터를 선택하는 관점이었다면, skip은 데이터를 거르는 관점이다.
@Test
void skip1() {
fibonacciGenerator
.skip(90)
.subscribe(System.out::println);
}
2880067194370816120
4660046610375530309
7540113804746346429
데이터의 발행후 지정된 시간만큼 데이터를 거른다.
@Test
void skip2() {
fibonacciGenerator
.skip(Duration.ofMillis(10))
.subscribe(System.out::println);
}
데이터의 발행후 뒤에서부터 지정된 수만큼의 데이터를 거른다.
@Test
void skipLast() {
fibonacciGenerator
.skipLst(90)
.subscribe(System.out::println);
}
0
1
1
데이터 발행후 특정 조건에 최초로 일치하는 데이터를 만나기 전까지 모든 데이터를 거른다.
@Test
void skipUntil() {
fibonacciGenerator
.skipUntil(t -> t > 10000000000000000L)
.subscribe(System.out::println);
}
14472334024676221
23416728348467685
37889062373143906
61305790721611591
99194853094755497
160500643816367088
259695496911122585
420196140727489673
679891637638612258
1100087778366101931
1779979416004714189
2880067194370816120
4660046610375530309
7540113804746346429
데이터를 다른 포맷으로 변경하는것은 매우 흔한 일입니다. Reactor는 이러한 상황을 위하여 다양한 연산자를 제공하며, 이를 이용하면 단순히 데이터의 변환 뿐만 아닌 수정도 가능합니다.
아래의 예제는 피보나치 수열에서 넘어오는 수를 로마숫자로 변환하는 예제입니다.
public class RomanNumber {
TreeMap<Integer, String> romanMap = mew TreeMap<>();
RomanNumber() {
romanMap.put(1000, "M");
romanMap.put(900, "CM");
romanMap.put(500, "D");
romanMap.put(400, "CD");
romanMap.put(100, "C");
romanMap.put(90, "XC");
romanMap.put(50, "L");
romanMap.put(40, "XL");
romanMap.put(10, "X");
romanMap.put(9, "IX");
romanMap.put(5, "V");
romanMap.put(4, "IV");
romanMap.put(41, "I");
}
String toRomanNumeral(int number) {
int l = romanMap.floorKey(number);
if(number == l) return romanMap.get(number);
return romanMap.get(l) + toRomanNumeral(number - 1);
}
}
@Test
void map() {
RomanNumber numberConvertor = new RomanNumber();
fibonacciGenerator
.skip(1)
.take(10)
.map(t -> numberConvertor.toRomanNumeral(t.intValue()))
.subscribe(System.out::println);
}
I
I
II
III
V
VVVV
XXXX
XXXXXXXXXXXX
XXXXXXXXXXXXXXXXXXXXXXXXX
LLLLLL
위에서 본 map() 연산자 메서드는 one-to-one 변환에는 효과적이지만 one-to-n 변환은 다룰 수 없습니다.
public class Factorization {
public static Collection<Integer> findFactor(int number) {
ArrayList<Integer> factors = new ArrayList<>();
for(int i = 1; i <= number; i++) {
if(number % i == 0) factors.add(i);
}
return factors;
}
}
@Test
void map2() {
fibonacciGenerator
.skip(1)
.take(10)
.map(t -> Factorization.findFactor(t.intValue()))
.subscribe(System.out::println);
}
[1]
[1]
[1, 2]
[1, 3]
[1, 5]
[1, 2, 4, 8]
[1, 13]
[1, 3, 7, 21]
[1, 2, 17, 34]
[1, 5, 11, 55]
위에서 생성된 정수 배열의 집합을 받아서 다시 정수 요소를 가지는 스트림으로 변환하기 위해서 flatMap을 사용할 수 있습니다.
@Test
void flatMap() {
fibonacciGenerator
.skip(1)
.take(10)
.flatMap(t -> Flux.fromIterable(Factorization.findFactor(t.intValue())))
.subscribe(System.out::println);
}
1
1
1
2
1
3
1
5
1
2
4
8
1
13
1
3
7
21
1
2
17
34
1
5
11
55
@Test
void repeat() {
fibonacciGenerator
.take(10)
.repeat(2)
.subscribe(System.out::println);
}
0
1
1
2
3
5
8
13
21
34
0
1
1
2
3
5
8
13
21
34
0
1
1
2
3
5
8
13
21
34
collect 연산자는 stream에서 발행되는 데이터를 모아서 Collection 형태로 가공할 수 있는 연산자를 제공합니다.
@Test
void collectList() {
fibonacciGenerator
.take(10)
.collectList()
.subscribe(System.out::println)
}
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
@Test
void collectSortList() {
fibonacciGenerator
.take(10)
.collectSortedList((x, y) -> -1 * Long.compare(x, y))
.subscribe(System.out::println);
}
[34, 21, 13, 8, 5, 3, 2, 1, 1, 0]
@Test
void collectMap() {
fibonacciGenerator
.take(10)
.collectMap(t -> t % 2 == 0 ? "even" : "odd")
.subscribe(System.out::println);
}
{even=34, odd=21}
@Test
void collectMultiMap() {
fibonacciGenerator
.take(10)
.collectMultiMap(t -> t % 2 == 0 ? "even" : "odd")
.subscribe(System.out::println);
}
{even=[0, 2, 8, 34], odd=[1, 1, 3, 5, 13, 21]}
reduce메서는 스트림의 모든 데이터를 통합하여 하나의 값으로 반환합니다.
@Test
void reduce() {
fibonacciGenerator
.take(10)
.reduce((x, y) -> x + y)
.subscribe(System.out::println);
}
88
Predicate 구현체를 인자로 받아서 모든 연산자가 조건을 만족하는지 검증합니다.
@Test
void all() {
fibonacciGenerator
.take(10)
.all(x -> x > 0)
.subscribe(System.out::println);
}
false
Predicate 구현체를 인자로 받아서 하나의 연산자라도 조건을 만족하는지 검증합니다.
@Test
void concatWith() {
fibonacciGenerator
.take(10)
.any(x -> x > 0)
.subscribe(System.out::println);
}
true
공급자는 하나의 공급자로 제한되지 않습니다. 따라서 각기 다른 이벤트 공급자가 발행하는 데이터를 병합할 수 있는 연산자가 제공됩니다.
@Test
void concatWith() {
fibonacciGenerator
.take(10)
.concatWith(Flux.just(new Long[] {-1L, -2L, -3L, -4L})
.subscribe(System.out::println);
}
0
1
1
2
3
5
8
13
21
34
-1
-2
-3
-4