interval()은 대부분의 리액티브 프레임워크나 라이브러리에서 제공되는 기능 중 하나로, 주어진 시간 간격마다 값을 생성하거나 이벤트를 발생시킵니다. 주로 타이머나 주기적인 작업을 구현할 때 사용됩니다.
const { interval } = require('rxjs');
// 1초마다 숫자를 생성하는 Observable 생성
const observable = interval(1000);
// 구독을 시작하여 값을 받아옴
const subscription = observable.subscribe(value => {
console.log(value);
});
// 몇 초 후에 구독을 해제
setTimeout(() => {
subscription.unsubscribe();
}, 5000);
이 예제에서 interval(1000)은 1초마다 값을 생성하는 Observable을 생성합니다. 구독자(subscriber)는 subscribe()를 호출하여 Observable의 값을 받아옵니다.
그리고 5초 후에 구독을 해제하도록 예약된 작업을 수행하고 있습니다.
결과적으로 위 예제는 처음 1초 후에 0을 출력하고, 그 다음부터는 1초 간격으로 1, 2, 3, 4를 출력하다가 5초 후에 구독이 해제되어 종료됩니다.
이와 같이 interval()은 주기적인 작업이나 타이머 기능을 간편하게 구현하는 데에 사용되는 리액티브 프로그래밍의 핵심적인 도구 중 하나입니다.
onBackpressureDrop() 연산자는 백프레셔 발생 시 소비자가 처리하지 못하는 데이터를 버릴 때 사용됩니다. 즉, 데이터 생산자가 빠른 속도로 데이터를 생성하더라도 소비자가 처리하는 속도보다 데이터가 더 많이 생성될 경우, onBackpressureDrop() 연산자를 사용하면 처리하지 못하는 데이터를 무시하고 넘어갑니다.
import reactor.core.publisher.Flux;
public class BackpressureExample {
public static void main(String[] args) {
Flux<Integer> producer = Flux.range(1, 10)
.onBackpressureDrop(); // 백프레셔 처리 전략 지정
producer.subscribe(
data -> System.out.println("Consumed: " + data),
error -> System.err.println("Error: " + error),
() -> System.out.println("Consumed all data")
);
}
}
실행결과:
Consumed: 1
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Error: reactor.core.Exceptions$ErrorCallbackNotImplemented: org.reactivestreams.Processor.onComplete
이 예제에서 Flux.range(1, 10)
은 1부터 10까지의 숫자를 생성하는 Flux를 생성합니다. 그리고 onBackpressureDrop()
을 호출하여 백프레셔 처리 전략을 지정합니다.
이 설정에 따라 소비자가 데이터를 처리하지 못할 때 해당 데이터는 무시됩니다.
1부터 5까지의 데이터는 정상적으로 처리되었으며, 6부터 10까지의 데이터는 백프레셔 처리로 인해 소비자가 처리하지 못하고 에러가 발생한 것을 볼 수 있습니다.
이처럼 onBackpressureDrop() 연산자는 데이터의 백프레셔 처리 전략을 지정하는 데 사용되며, 너무 많은 데이터가 발생하는 상황에서 소비자의 능력을 초과하지 않도록 도와줍니다.
doOnNext() 연산자를 사용하면 스트림의 각 데이터 항목을 가로채서 추가적인 동작을 수행할 수 있습니다. 이 연산자는 데이터의 변환 없이 원본 데이터를 그대로 전달하며, 연산자 체인 내에서 중간 단계로 활용됩니다.
import reactor.core.publisher.Flux;
public class DoOnNextExample {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 5)
.doOnNext(data -> System.out.println("Processing: " + data));
numbers.subscribe(data -> System.out.println("Consumed: " + data));
}
}
실행결과:
Processing: 1
Consumed: 1
Processing: 2
Consumed: 2
Processing: 3
Consumed: 3
Processing: 4
Consumed: 4
Processing: 5
Consumed: 5
이 예제에서 Flux.range(1, 5)
는 1부터 5까지의 숫자를 생성하는 Flux를 생성합니다. 그리고 doOnNext(data -> System.out.println("Processing: " + data))
를 사용하여 각 데이터가 처리되기 전에 "Processing: "이라는 메시지를 출력하는 사이드 이펙트를 추가하고 있습니다.
doOnNext() 연산자로 추가된 처리가 각 데이터 항목의 처리 전후에 실행되는 것을 확인할 수 있습니다. 이처럼 doOnNext()를 사용하면 데이터 스트림을 모니터링하거나 디버깅하는 데 유용하며, 데이터 변환 없이 사이드 이펙트를 관리할 수 있습니다.
publishOn() 연산자는 데이터가 다음 연산자로 전달되기 전에 현재 스레드에서 다른 스레드로 전환합니다. 이를 통해 네트워크 호출이나 I/O 작업과 같이 블로킹되는 작업을 다른 스레드에서 처리할 수 있습니다. 이렇게 하면 현재 스레드가 블로킹되지 않고 동시성을 유지하며 데이터를 처리할 수 있습니다.
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class PublishOnExample {
public static void main(String[] args) throws InterruptedException {
Flux<Integer> numbers = Flux.range(1, 5)
.publishOn(Schedulers.parallel()) // 다른 스레드로 전환
.map(data -> {
System.out.println("Mapping: " + data + " on " + Thread.currentThread().getName());
return data * 10;
});
numbers.subscribe(data -> System.out.println("Consumed: " + data + " on " + Thread.currentThread().getName()));
Thread.sleep(100); // 결과 확인을 위해 잠시 대기
}
}
실행결과:
Mapping: 1 on parallel-1
Consumed: 10 on main
Mapping: 2 on parallel-1
Consumed: 20 on main
Mapping: 3 on parallel-1
Consumed: 30 on main
Mapping: 4 on parallel-1
Consumed: 40 on main
Mapping: 5 on parallel-1
Consumed: 50 on main
이 예제에서 publishOn(Schedulers.parallel())
는 데이터 처리를 병렬로 수행하도록 스케줄러를 변경합니다. map() 연산자 내에서 데이터 변환을 하면서 어떤 스레드에서 작업이 수행되는지 출력하고 있습니다.
publishOn()를 사용하여 데이터 처리를 병렬 스레드로 전환한 결과, 데이터 변환과 소비가 다른 스레드에서 동시에 처리되고 있음을 확인할 수 있습니다. 이를 통해 리액티브 프로그래밍에서 동시성과 병렬성을 효과적으로 다룰 수 있습니다.
subscribe() 메서드를 호출하면 옵저버가 생성되고, 해당 옵저버는 옵저버블이 생성한 데이터 스트림을 구독하여 데이터 항목을 받아오며, 에러와 완료 이벤트를 처리합니다.
import reactor.core.publisher.Flux;
public class SubscribeExample {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 5);
numbers.subscribe(
data -> System.out.println("Consumed: " + data),
error -> System.err.println("Error: " + error),
() -> System.out.println("Consumed all data")
);
}
}
실행결과:
Consumed: 1
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Consumed all data
이 예제에서 Flux.range(1, 5)는 1부터 5까지의 숫자를 생성하는 Flux를 생성합니다. 그리고 subscribe()를 호출하여 데이터를 처리하는 방식을 등록하고 있습니다. 이때 세 개의 매개변수를 사용하여 각각 데이터 항목 처리, 에러 처리, 완료 처리를 정의하고 있습니다.
subscribe()를 사용하여 데이터를 처리하면 각 데이터 항목이 출력되고, 모든 데이터 처리가 완료되면 "Consumed all data"가 출력됩니다. 에러가 발생하면 에러 처리 블록이 실행됩니다.
이처럼 subscribe()를 사용하여 옵저버블을 구독하면 데이터의 흐름을 비동기적으로 처리하고, 이벤트에 대한 처리도 설정할 수 있습니다.
onBackpressureLatest() 연산자는 백프레셔 발생 시 소비자가 처리하지 못하는 데이터를 그 중에서도 가장 최신의 데이터만 유지하고, 나머지 데이터는 버립니다. 이렇게 하면 최신 데이터만 유지하면서 소비자가 따라잡을 수 있을 때까지 데이터 쌓임을 제어할 수 있습니다.
import reactor.core.publisher.Flux;
public class BackpressureLatestExample {
public static void main(String[] args) {
Flux<Integer> producer = Flux.range(1, 10)
.onBackpressureLatest(); // 백프레셔 처리 전략 지정
producer.subscribe(
data -> System.out.println("Consumed: " + data),
error -> System.err.println("Error: " + error),
() -> System.out.println("Consumed all data")
);
}
}
실행결과:
Consumed: 1
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Error: reactor.core.Exceptions$ErrorCallbackNotImplemented: org.reactivestreams.Processor.onComplete
이 예제에서 Flux.range(1, 10)는 1부터 10까지의 숫자를 생성하는 Flux를 생성합니다. 그리고 onBackpressureLatest()를 호출하여 백프레셔 처리 전략을 지정합니다.
1부터 5까지의 데이터는 정상적으로 처리되었으며, 나머지 데이터는 백프레셔 처리로 인해 소비자가 처리하지 못하고 에러가 발생한 것을 볼 수 있습니다.
최신 데이터만 유지되었기 때문에 소비자는 처리할 수 있는 만큼의 데이터만 유지하고 나머지는 버린 것입니다.
onBackpressureLatest()는 백프레셔 상황에서 최신 데이터만 유지하면서 데이터의 쌓임을 제어하고 싶을 때 사용하는 유용한 연산자 중 하나입니다.
doOnRequest()는 리액티브 프로그래밍에서 사용되는 연산자 중 하나로, 옵저버블(또는 플로우)이 데이터를 요청할 때 추가적인 동작을 수행하기 위해 사용됩니다. 이 연산자를 사용하여 데이터 요청의 이벤트를 모니터링하거나 로깅하는 등의 작업을 처리할 수 있습니다.
import reactor.core.publisher.Flux;
public class DoOnRequestExample {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 5)
.doOnRequest(requested -> System.out.println("Requested: " + requested));
numbers.subscribe(
data -> System.out.println("Consumed: " + data),
error -> System.err.println("Error: " + error),
() -> System.out.println("Consumed all data")
);
}
}
실행결과:
Requested: 9223372036854775807
Consumed: 1
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Consumed all data
이 예제에서 Flux.range(1, 5)는 1부터 5까지의 숫자를 생성하는 Flux를 생성합니다. 그리고 doOnRequest(requested -> System.out.println("Requested: " + requested))를 사용하여 데이터 요청 이벤트가 발생할 때마다 "Requested: " 메시지와 함께 요청된 데이터 개수를 출력하는 작업을 추가하고 있습니다.
Requested: 9223372036854775807는 초기에 백프레셔에 의한 데이터 요청이 발생했음을 나타내고, 각 데이터 항목이 처리될 때마다 데이터 요청 이벤트를 출력함을 확인할 수 있습니다.
doOnRequest()를 사용하여 데이터 요청 이벤트를 관찰하면 리액티브 스트림의 동작을 더 잘 이해하고, 데이터 흐름을 모니터링하거나 디버깅하는 데 도움이 됩니다.
just() 메서드는 주어진 값을 가지는 옵저버블(또는 플로우)을 생성합니다. 다음 예제에서는 Flux.just()로 여러 숫자를 생성하는 Flux를 만들고, Mono.just()로 단일 문자열을 갖는 Mono를 생성합니다.
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class JustExample {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Mono<String> message = Mono.just("Hello, Reactive World!");
numbers.subscribe(System.out::println);
message.subscribe(System.out::println);
}
}
map() 메서드는 옵저버블의 각 데이터 항목을 변환합니다. 아래 예제는 map()을 사용하여 각 숫자를 제곱한 결과를 출력합니다.
import reactor.core.publisher.Flux;
public class MapExample {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 5);
Flux<Integer> squared = numbers.map(x -> x * x);
squared.subscribe(System.out::println);
}
}
filter() 메서드는 옵저버블의 데이터 중에서 조건을 만족하는 데이터만 선택합니다. 아래 예제는 filter()를 사용하여 짝수만 선택하여 출력합니다.
import reactor.core.publisher.Flux;
public class FilterExample {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> evenNumbers = numbers.filter(x -> x % 2 == 0);
evenNumbers.subscribe(System.out::println);
}
}
flatMap() 메서드는 각 데이터 항목을 새로운 옵저버블로 변환한 후, 결과 옵저버블을 펼쳐서 하나의 스트림으로 만듭니다. 아래 예제는 각 숫자를 제곱한 후 그 결과를 출력합니다.
import reactor.core.publisher.Flux;
public class FlatMapExample {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 5);
Flux<Integer> squared = numbers.flatMap(x -> Flux.just(x * x));
squared.subscribe(System.out::println);
}
}
concat() 메서드는 여러 개의 옵저버블을 연결하여 순서대로 데이터를 발행합니다. 다음 예제는 두 개의 Flux를 concat()을 사용하여 연결하고 데이터를 출력합니다.
import reactor.core.publisher.Flux;
public class ConcatExample {
public static void main(String[] args) {
Flux<Integer> flux1 = Flux.range(1, 3);
Flux<Integer> flux2 = Flux.range(4, 3);
Flux<Integer> concatenated = Flux.concat(flux1, flux2);
concatenated.subscribe(System.out::println);
}
}
merge() 메서드는 여러 개의 옵저버블을 병합하여 동시에 데이터를 발행합니다. 다음 예제는 두 개의 Flux를 merge()를 사용하여 병합하고 데이터를 출력합니다.
import reactor.core.publisher.Flux;
public class MergeExample {
public static void main(String[] args) {
Flux<Integer> flux1 = Flux.range(1, 3);
Flux<Integer> flux2 = Flux.range(4, 3);
Flux<Integer> merged = Flux.merge(flux1, flux2);
merged.subscribe(System.out::println);
}
}
zip() 메서드는 여러 개의 옵저버블을 조합하여 데이터를 일대일로 쌍을 이루어 결합합니다. 다음 예제는 두 개의 Flux를 zip()을 사용하여 조합하고 데이터를 출력합니다.
import reactor.core.publisher.Flux;
public class ZipExample {
public static void main(String[] args) {
Flux<Integer> flux1 = Flux.range(1, 3);
Flux<Integer> flux2 = Flux.range(4, 3);
Flux<Tuple2<Integer, Integer>> zipped = Flux.zip(flux1, flux2);
zipped.subscribe(tuple -> System.out.println(tuple.getT1() + ", " + tuple.getT2()));
}
}
take() 메서드는 옵저버블에서 처음 몇 개의 데이터만 선택합니다. 아래 예제는 take()를 사용하여 첫 번째 3개의 숫자만 선택하여 출력합니다.
import reactor.core.publisher.Flux;
public class TakeExample {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> firstThree = numbers.take(3);
firstThree.subscribe(System.out::println);
}
}
interval() 함수는 주기적으로 지정한 시간 간격으로 숫자를 생성하는 Flux를 생성합니다. 아래 예제는 1초마다 숫자를 생성하여 출력합니다.
import reactor.core.publisher.Flux;
import java.time.Duration;
public class IntervalExample {
public static void main(String[] args) throws InterruptedException {
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));
intervalFlux.subscribe(System.out::println);
Thread.sleep(5000);
}
}