Reactor: Flow Control (흐름 조절)

xellos·2022년 5월 15일
0

JAVA-Reactor

목록 보기
5/11

Flow Control

Flow Control은 생산자가 많은 이벤트를 발행하면서 구독자를 압박하지 않고 이벤트를 관리하는 것이다. 빠른 생산자는 많은 이벤트를 구독자에게 전달할 수 있다. 구독자는 이벤트를 받는 즉시 처리한다. 이러한 일련의 과정은 이벤트가 처리가능한 속도를 넘어서 전달되면 비효율을 초래할 수 있다.

이러한 비효율을 방지하기 위해 Reactor 에서 생산자는 이벤트를 묶어서 발생시킬 수 있다. 각각의 이벤트 묶음은 구독자에게 전달되며 많은 이벤트를 동시에 처리할 수 있게 한다.

1) groupBy 연산자

groupBy 연산자는 Flux를 배치 묶음으로 변환한다.

  • 각각의 요소는 키를 기준으로 그룹으로 묶어진다.
  • 생성된 그룹은 연산자에 의해 통지된다.
  • 작업시 요소들의 원래 순서를 잃을 수 있다. 각각의 요소 순서는 Key 생성 순서에 의해 재편된다.
  • 각각의 요소는 키와 관련되어 있기 때문에 생성된 그룹은 비어있을 수 없다.

예제

@Test
void groupBy() {
	fibonacciGenerator
    	.take(20)
        .groupBy(i -> {
        	List<Integer> divisors = Arrays.asList(2, 3, 5, 7);
            Optional<Integer> divisor = divisors.stream()
            	.filter(d -> i % d == 0).findFirst();
            
            return divisor.map(x -> "Divisible by " + x).orElse("Others");
        })
        .concatMap(x -> {
        	System.out.print("\n" + x.key());
            return x;
        })
        .subscribe(x -> System.out.print(" " + x));
}
  • 결과
  1. groupBy로 key - value 형태로 나누어진 데이터 셋이 방출된다. 여기서 key는 String, value는 List<Long>
  2. 방출된 데이터셋이 concatMap 메서드로 결합된다.
  3. 구독한 결과 List를 출력한다.
Divisible by 2
 0 2 8 34 144 610 2584
Others
 1 1 13 89 233 377 1597 4181
Divisible by 3
 3 21 987
Divisible by 5
 5 55

2) buffer 연산자

Buffer 연산자는 모든 Flux 요소를 모아서 List<T> 형태로 반환한다.

  • 위에서 본 groupBy 메서드 와는 다르게 요소의 원래 순서를 유지한다.
  • batchSize 를 넘기면 생성되는 List의 사이즈를 결정하고 그에 따라 나누어진 List를 반환한다.

예제 1 )

buffer의 첫 번째 인자로 batchSize를 받는다.

@Test
void buffer() {
	fibonacciGenerator
    	.take(100)
        .buffer(10)
        .subscribe(System.out::println);
}
  • 결과
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181]
[6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229]
[832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352, 24157817, 39088169, 63245986]
[102334155, 165580141, 267914296, 433494437, 701408733, 1134903170, 1836311903, 2971215073, 4807526976, 7778742049]
[12586269025, 20365011074, 32951280099, 53316291173, 86267571272, 139583862445, 225851433717, 365435296162, 591286729879, 956722026041]
[1548008755920, 2504730781961, 4052739537881, 6557470319842, 10610209857723, 17167680177565, 27777890035288, 44945570212853, 72723460248141, 117669030460994]
[190392490709135, 308061521170129, 498454011879264, 806515533049393, 1304969544928657, 2111485077978050, 3416454622906707, 5527939700884757, 8944394323791464, 14472334024676221]
[23416728348467685, 37889062373143906, 61305790721611591, 99194853094755497, 160500643816367088, 259695496911122585, 420196140727489673, 679891637638612258, 1100087778366101931, 1779979416004714189]
[2880067194370816120, 4660046610375530309, 7540113804746346429]

예제 2 )

첫 번째 인자로 batchSize, 두 번째 인자로는 skipSize를 받는다. 따라서 다음과 같은 특징을 가진다.

  • batchSize가 skipSize 보다 크면 값을 오버랩해서 가진다. skipSize의 스타팅 포지션은 이전 버퍼이다.
  • batchSize가 skipSize다 작으면 누락되는 값이 생긴다.
  • 두 값이 서로 같으면 skipSize를 사용하는 의미가 없다.
@Test
void buffer2() {
	fibonacciGenerator
    	.take(100)
        .buffer(2, 5)
        .subscribe(System.out::println);
}
  • 결과
[0, 1]
[5, 8]
[55, 89]
[610, 987]
[6765, 10946]
[75025, 121393]
[832040, 1346269]
[9227465, 14930352]
[102334155, 165580141]
[1134903170, 1836311903]
[12586269025, 20365011074]
[139583862445, 225851433717]
[1548008755920, 2504730781961]
[17167680177565, 27777890035288]
[190392490709135, 308061521170129]
[2111485077978050, 3416454622906707]
[23416728348467685, 37889062373143906]
[259695496911122585, 420196140727489673]
[2880067194370816120, 4660046610375530309]

