Spring webflux 개념 정리 (w. Netty, Reactor)

🔥Log·2025년 2월 14일
0

스프링

목록 보기
19/19

🧐 Webflux ?


Spring webflux는 비동기, 논블로킹 방식으로 동작하는 웹 프레임워크이다.
Reactive 프로그래밍 모델을 사용해서 높은 동시 처리 성능을 보여준다.

주로 MSA 환경, 높은 처리량 확보, 실시간 데이터 처리가 필요한 경우에 많이 사용한다.

Reactive stack

비동기, Non-blocking애플리케이션을 만들려면, 해당 애플리케이션의 모든 부분에 비동기, Non-blocking 적으로 동작해야한다.
그래서, 일반적인 Spring servlet 스택과 달리 Reactive 애플리케이션을 만들기 위한 스택이 별도로 존재하고, 위와 같은 기술들이 사용된다.
다시 말해서, 위에 명시된 모든 기술들이 각자의 방법으로 비동기적으로 데이터 또는 로직을 처리한다고 할 수 있다.

Spring webflux의 역할

Reative stack에서 Spring webflux는 서버 애플리케이션이 담당하는 다양한 기능들을 비동기, Non-blocking 방식으로 처리하는 역할을 한다고 보면 된다.

이러한 처리를 위해서 Reactive streams의 구현체인 Reactor 라는 기술을 채택해서 사용하고 있다.

이후로는 Netty와 Reactor에 대해서 좀 더 자세히 알아보자.



🧙‍♂️ Netty


Netty의 특징

  • 비동기 이벤트 기반의 네트워크 애플리케이션 프레임워크이다.
  • HTTP 통신 뿐만 아니라 다양한 프로토콜을 지원한다.
  • Java IO, Java NIO, selector를 기반으로 적은 리소스로 높은 성능을 보장한다.
  • EventLoop라는 요소가 존재해서, IO 작업을 비동기 적으로 처리할 수 있다.

EventLoop

Event loop은 Netty에서 비동기적인 처리가 가능하도록 하는 요소이고, 아래와 같은 구성 요소로 구성되어 있다.

  • TaskQueue: EventLoop는 주기적으로 처리해야할 작업을 TaskQueue에 할당한다.
  • EventExecutor: Task를 실행하는 쓰레드풀
  • Selector: Java NIO의 핵심 컴포넌트로, 여러 채널을 관리하면서 각 채널에서 이벤트가 발생할 때까지 기다리는 역할을 한다. I/O 멀티플렉싱을 통해서 하나의 쓰레드로 여러 I/O 작업을 처리할 수 있다.

EventLoopGroup

N개의 EventLoop을 하나의 Group으로 지정할 수 있는데, 이를 EventLoopGroup이라고 부른다.
하나의 EventLoop은 하나의 쓰레드를 사용하여 처리를 함으로 N개의 EventLoop을 하나의 EventLoopGroup으로 설정하면, N개의 쓰레드를 사용하여 더 높은 처리량을 확보할 수 있다.

Channel

Java NIO의 기반이 되는 요소로, 읽기, 쓰기와 같은 I/O 작업을 수행할 수 있는 연결을 나타내는 추상적인 개념 또는 요소이다.
클라이언트가 서버에 연결을 요청할 때, 새로운 채널이 EventLoop에 등록된다.

이 기능 또는 요소는 웹소켓 통신에 주로 사용된다.



🧙‍♂️ Reactor


Reactor의 특징

  • Reactor는 비동기 데이터 스트림 처리를 위한 표준인 Reactive streams의 구현체이다. (Spring webflux에서 사용됨)
  • 이벤트를 만드는 Publisher와 이벤트를 받아서 처리하는 Subscriber로 구성되어 비동기적인 처리를 수행한다. (⭐)
  • Subscriber에는 .request라는 메서드가 있는데, 이는 Publisher에게 쌓여 있는 이벤트를 요청하는 메서드이고, 한번에 가져올 이벤트의 수를 매개변수로 넘겨줄 수 있다. 이러한 특징 또는 기능을 Backpressure라고 부르며, 데이터 처리의 안정성을 높이는 역할을 한다.

Reactor Publisher

  • 퍼블리셔에는 MonoFlux가 존재한다.
  • Mono는 0 또는 1개의 데이터를 Subscirber에게 전달하는 퍼블리셔이고, Flux는 0 ~ N개의 데이터를 Subscriber에게 전달하는 퍼블리셔이다.
  • Publisher를 .subscribe() 하지 않으면, 아무 일도 일어나지 않는다.


🧙‍♂️ Publisher의 연산자들


1) .just()

별도의 처리 없이 Subscriber에게 바로 값을 전달하는 메서드이다.
Subscriber가 받게 되는 이벤트는 onNext이다.

