Reactive streams - Spring Webflux Project reactor [Flux, Mono] (4/17)

세젤게으름뱅이·2025년 5월 1일

Spring Webflux

목록 보기
14/16

Reactive streams - Project reactor

Publisher의 경우 구현 라이브러리에 따라 다르다.

  • Project reactor는 Spring Webflux에서 기반이 되는 라이브러리
  • RxJava는 Netflix에서 만들고, Rx를 Java 형태로 포팅해서 제공하는 라이브러리
  • 처음 시작이 C#이였지만 RxJs, RxRuby 등으로 지원
  • Mutiny는 Hibernate Reactive로서 만들어짐

Project reactor

  • Pivotal 사에서 개발
  • Spring reactor에서 사용
  • Mono와 Flux라는 publisher 제공
  • Mono, Flux는 Publisher를 상속받는 CorePublisher를 구현

Project reactor - Flux

  • 0 .. n개의 item 전달
  • 에러 발생시, error signal 전달 후 종료
  • 모든 item을 전달했다면 complete signal 전달 후 종료
  • backPressure 지원

Flux

-- SimpleSubscriber Class
@RequiredArgsConstructor 
public class SimpleSubscriber<T> implements Subscriber<T> { 
    private final Integer count; 
    @Override 
    public void onSubscribe(Subscription s) { 
        log.info("subscribe"); 
        s.request(count); 
        log.info("request: {}", count); 
    } 
    @SneakyThrows 
    @Override 
    public void onNext(T t) { 
        log.info("item: {}", t); 
        Thread.sleep(100); 
    } 
    @Override 
    public void onError(Throwable t) { 
        log.error("error: {}", t.getMessage()); 
    } 
    @Override 
    public void onComplete() { 
        log.info("complete"); 
    } 
}

-- FluxSimpleExample Class
@Slf4j 
public class FluxSimpleExample { 
    public static void main(String[] args) { 
        log.info("start main"); 
        getItems() 
            .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE)); 
        log.info("end main"); 
    } 
    private static Flux<Integer> getItems() { 
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5)); 
    } 
}
  • getItems()를 통해 Publisher인 Flux를 반환
  • 해당 Publisher에 subscribe()
  • onSubscribe() 호출시 Subscriber는 Subscription에 원하는 n개만큼의 request() 요청 가능

Flux - subscribeOn

@Slf4j
public class FluxSimpleSubscribeOnExample {
    @SneakyThrows
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .map(i -> {
                    log.info("map {}", i);
                    return i;
                })
                .subscribeOn(Schedulers.single())		// executor의 Single thread
                .subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
        log.info("end main");
        Thread.sleep(1000);
    }
    private static Flux<Integer> getItems() {
        return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
    }
}
  • 다른 Thread에서 따로 실행하고 싶다면, subscribeOn() 호출
  • Schedulers.single()은 executor의 SingleThread와 유사
  • end main이 먼저 출력된 이유
    • subscribeOn()은 Flux의 실행을 다른 Thread에서 수행하도록 예약
    • 예약만 할 뿐, 그 실행은 비동기적으로 나중에 수행
    • 때문에 main Thread에서는 end main을 출력

Flux - subscribe

@Slf4j
public class FluxNoSubscribeExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems();						// subscribe() X
        log.info("end main");
    }
}
private static Flux<Integer> getItems() {		// Flux를 반환했지만, subscribe하지 않으면 아무 일도 X
    return Flux.create(fluxSink -> {
        log.info("start getItems");
        for (int i = 0; i < 5; i++) {
            fluxSink.next(i);
        }
        fluxSink.complete();
        log.info("end getItems");
    });
}

lazy loading

  • subscribe하지 않으면, 아무 일도 일어나지 않는다.
  • CompletableFuture는 future가 반환되는 순간 지연로딩 없이 바로 실행되는 문제가 있었음.
  • getItems()는 Flux를 반환하였지만, subscribe() 하지 않아서 아무 일도 X
  • 위 결과는 start main과 end main만 출력

Flux - error

