리액티브 프로그래밍은 비동기 및 이벤트 기반 애플리케이션을 개발하기 위한 패러다임으로, 데이터 스트림과 변경 사항의 흐름을 처리하는 방식을 강조합니다.
Java에서 리액티브 프로그래밍을 구현하기 위해 주로 사용되는 것은 Project Reactor와 RxJava 같은 라이브러리입니다.
아래에서는 리액티브 프로그래밍에서 중요한 개념과 코드 예제를 함께 설명하겠습니다.
Publisher : 데이터 스트림을 생성하고 변화를 Subscriber에게 알리는 역할을 합니다.
Subscriber : Publisher에서 발생하는 데이터 스트림의 변화를 구독하여 처리합니다.
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Publisher<Integer> publisher = subscriber -> {
subscriber.onSubscribe(new Subscription() {
int count = 0;
public void request(long n) {
while (count < n) {
subscriber.onNext(count);
count++;
}
if (count == n) {
subscriber.onComplete();
}
}
public void cancel() {
// 구독 취소 로직
}
});
};
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
private Subscription subscription;
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(5); // 최대 5개의 요소 요청
}
public void onNext(Integer item) {
// 각 요소 처리 로직
}
public void onError(Throwable throwable) {
// 에러 처리 로직
}
public void onComplete() {
// 작업 완료 처리 로직
}
};
publisher.subscribe(subscriber);
Flux : 0개 이상의 요소를 발생시키는 데이터 스트림을 나타냅니다.
Mono : 0 또는 1개의 요소를 발생시키는 데이터 스트림을 나타냅니다.
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); // Flux 생성
Mono<String> mono = Mono.just("Hello, Reactive Programming!"); // Mono 생성
map : 각 요소에 함수를 적용하여 변환합니다.
filter : 조건을 만족하는 요소만을 선택합니다.
flatMap : 각 요소를 다른 Flux나 Mono로 변환하고, 그 결과를 단일 스트림으로 병합합니다.
Flux<Integer> numbers = Flux.range(1, 5)
.map(num -> num * 2)
.filter(num -> num > 5)
.flatMap(num -> Flux.range(num, 2));
Schedulers : 리액터가 제공하는 스케줄러 클래스로, 스레드 풀과 작업 스케줄링을 관리합니다.
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
Flux<Integer> flux = Flux.range(1, 10)
.subscribeOn(Schedulers.parallel()); // 병렬 스케줄러 사용
Cold Stream : 구독할 때마다 데이터를 처음부터 다시 발생시키는 스트림입니다.
Hot Stream : 데이터를 발생시킨 후, 구독한 시점부터 데이터를 전달하는 스트림입니다.
import reactor.core.publisher.ConnectableFlux;
Flux<Integer> coldStream = Flux.range(1, 5);
ConnectableFlux<Integer> hotStream = coldStream.publish();
hotStream.connect(); // Hot 스트림 시작