Mono.just(1)
	.subscribe(v -> {
		log.info("value : " + v);
    });
    
Flux.just(1, 2, 3)
	.subscribe(v -> {
		log.info("value : " + v);
    });

2) .error()

Subscriber에게 에러를 전달하는 메서드이다. Subscriber 입장에서는 onError 이벤트만 받게 된다.

Mono.error(new RuntimeException("err"))
	.subscribe(v -> {
		log.info("value : " + v); // 실행 안됨
    }, error -> {
    	log.error("error : " + error);
    });
    
    
Flux.error(new RuntimeException("err"))
	.subscribe(v -> {
		log.info("value : " + v);  // 실행 안됨
    }, error -> {
    	log.error("error : " + error);
    });

3) .empty()

Subscriber에게 아무 값도 전달하지 않고, onComplete 이벤트만 전달하는 메서드이다.

Mono.empty()
	.subscribe(v -> {
		log.info("value : " + v); // 실행 안됨
    }, null, () -> {
    	log.info("complete");
    });
    
Flux.empty()
	.subscribe(v -> {
		log.info("value : " + v); // 실행 안됨
    }, null, () -> {
    	log.info("complete");
    });

4) .fromXXX() - Mono

다양한 연산 결과를 그 결과를 Subscriber에게 전달하는 경우엔 아래와 같은 메서드들을 사용할 수 있다.

  • fromCallable: Callable 함수형 인터페이스를 실행하고 반환값을 onNext로 전달
  • fromFuture: Future를 받아서 done 상태가 되면, 반환값을 onNext로 전달
  • fromSupplier: Supplier 함수형 인터페이서를 실행하고 반환 값을 onNext 전달
  • fromRunnable: Runnable 함수형 인터페이스를 실행하고 onComplete 이벤트를 전달
Mono.fromCallable(() -> {
	return 1;
}).subscribe(v -> {
	log.info("value : " + v);
});

Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
	return 1;
})).subscribe(v -> {
	log.info("value : " + v);
});

Mono.fromSupplier(() -> {
	return 1;
}).subscribe(v -> {
	log.info("value : " + v);
});

Mono.fromRunnable(() -> {
	// 아무 것도 안해도 됨
}).subscribe(null, null, v -> {
	log.info("value : " + v);
});

5) .fromXXX() - Flux

위에서 Mono의 .fromXXX 메서드들을 살펴봤다면, 이번에는 Flux의 .fromXXX 메서드들을 알아보자.

  • fromIterable: Iterable을 받아서 각각의 값을 onNext로 전달
  • fromStream: Stream을 받아서 각각의 값을 onNext로 전달
  • fromArray: Array를 받아서 각각의 값을 onNext로 전달
  • range(start, n): start부터 시작해서 1씩 커진 값을 n개 만큼 onNext로 전달
Flux.fromIterable(
	List.of(1, 2, 3)
).subscribe(v -> {
	log.info("value : " + v);
});

Flux.fromStream(
	IntStream.range(1, 3).boxed()
).subscribe(v -> {
	log.info("value : " + v);
});

Flux.fromArray(
	new Integer[]{1, 2, 3}
).subscribe(v -> {
	log.info("value : " + v);
});

Flux.range(1, 5).subscribe(v -> {
	log.info("value : " + v);
});

6) generate

조건문을 사용하거나, Callback 함수를 사용해야하는 경우 유연하게 Flux를 다루기 위해서 사용되는 메서드이다.

동기적으로 Flux를 생성한다는 특징을 갖고 있다.
.generate()의 첫번째 인자는 초기값을 제공한느 Callable을 넘기고, 두번째 인자는 Flux 생성에 필요한 로직을 담은 BiFunction 클래스를 넘긴다. 이는 generator라고 부른다.

  • generate: 동기적으로 Flux를 생성한다.
Flux.generate(
        () -> 0,
        (state, sink) -> {
            sink.next(state);
            if (state == 0) {
                sink.complete();
            }
            return state + 1;
        }
    ).subscribe(v -> {
        log.info("onNext : " + v);
    }, error -> {
        log.error("onError : " + error);
    }, () -> {
        log.info("onComplete");
    });

7) create

위에서 알아본 generate와 그 목적은 같지만, 비동기적으로 Flux를 생성하는 create 메서드 또한 존재한다.