@Slf4j
public class FluxErrorExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber <>(Integer.MAX_VALUE));
        log.info("end main");
    }
    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            fluxSink.next(0);
            fluxSink.next(1);
            var error = new RuntimeException("error in flux");
            fluxSink.error(error);
        });
    }
} 
  • Flux는 error 한번 검출시, Flux 진행 X

Flux - complete

@Slf4j
public class FluxCompleteExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().subscribe(new SimpleSubscriber <>(Integer.MAX_VALUE));
        log.info("end main");
    }
    private static Flux<Integer> getItems() {
        return Flux.create(fluxSink -> {
            fluxSink.complete();
        });
    }
} 
  • complete() 호출시 즉각종료

Project reactor - Mono

  • 0 .. 1개의 item 전달
  • 에러가 발생하면 error signal 전달 후 종료
  • 모든 item을 전달헀다면 complete signal 전달 후 종료

Q. Flux에서 하나의 값만 넘겨주면 되는데 왜 Mono를 사용할까?

Mono

  • 1개의 item만 전달하기 때문에 next 하나만 실행하면 complete가 보장됨
    • 혹은 전달하지 않고, complete를 하면 값이 없다는 것을 의미
  • 하나의 값이 있거나 없다

Mono와 Flux

  • Mono는 Optional, 없거나 혹은 하나의 값
  • Mono<Void'>로 특정 사건이 완료되는 시점을 가리킬 수도 있다.
  • Flux는 List, 무한하거나 유한한 여러개의 값을 의미

Flux를 Mono로

@Slf4j
public class FluxToMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        Mono.from(getItems()).subscribe(
                new SimpleSubscriber<>(Integer.MAX_VALUE)
        );
        log.info("end main");
    }
}
private static Flux<Integer> getItems() {
    return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
  • Mono.from으로 Flux를 Mono로
  • 첫 번째 값만 전달.
  • 1, 2, 3, 4, 5 중 1만 반환 후 complete
@Slf4j
public class FluxToListMonoExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()
                .collectList()
                .subscribe(
                        new SimpleSubscriber<>(Integer.MAX_VALUE)
                );
        log.info("end main");
    }
}
private static Flux<Integer> getItems() {
    return Flux.fromIterable(List.of(1, 2, 3, 4, 5));
}
  • collectList()
    • Publisher인 Flux의 값들을 내부 배열로 저장 후 한번에 전달
  • Flux의 값들을 collect 하다가 complete 이벤트가 발생하는 시점에 모은 값들을 전달
  • 결과 - item : [1,2,3,4,5]

Mono를 Flux로

@Slf4j
public class MonoToFluxExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems().flux()
                .subscribe(new SimpleSubscriber <>(Integer.MAX_VALUE));
        log.info("end main");
    }
    private static Mono<List<Integer >> getItems() {
        return Mono.just(List.of(1, 2, 3, 4, 5));
    }
} 
  • flux()
    • Mono<List<Integer'>>를 Flux<List<Integer'>>로 변환
  • Mono.just(x)
    • x 하나만 emit하는 Mono
    • Reactor에서 단일 값을 감싸는 Publisher를 만드는 가장 기본적인 방법
    • Mono.just(List.of(1, 2, 3, 4, 5))는 List 하나를 값으로 가지는 Mono 객체 생성
    • 단 한 번만 onNext()로 emit(방출) 하고 complete
  • Mono를 next 한 번 호출하고, onComplete를 호출하는 Flux로 변환

Mono안의 원소를 하나씩 emit하려면?

@Slf4j
public class ListMonoToFluxExample {
    public static void main(String[] args) {
        log.info("start main");
        getItems()				// Mono 상태
                .flatMapMany(value -> Flux.fromIterable(value))	// flux 생성. flux로 flux를 생성? thenCompose와 유사
                .subscribe(new SimpleSubscriber <>(Integer.MAX_VALUE));
        log.info("end main");
    }
    private static Mono<List<Integer>> getItems() {
        return Mono.just(List.of(1, 2, 3, 4, 5));
    }
}
  • flatMapMany() + Flux.fromIterable
  • Mono의 값으로 여러 개의 값을 전달하는 Flux를 만들고 연결
profile
🤦🏻‍♂️

0개의 댓글