Flow Control은 생산자가 많은 이벤트를 발행하면서 구독자를 압박하지 않고 이벤트를 관리하는 것이다. 빠른 생산자는 많은 이벤트를 구독자에게 전달할 수 있다. 구독자는 이벤트를 받는 즉시 처리한다. 이러한 일련의 과정은 이벤트가 처리가능한 속도를 넘어서 전달되면 비효율을 초래할 수 있다.
이러한 비효율을 방지하기 위해 Reactor 에서 생산자는 이벤트를 묶어서 발생시킬 수 있다. 각각의 이벤트 묶음은 구독자에게 전달되며 많은 이벤트를 동시에 처리할 수 있게 한다.
groupBy 연산자는 Flux를 배치 묶음으로 변환한다.
@Test
void groupBy() {
fibonacciGenerator
.take(20)
.groupBy(i -> {
List<Integer> divisors = Arrays.asList(2, 3, 5, 7);
Optional<Integer> divisor = divisors.stream()
.filter(d -> i % d == 0).findFirst();
return divisor.map(x -> "Divisible by " + x).orElse("Others");
})
.concatMap(x -> {
System.out.print("\n" + x.key());
return x;
})
.subscribe(x -> System.out.print(" " + x));
}
Divisible by 2
0 2 8 34 144 610 2584
Others
1 1 13 89 233 377 1597 4181
Divisible by 3
3 21 987
Divisible by 5
5 55
Buffer 연산자는 모든 Flux 요소를 모아서 List<T> 형태로 반환한다.
buffer의 첫 번째 인자로 batchSize를 받는다.
@Test
void buffer() {
fibonacciGenerator
.take(100)
.buffer(10)
.subscribe(System.out::println);
}
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181]
[6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229]
[832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352, 24157817, 39088169, 63245986]
[102334155, 165580141, 267914296, 433494437, 701408733, 1134903170, 1836311903, 2971215073, 4807526976, 7778742049]
[12586269025, 20365011074, 32951280099, 53316291173, 86267571272, 139583862445, 225851433717, 365435296162, 591286729879, 956722026041]
[1548008755920, 2504730781961, 4052739537881, 6557470319842, 10610209857723, 17167680177565, 27777890035288, 44945570212853, 72723460248141, 117669030460994]
[190392490709135, 308061521170129, 498454011879264, 806515533049393, 1304969544928657, 2111485077978050, 3416454622906707, 5527939700884757, 8944394323791464, 14472334024676221]
[23416728348467685, 37889062373143906, 61305790721611591, 99194853094755497, 160500643816367088, 259695496911122585, 420196140727489673, 679891637638612258, 1100087778366101931, 1779979416004714189]
[2880067194370816120, 4660046610375530309, 7540113804746346429]
첫 번째 인자로 batchSize, 두 번째 인자로는 skipSize를 받는다. 따라서 다음과 같은 특징을 가진다.
@Test
void buffer2() {
fibonacciGenerator
.take(100)
.buffer(2, 5)
.subscribe(System.out::println);
}
[0, 1]
[5, 8]
[55, 89]
[610, 987]
[6765, 10946]
[75025, 121393]
[832040, 1346269]
[9227465, 14930352]
[102334155, 165580141]
[1134903170, 1836311903]
[12586269025, 20365011074]
[139583862445, 225851433717]
[1548008755920, 2504730781961]
[17167680177565, 27777890035288]
[190392490709135, 308061521170129]
[2111485077978050, 3416454622906707]
[23416728348467685, 37889062373143906]
[259695496911122585, 420196140727489673]
[2880067194370816120, 4660046610375530309]
@Test
void bufferWhile() {
fibonacciGenerator
.take(100)
.bufferWhile(x -> x < 50)
.subscribe(System.out::println);
}
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
@Test
void bufferUntil() {
fibonacciGenerator
.take(50)
.bufferUntil(x -> x % 2 == 0)
.subscribe(System.out::println):
}
[0]
[1, 1, 2]
[3, 5, 8]
[13, 21, 34]
[55, 89, 144]
[233, 377, 610]
[987, 1597, 2584]
[4181, 6765, 10946]
[17711, 28657, 46368]
[75025, 121393, 196418]
[317811, 514229, 832040]
[1346269, 2178309, 3524578]
[5702887, 9227465, 14930352]
[24157817, 39088169, 63245986]
[102334155, 165580141, 267914296]
[433494437, 701408733, 1134903170]
[1836311903, 2971215073, 4807526976]
[7778742049]
@Test
void bufferWithTimePeriod() {
fibonacciGenerator
.buffer(Duration.ofNanos(1))
.subscribe(System.out::println);
}
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377 ... 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352]
[24157817, 39088169, 63245986, 102334155, 165580141, ... 1779979416004714189, 2880067194370816120, 4660046610375530309, 7540113804746346429]
@Test
void bufferContertType() {
fibonacciGenerator
.buffer(5, HashSet::new)
.subscribe(System.out::println);
}
[0, 1, 2, 3, 5]
[34, 21, 55, 8, 13]
[144, 610, 89, 233, 377]
[4181, 2584, 987, 1597, 6765]
[46368, 75025, 28657, 10946, 17711]
[121393, 196418, 514229, 832040, 317811]
[5702887, 2178309, 9227465, 3524578, 1346269]
[102334155, 14930352, 63245986, 24157817, 39088169]
[267914296, 165580141, 433494437, 701408733, 1134903170]
[12586269025, 7778742049, 2971215073, 1836311903, 4807526976]
[139583862445, 32951280099, 86267571272, 20365011074, 53316291173]
[591286729879, 365435296162, 956722026041, 225851433717, 1548008755920]
[6557470319842, 10610209857723, 2504730781961, 17167680177565, 4052739537881]
[190392490709135, 117669030460994, 44945570212853, 72723460248141, 27777890035288]
[1304969544928657, 2111485077978050, 498454011879264, 308061521170129, 806515533049393]
[3416454622906707, 8944394323791464, 14472334024676221, 5527939700884757, 23416728348467685]
[160500643816367088, 61305790721611591, 259695496911122585, 99194853094755497, 37889062373143906]
[1779979416004714189, 2880067194370816120, 420196140727489673, 1100087778366101931, 679891637638612258]
[7540113804746346429, 4660046610375530309]
지정된 시간동안 발생하는 이벤트중 마지막 이벤트만 받고 나머지는 버린다.
@Test
void sample() throws InterruptedException() {
CountDownLatch latch = new CountDownLatch(1);
fibonacciGenerator
.delayElements(Duration.ofMillis(100L))
.sample(Duration.ofSeconds(1))
.subscribe(
x -> System.out.println(x),
e -> latch.countDown(),
() -> latch.countDown()
);
latch.await();
}
21
1597
196418
14930352
1836311903
225851433717
17167680177565
2111485077978050
160500643816367088
7540113804746346429