예제 3: bufferWhile )

  • Predicate를 인자로 받아서 하나의 buffer를 생성한다.
  • 조건에 맞는 요소만 버퍼에 담고 일치하지 않는 요소가 등장하면 종료하며 버퍼를 반환한다.
@Test
void bufferWhile() {
	fibonacciGenerator
    	.take(100)
        .bufferWhile(x -> x < 50)
        .subscribe(System.out::println);
}
  • 결과
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

예제 4: bufferUntil )

  • Predicate를 인자로 받는다. 여러 버퍼를 반환한다.
  • 조건에 맞는 요소를 찾으면 현재 생성중인 버퍼를 완성하고 다음 버퍼를 생성한다.
@Test
void bufferUntil() {
	fibonacciGenerator
    	.take(50)
        .bufferUntil(x -> x % 2 == 0)
        .subscribe(System.out::println):
}
  • 결과
[0]
[1, 1, 2]
[3, 5, 8]
[13, 21, 34]
[55, 89, 144]
[233, 377, 610]
[987, 1597, 2584]
[4181, 6765, 10946]
[17711, 28657, 46368]
[75025, 121393, 196418]
[317811, 514229, 832040]
[1346269, 2178309, 3524578]
[5702887, 9227465, 14930352]
[24157817, 39088169, 63245986]
[102334155, 165580141, 267914296]
[433494437, 701408733, 1134903170]
[1836311903, 2971215073, 4807526976]
[7778742049]

예제 5: 지정된 시간 내에 발행된 데이터를 연속적으로 버퍼로 만든다 )

  • 파라미터로 시간을 지정한다.
  • 해당 시간 동안 발행된 데이터를 버퍼로 만들고, 모든 값을 다 버퍼로 만들때 까지 주기를 반복한다.
@Test
void bufferWithTimePeriod() {
	fibonacciGenerator
    	.buffer(Duration.ofNanos(1))
        .subscribe(System.out::println);
}
  • 결과
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377 ... 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352]
[24157817, 39088169, 63245986, 102334155, 165580141, ... 1779979416004714189, 2880067194370816120, 4660046610375530309, 7540113804746346429]

예제 6: 버퍼의 데이터 타입변환 )

  • 첫 번째 인자로 buffer의 사이즈를 넘긴다. 두 번째 인자로 Supplier 를 넘기면 버퍼의 데이터 타입을 변환할 수 있다.
  • 요소의 타입이 아닌 요소를 보관할 버퍼의 타입을 바꾸는것이다.
@Test
void bufferContertType() {
	fibonacciGenerator
    	.buffer(5, HashSet::new)
        .subscribe(System.out::println);
}
  • 결과
[0, 1, 2, 3, 5]
[34, 21, 55, 8, 13]
[144, 610, 89, 233, 377]
[4181, 2584, 987, 1597, 6765]
[46368, 75025, 28657, 10946, 17711]
[121393, 196418, 514229, 832040, 317811]
[5702887, 2178309, 9227465, 3524578, 1346269]
[102334155, 14930352, 63245986, 24157817, 39088169]
[267914296, 165580141, 433494437, 701408733, 1134903170]
[12586269025, 7778742049, 2971215073, 1836311903, 4807526976]
[139583862445, 32951280099, 86267571272, 20365011074, 53316291173]
[591286729879, 365435296162, 956722026041, 225851433717, 1548008755920]
[6557470319842, 10610209857723, 2504730781961, 17167680177565, 4052739537881]
[190392490709135, 117669030460994, 44945570212853, 72723460248141, 27777890035288]
[1304969544928657, 2111485077978050, 498454011879264, 308061521170129, 806515533049393]
[3416454622906707, 8944394323791464, 14472334024676221, 5527939700884757, 23416728348467685]
[160500643816367088, 61305790721611591, 259695496911122585, 99194853094755497, 37889062373143906]
[1779979416004714189, 2880067194370816120, 420196140727489673, 1100087778366101931, 679891637638612258]
[7540113804746346429, 4660046610375530309]

3) sample 연산자

지정된 시간동안 발생하는 이벤트중 마지막 이벤트만 받고 나머지는 버린다.

  • 이 연산자는 빠르고, 논블로킹 이벤트를 수행하는 click 이벤트나 검색어 입력 같은 상황에 쓸 수 있다.
  • 이러한 상황에서는 데이터를 선별하기 위해 이벤트 흐름을 조절해야 한다.
@Test
void sample() throws InterruptedException() {
	CountDownLatch latch = new CountDownLatch(1);
    fibonacciGenerator
    	.delayElements(Duration.ofMillis(100L))
        .sample(Duration.ofSeconds(1))
        .subscribe(
        	x -> System.out.println(x),
            e -> latch.countDown(),
            () -> latch.countDown()
      	);
        
    latch.await();
}
  • 결과
21
1597
196418
14930352
1836311903
225851433717
17167680177565
2111485077978050
160500643816367088
7540113804746346429

0개의 댓글