Reactor: 데이터와 스트림 처리

xellos·2022년 5월 14일
0

JAVA-Reactor

목록 보기
4/11
post-thumbnail

소개

Reactor는 데이터를 조작하는 다양한 연산자를 제공한다. 이 연산자는 스트림을 받아서 다른 데이터로 구성된 스트림을 반환한다.

Generating Data

본격적으로 연산자를 보기 전에 스트림 데이터를 생성해보자. 아래의 코드는 피보나치 수열을 생성하는 코드이다.

Flux<Long> fibonacciGenerator = Flux.generate(
	() -> Tuples.<Long, Long>of(0L, 1L),
    (state, sink) -> {
    	if(state.getT1() < 0) sink.complete();
        else sink.next(state.getT1());
        return Tuples.of(state.getT2(), state.getT1() + state.getT2());
    });
    
fibonacciGenerator.subscribe(System.out::println);
  • 결과
0
1
1
2
3
...
1779979416004714189
2880067194370816120
4660046610375530309
7540113804746346429

데이터 필터링 연산자

1) filter 연산자

filter 메서드: synchronous한 평가방식

@Test
void filter() {
	fibonacciGenerator
    	.filter(a -> a % 2 == 0)
        .subscribe(System.out::println);
}
  • 결과
0
2
8
34
144
610
...
679891637638612258
2880067194370816120

filterWhen 메서드: asynchronous한 평가방식

@Test
void filterWhen() {
	fibonacciGenerator
    	.filterWhen(a -> Mono.just(a < 10))
        .subscribe(System.out.println);
}
  • 결과
0
1
1
2
3
5
8

2) take 연산자

take 메서드: 순서상 앞에서 받고 싶은 수만큼의 데이터만 받는다.

@Test
void take() {
	fibonacciGenerator
    	.take(10)
        .subscribe(System.out::println);
}
  • 결과
0
1
1
2
3
5
8
13
21
34

takeLast 메서드: 순서상 뒤에서 받고 싶은 수만큼의 데이터만 받는다.

@Test
void takeLast() {
	fibonacciGenerator
    	.takeLast(10)
        .subscribe(System.out::println);
}
  • 결과
99194853094755497
160500643816367088
259695496911122585
420196140727489673
679891637638612258
1100087778366101931
1779979416004714189
2880067194370816120
4660046610375530309
7540113804746346429

Last 메서드: 마지막 값만 받는다.

@Test
void last() {
	fibonacciGenerator
    	.last()
        .subscribe(System.out::println);
}
  • 결과
7540113804746346429

3) skip 연산자

위의 take 연산자는 데이터를 선택하는 관점이었다면, skip은 데이터를 거르는 관점이다.

skip(count) 메서드

@Test
void skip1() {
	fibonacciGenerator
    	.skip(90)
        .subscribe(System.out::println);
}
  • 결과
2880067194370816120
4660046610375530309
7540113804746346429

skip(Duration) 메서드

데이터의 발행후 지정된 시간만큼 데이터를 거른다.

@Test
void skip2() {
	fibonacciGenerator
    	.skip(Duration.ofMillis(10))
        .subscribe(System.out::println);
}
  • 결과: 결과 없음

skipLast(count) 메서드

데이터의 발행후 뒤에서부터 지정된 수만큼의 데이터를 거른다.

@Test
void skipLast() {
	fibonacciGenerator
    	.skipLst(90)
        .subscribe(System.out::println);
}
  • 결과
0
1
1

skipUntil(Boolean or Predicate) 메서드

데이터 발행후 특정 조건에 최초로 일치하는 데이터를 만나기 전까지 모든 데이터를 거른다.

@Test
void skipUntil() {
	fibonacciGenerator
    	.skipUntil(t -> t > 10000000000000000L)
        .subscribe(System.out::println);
}
  • 결과
14472334024676221
23416728348467685
37889062373143906
61305790721611591
99194853094755497
160500643816367088
259695496911122585
420196140727489673
679891637638612258
1100087778366101931
1779979416004714189
2880067194370816120
4660046610375530309
7540113804746346429

4) 그밖의 연산자

  • distinct: 넘어오는 데이터 스트림에서 유니크한 요소를 선택하기 위해 사용하는 연산자.
  • distinctUntilChanged: 첫 번째 집합의 distinct 요소를 선택하기 위해 사용하는 연산자.
  • ignoreElmenets: 데이터 요소를 완전히 무시하기 위해 사용하는 연산자.
  • single: 오직 하나의 데이터만 선택하기 위해 사용하는 연산자.
  • elementAt: 스트림에서 특정한 위치의 데이터만 가져오기 위해 사용하는 연산자.

데이터 변환

데이터를 다른 포맷으로 변경하는것은 매우 흔한 일입니다. Reactor는 이러한 상황을 위하여 다양한 연산자를 제공하며, 이를 이용하면 단순히 데이터의 변환 뿐만 아닌 수정도 가능합니다.

1) Map 연산자

아래의 예제는 피보나치 수열에서 넘어오는 수를 로마숫자로 변환하는 예제입니다.

보조를 위한 클래스

public class RomanNumber {
	TreeMap<Integer, String> romanMap = mew TreeMap<>();
    
    RomanNumber() {
    	romanMap.put(1000, "M");
        romanMap.put(900, "CM");
        romanMap.put(500, "D");
        romanMap.put(400, "CD");
        romanMap.put(100, "C");
        romanMap.put(90, "XC");
        romanMap.put(50, "L");
        romanMap.put(40, "XL");
        romanMap.put(10, "X");
        romanMap.put(9, "IX");
        romanMap.put(5, "V");
        romanMap.put(4, "IV");
        romanMap.put(41, "I");
    }
    
