Spring WebFlux & 리액티브 프로그래밍의 핵심 개념 정리: 비동기, 논블로킹, Flux와 Mono

궁금하면 500원·2024년 11월 7일

미생의 스프링

목록 보기
24/48

Spring WebFlux, 리액티브 프로그래밍 및 관련 기술들에 대한 매우 좋은 개요를 제공합니다.
하지만 개념들이 조금 복잡할 수 있기 때문에, 일부 부분은 예시 코드나 실습을 통해 더 명확하게 전달할 수 있습니다.

(1) Spring WebFlux와 비동기, 논블로킹 I/O

비동기, 논블로킹 I/O와 관련된 개념을 좀 더 명확히 하고, 실제 예제를 통해 보여주면 이해가 쉬워질 것입니다.

비동기와 논블로킹의 차이점을 코드로 설명합니다.
아래 예제는 Mono와 Flux를 사용하여 비동기 및 논블로킹 방식을 구현하는 방법을 설명합니다.

import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;

public class ReactiveExample {
    public static void main(String[] args) {

        // Mono 예제 - 비동기 처리
        Mono<String> monoExample = Mono.fromCallable(() -> {
            Thread.sleep(1000); // 시간이 걸리는 작업
            return "Hello from Mono!";
        });

        // Flux 예제 - 비동기 스트림 처리
        Flux<Integer> fluxExample = Flux.range(1, 5)
                .map(i -> {
                    try {
                        Thread.sleep(500); // 비동기적 데이터 처리
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i * 2;
                });

        // Mono 구독
        monoExample.subscribe(System.out::println);

        // Flux 구독
        fluxExample.subscribe(System.out::println);
    }
}
  • Mono는 0 또는 1개의 데이터만 처리합니다.
    위 예제에서는 Mono.fromCallable()을 사용해 비동기적으로 1초가 걸리는 작업을 처리합니다.

  • Flux는 0개 이상의 데이터를 처리하며, Flux.range()로 1부터 5까지의 값을 비동기적으로 처리합니다.
    각 값마다 500ms씩 대기 후 처리됩니다.

위 코드에서는 각각의 비동기적 작업이 실행되고, 데이터 스트림이 순차적으로 출력됩니다.
이 예제는 비동기적이지만 논블로킹으로 실행됩니다.

(2) 리액티브 스트림, 백프레셔 및 Publisher/Subscriber

리액티브 프로그래밍에서는 데이터 흐름을 Publisher가 생성하고, 이를 Subscriber가 소비하는 구조입니다.

Subscriber가 데이터를 처리하는 속도에 따라 Publisher가 데이터를 보내는 속도를 조절하는 것이 백프레셔(Backpressure)입니다.

import reactor.core.publisher.Flux;
import reactor.core.publisher.BaseSubscriber;

public class BackpressureExample {
    public static void main(String[] args) {
        // Flux 생성
        Flux<Integer> numberStream = Flux.range(1, 100)
                .map(i -> {
                    try {
                        Thread.sleep(10);  // 데이터 처리 지연
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i;
                });

        // Subscriber 생성 - 백프레셔를 적용
        numberStream.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnNext(Integer value) {
                System.out.println(value);
                if (value == 10) {
                    // 10개만 받고 나머지는 무시
                    cancel();
                }
            }
        });
    }
}

위 예제에서 BaseSubscriber를 사용하여, Subscriber가 데이터를 받아 처리하고, 10개 이후에는 cancel() 메서드를 호출하여 스트림을 중지합니다.
이는 백프레셔의 간단한 구현 예시입니다.

profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글