Flux.create(sink -> {
	
        var task1 = CompletableFuture.runAsync(() -> {
            for (int i = 0; i < 5; i++) {
                sink.next(i);
            }
        });

        var task2 = CompletableFuture.runAsync(() -> {
            for (int i = 5; i < 10; i++) {
                sink.next(i);
            }
        });

        // 비동기 적으로 10개의 아이템을 생성
        CompletableFuture.allOf(task1, task2)
            .thenRun(sink::complete);
        
	}).subscribe(v -> {
		log.info("onNext : " + v);
	}, error -> {
		log.error("onError : " + error);
	}, () -> {
		log.info("onComplete");
	});

8) handle

독립적으로 값을 생성할 순 없고, 존재하는 Source에 연결할 수 있다.
Interceptor와 같은 역할로 값을 필터링하거나 수정하는 역할을 한다.

Flux.fromStream(IntStream.range(1, 3).boxed())
	.handle((value, sink) -> {
    	// 3 이외의 값은 거름
    	if (value > 2) {
        	sink.next(value);
        }
	}).subscribe(v -> {
		log.info("onNext : " + v);
	}, error -> {
		log.error("onError : " + error);
	}, () -> {
		log.info("onComplete");
	});


🧙‍♂️ 많이 사용되는 연산자들


1) map, mapNotNull

  • onNext 이벤트를 받아서 값을 변경하고 이후로 전달한다.
  • mapNotNull은 값이 null이 아닌 것만 이후로 전달한다.
Flux.range(1, 5)
	.map(v -> v * 2)
    .doOnNext(v -> {
    	log.info("doOnNext : " + v);
    })
    .subscribe();
    
Flux.range(1, 5)
	.mapNotNull(v -> {
    	if (v % 2 == 0) {
        	return v;
        }
        return null;
    })
    .doOnNext(v -> {
    	log.info("doOnNext : " + v);
    })
    .subscribe();

2) doOnXXX

이벤트의 기본적인 흐름에 영향을 주지 않으며, 로직을 실행시키고 싶은 경우에 사용한다.
특정 이벤트에 대해서 로깅을 하거나 부가적인 처리를 하고 싶을 때 사용할 수 있다. 예시로 아래와 같은 메서드들이 있다.

  • doOnSubscribe
  • doOnNext
  • doOnComplete
  • doOnError
Flux.range(1, 5)
	.map(v -> v * 2)
    .doOnNext(v -> {
    	log.info("doOnNext : " + v);
    })
    .doOnComplete(v -> {
    	log.info("doOnComplete : " + v);
    })
    .doOnSubscribe(v -> {
    	log.info("doOnSubscribe : " + v);
    })
    .doOnRequest(v -> {
    	log.info("doOnRequest : " + v);
    })
    .subscribe();

위와 같은 코드가 있다면, 아래와 같은 순서대로 코드가 실행된다.

  1. doOnSubscribe
  2. doOnRequest
  3. doOnNext
  4. doOnComplete

3) flatMap

onNext 이벤트를 받아서 Publisher를 반환하는 메서드이다.
여러 Publisher를 조합해야하는 경우에 유용하다.

Flux.range(1, 5)
	.flatMap(v -> {
    	return Flux.range(1, 2)
        	.map(v2 -> v + " , " + v2)
            .publishOn(Schedulers.parallel());
    })
    .subscribe();

4) filter

onNext 이벤트를 받아서 boolean을 반환하는 메서드이다. 반환하는 값이 true라면, onNext 이벤트를 전파하고, false라면 전파하지 않는다.

Flux.range(1, 5)
	.filter(v -> v % 2 == 0) // 2, 4 만 전파
    .subscribe();

5) take, takeLast

  • take: n개까지 onNext 이벤트를 전파하고 n개에 도달하면 onComplete 이벤트를 발생시킴
  • takeLast: onComplete 이벤트가 발생하기 전, n개의 아이템만 전파
Flux.range(1, 10)
	.take(5)
    .doOnNext(v -> {
    	log.info("1 ~ 5 전파됨");
    })
    .subscribe();
    
Flux.range(1, 10)
	.takeLast(5)
    .doOnNext(v -> {
    	log.info("6 ~ 10 전파됨");
    })
    .subscribe();

6) skip, skipLast

  • skip: 처음 n개의 onNext 이벤트는 무시하고, 그 이후 onNext 이벤트를 전파
  • skipLast: onComplete가 발생하기 전, n개의 onNext 이벤트를 무시

7) collectList

next 이벤트가 전달되면 내부에 값을 저장해두었다가 complete 이벤트가 전달되면 저장했던 값들을 List 형태로 만들어서 한번에 onNext 이벤트로 전파한다.
FluxMono로 바꿀 때 유용하다.

Flux.range(1, 3)
	.collectList()
    .doOnNext(v -> {
    	log.info("doOnNext : " + v); // v를 List.of(1, 2, 3); 로 받게됨
    })
    .subscribe();


☕ 참고


0개의 댓글

관련 채용 정보