    String toRomanNumeral(int number) {
    	int l = romanMap.floorKey(number);
        if(number == l) return romanMap.get(number);
        return romanMap.get(l) + toRomanNumeral(number - 1);
    }
}

map 메서드 사용

@Test
void map() {
	RomanNumber numberConvertor = new RomanNumber();
    fibonacciGenerator
    	.skip(1)
        .take(10)
        .map(t -> numberConvertor.toRomanNumeral(t.intValue()))
        .subscribe(System.out::println);
}
  • 결과
I
I
II
III
V
VVVV
XXXX
XXXXXXXXXXXX
XXXXXXXXXXXXXXXXXXXXXXXXX
LLLLLL

2) flatMap 연산자

위에서 본 map() 연산자 메서드는 one-to-one 변환에는 효과적이지만 one-to-n 변환은 다룰 수 없습니다.

Map을 다루는 예제

  • 보조 클래스: 인수분해
public class Factorization {
	public static Collection<Integer> findFactor(int number) {
    	ArrayList<Integer> factors = new ArrayList<>();
        for(int i = 1; i <= number; i++) {
        	if(number % i == 0) factors.add(i);
        }
        return factors;
    }
}
  • 실행
@Test
void map2() {
	fibonacciGenerator
    	.skip(1)
        .take(10)
        .map(t -> Factorization.findFactor(t.intValue()))
        .subscribe(System.out::println);
}
  • 결과
[1]
[1]
[1, 2]
[1, 3]
[1, 5]
[1, 2, 4, 8]
[1, 13]
[1, 3, 7, 21]
[1, 2, 17, 34]
[1, 5, 11, 55]

flatMap으로 변환

위에서 생성된 정수 배열의 집합을 받아서 다시 정수 요소를 가지는 스트림으로 변환하기 위해서 flatMap을 사용할 수 있습니다.

@Test
void flatMap() {
	fibonacciGenerator
    	.skip(1)
        .take(10)
        .flatMap(t -> Flux.fromIterable(Factorization.findFactor(t.intValue())))
        .subscribe(System.out::println);
}
  • 결과
1
1
1
2
1
3
1
5
1
2
4
8
1
13
1
3
7
21
1
2
17
34
1
5
11
55

3) repeat 연산자

코드 예제

@Test
void repeat() {
	fibonacciGenerator
    	.take(10)
        .repeat(2)
        .subscribe(System.out::println);
}
  • 결과
0
1
1
2
3
5
8
13
21
34
0
1
1
2
3
5
8
13
21
34
0
1
1
2
3
5
8
13
21
34

4) collect 연산자

collect 연산자는 stream에서 발행되는 데이터를 모아서 Collection 형태로 가공할 수 있는 연산자를 제공합니다.

collectList: 발행되는 데이터를 받아서 List의 형태로 반환합니다.

@Test
void collectList() {
	fibonacciGenerator
    	.take(10)
        .collectList()
        .subscribe(System.out::println)
}
  • 결과
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

collectSortList: 발행되는 데이터를 받아서 Comparator를 제공하여 SortedList 형태로 반환합니다.

@Test
void collectSortList() {
	fibonacciGenerator
    	.take(10)
        .collectSortedList((x, y) -> -1 * Long.compare(x, y))
        .subscribe(System.out::println);
}
  • 결과
[34, 21, 13, 8, 5, 3, 2, 1, 1, 0]

collectMap: 발행되는 데이터를 받아서 Map의 형태로 반환합니다.

@Test
void collectMap() {
	fibonacciGenerator
    	.take(10)
        .collectMap(t -> t % 2 == 0 ? "even" : "odd")
        .subscribe(System.out::println);
}
  • 결과
    이때, 결과는 HashMap과 같이 같은 Key가 있으면 나중에 넘어온 결과로 데이터를 덮어씁니다.
{even=34, odd=21}

collectMultiMap

@Test
void collectMultiMap() {
	fibonacciGenerator
    	.take(10)
        .collectMultiMap(t -> t % 2 == 0 ? "even" : "odd")
        .subscribe(System.out::println);
}
  • 결과
    위의 collectMap 메서드와는 다르게 value 를 배열로하여 key가 중복되어도 데이터가 유실되지 않는다.
{even=[0, 2, 8, 34], odd=[1, 1, 3, 5, 13, 21]}

5) reduce 연산자

reduce메서는 스트림의 모든 데이터를 통합하여 하나의 값으로 반환합니다.

코드 예제

@Test
void reduce() {
	fibonacciGenerator
    	.take(10)
        .reduce((x, y) -> x + y)
        .subscribe(System.out::println);
}
  • 결과
88

6) 조건 검증 연산자

all 메서드

Predicate 구현체를 인자로 받아서 모든 연산자가 조건을 만족하는지 검증합니다.

@Test
void all() {
	fibonacciGenerator
    	.take(10)
        .all(x -> x > 0)
        .subscribe(System.out::println);
}
  • 결과
false

any 메서드

Predicate 구현체를 인자로 받아서 하나의 연산자라도 조건을 만족하는지 검증합니다.

@Test
void concatWith() {
	fibonacciGenerator
    	.take(10)
        .any(x -> x > 0)
        .subscribe(System.out::println);
}
  • 결과
true

7) 데이터 병합 연산자

공급자는 하나의 공급자로 제한되지 않습니다. 따라서 각기 다른 이벤트 공급자가 발행하는 데이터를 병합할 수 있는 연산자가 제공됩니다.

concatWith 연산자

@Test
void concatWith() {
	fibonacciGenerator
    	.take(10)
        .concatWith(Flux.just(new Long[] {-1L, -2L, -3L, -4L})
        .subscribe(System.out::println);
}
  • 결과
0
1
1
2
3
5
8
13
21
34
-1
-2
-3
-4

0개의 댓글