✏️ 예제 1
- sub 쪽에서 buffer 를 1로 제한해 backpressure 를 구현
Flux.range(1, 5)
.doOnNext(logger::pubLog)
.doOnRequest(logger::reqLog)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(Integer value) {
sleep(2000L);
System.out.println("처리한 데이터 : " + value);
request(1);
}
private void sleep(Long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Flux.range(1, 5)
.doOnNext(logger::pubLog)
.doOnRequest(logger::reqLog)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(Integer value) {
sleep(2000L);
System.out.println("처리한 데이터 : " + value);
request(1);
}
});
요청한 데이터 수 : 1
공급한 데이터 : 1
처리한 데이터 : 1
요청한 데이터 수 : 1
공급한 데이터 : 2
처리한 데이터 : 2
요청한 데이터 수 : 1
공급한 데이터 : 3
처리한 데이터 : 3
요청한 데이터 수 : 1
공급한 데이터 : 4
처리한 데이터 : 4
요청한 데이터 수 : 1
공급한 데이터 : 5
처리한 데이터 : 5
요청한 데이터 수 : 1
✏️ 예제 2
public static int count = 0;
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5)
.doOnNext(BackpressureExample2::pubLog)
.doOnRequest(BackpressureExample2::reqLog);
flux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(2);
}
@Override
protected void hookOnNext(Integer value) {
count++;
System.out.println("처리한 데이터 : " + value);
if (count == 2) {
sleep(2000L);
request(2);
count = 0;
}
}
}
요청한 데이터 수 : 2
공급한 데이터 : 1
처리한 데이터 : 1
공급한 데이터 : 2
처리한 데이터 : 2
요청한 데이터 수 : 2
공급한 데이터 : 3
처리한 데이터 : 3
공급한 데이터 : 4
처리한 데이터 : 4
요청한 데이터 수 : 2
공급한 데이터 : 5
처리한 데이터 : 5