Spring webflux는 비동기, 논블로킹 방식으로 동작하는 웹 프레임워크이다.
Reactive 프로그래밍 모델을 사용해서 높은 동시 처리 성능을 보여준다.
주로 MSA 환경, 높은 처리량 확보, 실시간 데이터 처리가 필요한 경우에 많이 사용한다.
비동기, Non-blocking애플리케이션을 만들려면, 해당 애플리케이션의 모든 부분에 비동기, Non-blocking 적으로 동작해야한다.
그래서, 일반적인 Spring servlet 스택과 달리 Reactive 애플리케이션을 만들기 위한 스택이 별도로 존재하고, 위와 같은 기술들이 사용된다.
다시 말해서, 위에 명시된 모든 기술들이 각자의 방법으로 비동기적으로 데이터 또는 로직을 처리한다고 할 수 있다.
Reative stack에서 Spring webflux는 서버 애플리케이션이 담당하는 다양한 기능들을 비동기, Non-blocking 방식으로 처리하는 역할을 한다고 보면 된다.
이러한 처리를 위해서 Reactive streams의 구현체인 Reactor 라는 기술을 채택해서 사용하고 있다.
이후로는 Netty와 Reactor에 대해서 좀 더 자세히 알아보자.
Event loop은 Netty에서 비동기적인 처리가 가능하도록 하는 요소이고, 아래와 같은 구성 요소로 구성되어 있다.
N개의 EventLoop을 하나의 Group으로 지정할 수 있는데, 이를 EventLoopGroup이라고 부른다.
하나의 EventLoop은 하나의 쓰레드를 사용하여 처리를 함으로 N개의 EventLoop을 하나의 EventLoopGroup으로 설정하면, N개의 쓰레드를 사용하여 더 높은 처리량을 확보할 수 있다.
Java NIO의 기반이 되는 요소로, 읽기, 쓰기와 같은 I/O 작업을 수행할 수 있는 연결을 나타내는 추상적인 개념 또는 요소이다.
클라이언트가 서버에 연결을 요청할 때, 새로운 채널이 EventLoop에 등록된다.
이 기능 또는 요소는 웹소켓 통신에 주로 사용된다.
Reactive streams
의 구현체이다. (Spring webflux에서 사용됨)Publisher
와 이벤트를 받아서 처리하는 Subscriber
로 구성되어 비동기적인 처리를 수행한다. (⭐)Subscriber
에는 .request
라는 메서드가 있는데, 이는 Publisher에게 쌓여 있는 이벤트를 요청하는 메서드이고, 한번에 가져올 이벤트의 수를 매개변수로 넘겨줄 수 있다. 이러한 특징 또는 기능을 Backpressure라고 부르며, 데이터 처리의 안정성을 높이는 역할을 한다.Mono
와 Flux
가 존재한다.Mono
는 0 또는 1개의 데이터를 Subscirber에게 전달하는 퍼블리셔이고, Flux
는 0 ~ N개의 데이터를 Subscriber에게 전달하는 퍼블리셔이다..subscribe()
하지 않으면, 아무 일도 일어나지 않는다.별도의 처리 없이 Subscriber에게 바로 값을 전달하는 메서드이다.
Subscriber가 받게 되는 이벤트는 onNext
이다.
Mono.just(1)
.subscribe(v -> {
log.info("value : " + v);
});
Flux.just(1, 2, 3)
.subscribe(v -> {
log.info("value : " + v);
});
Subscriber에게 에러를 전달하는 메서드이다. Subscriber 입장에서는 onError
이벤트만 받게 된다.
Mono.error(new RuntimeException("err"))
.subscribe(v -> {
log.info("value : " + v); // 실행 안됨
}, error -> {
log.error("error : " + error);
});
Flux.error(new RuntimeException("err"))
.subscribe(v -> {
log.info("value : " + v); // 실행 안됨
}, error -> {
log.error("error : " + error);
});
Subscriber에게 아무 값도 전달하지 않고, onComplete
이벤트만 전달하는 메서드이다.
Mono.empty()
.subscribe(v -> {
log.info("value : " + v); // 실행 안됨
}, null, () -> {
log.info("complete");
});
Flux.empty()
.subscribe(v -> {
log.info("value : " + v); // 실행 안됨
}, null, () -> {
log.info("complete");
});
다양한 연산 결과를 그 결과를 Subscriber에게 전달하는 경우엔 아래와 같은 메서드들을 사용할 수 있다.
fromCallable
: Callable 함수형 인터페이스를 실행하고 반환값을 onNext
로 전달fromFuture
: Future를 받아서 done 상태가 되면, 반환값을 onNext
로 전달fromSupplier
: Supplier 함수형 인터페이서를 실행하고 반환 값을 onNext
전달fromRunnable
: Runnable 함수형 인터페이스를 실행하고 onComplete
이벤트를 전달Mono.fromCallable(() -> {
return 1;
}).subscribe(v -> {
log.info("value : " + v);
});
Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
return 1;
})).subscribe(v -> {
log.info("value : " + v);
});
Mono.fromSupplier(() -> {
return 1;
}).subscribe(v -> {
log.info("value : " + v);
});
Mono.fromRunnable(() -> {
// 아무 것도 안해도 됨
}).subscribe(null, null, v -> {
log.info("value : " + v);
});
위에서 Mono의 .fromXXX
메서드들을 살펴봤다면, 이번에는 Flux의 .fromXXX
메서드들을 알아보자.
fromIterable
: Iterable을 받아서 각각의 값을 onNext
로 전달fromStream
: Stream을 받아서 각각의 값을 onNext
로 전달fromArray
: Array를 받아서 각각의 값을 onNext
로 전달range(start, n)
: start
부터 시작해서 1씩 커진 값을 n
개 만큼 onNext
로 전달Flux.fromIterable(
List.of(1, 2, 3)
).subscribe(v -> {
log.info("value : " + v);
});
Flux.fromStream(
IntStream.range(1, 3).boxed()
).subscribe(v -> {
log.info("value : " + v);
});
Flux.fromArray(
new Integer[]{1, 2, 3}
).subscribe(v -> {
log.info("value : " + v);
});
Flux.range(1, 5).subscribe(v -> {
log.info("value : " + v);
});
조건문을 사용하거나, Callback 함수를 사용해야하는 경우 유연하게 Flux를 다루기 위해서 사용되는 메서드이다.
동기적으로 Flux를 생성한다는 특징을 갖고 있다.
.generate()
의 첫번째 인자는 초기값을 제공한느 Callable
을 넘기고, 두번째 인자는 Flux 생성에 필요한 로직을 담은 BiFunction
클래스를 넘긴다. 이는 generator라고 부른다.
generate
: 동기적으로 Flux를 생성한다.Flux.generate(
() -> 0,
(state, sink) -> {
sink.next(state);
if (state == 0) {
sink.complete();
}
return state + 1;
}
).subscribe(v -> {
log.info("onNext : " + v);
}, error -> {
log.error("onError : " + error);
}, () -> {
log.info("onComplete");
});
위에서 알아본 generate
와 그 목적은 같지만, 비동기적으로 Flux를 생성하는 create
메서드 또한 존재한다.
Flux.create(sink -> {
var task1 = CompletableFuture.runAsync(() -> {
for (int i = 0; i < 5; i++) {
sink.next(i);
}
});
var task2 = CompletableFuture.runAsync(() -> {
for (int i = 5; i < 10; i++) {
sink.next(i);
}
});
// 비동기 적으로 10개의 아이템을 생성
CompletableFuture.allOf(task1, task2)
.thenRun(sink::complete);
}).subscribe(v -> {
log.info("onNext : " + v);
}, error -> {
log.error("onError : " + error);
}, () -> {
log.info("onComplete");
});
독립적으로 값을 생성할 순 없고, 존재하는 Source에 연결할 수 있다.
Interceptor와 같은 역할로 값을 필터링하거나 수정하는 역할을 한다.
Flux.fromStream(IntStream.range(1, 3).boxed())
.handle((value, sink) -> {
// 3 이외의 값은 거름
if (value > 2) {
sink.next(value);
}
}).subscribe(v -> {
log.info("onNext : " + v);
}, error -> {
log.error("onError : " + error);
}, () -> {
log.info("onComplete");
});
onNext
이벤트를 받아서 값을 변경하고 이후로 전달한다.mapNotNull
은 값이 null
이 아닌 것만 이후로 전달한다.Flux.range(1, 5)
.map(v -> v * 2)
.doOnNext(v -> {
log.info("doOnNext : " + v);
})
.subscribe();
Flux.range(1, 5)
.mapNotNull(v -> {
if (v % 2 == 0) {
return v;
}
return null;
})
.doOnNext(v -> {
log.info("doOnNext : " + v);
})
.subscribe();
이벤트의 기본적인 흐름에 영향을 주지 않으며, 로직을 실행시키고 싶은 경우에 사용한다.
특정 이벤트에 대해서 로깅을 하거나 부가적인 처리를 하고 싶을 때 사용할 수 있다. 예시로 아래와 같은 메서드들이 있다.
doOnSubscribe
doOnNext
doOnComplete
doOnError
Flux.range(1, 5)
.map(v -> v * 2)
.doOnNext(v -> {
log.info("doOnNext : " + v);
})
.doOnComplete(v -> {
log.info("doOnComplete : " + v);
})
.doOnSubscribe(v -> {
log.info("doOnSubscribe : " + v);
})
.doOnRequest(v -> {
log.info("doOnRequest : " + v);
})
.subscribe();
위와 같은 코드가 있다면, 아래와 같은 순서대로 코드가 실행된다.
doOnSubscribe
doOnRequest
doOnNext
doOnComplete
onNext
이벤트를 받아서 Publisher를 반환하는 메서드이다.
여러 Publisher를 조합해야하는 경우에 유용하다.
Flux.range(1, 5)
.flatMap(v -> {
return Flux.range(1, 2)
.map(v2 -> v + " , " + v2)
.publishOn(Schedulers.parallel());
})
.subscribe();
onNext
이벤트를 받아서 boolean을 반환하는 메서드이다. 반환하는 값이 true
라면, onNext
이벤트를 전파하고, false
라면 전파하지 않는다.
Flux.range(1, 5)
.filter(v -> v % 2 == 0) // 2, 4 만 전파
.subscribe();
take
: n개까지 onNext
이벤트를 전파하고 n개에 도달하면 onComplete
이벤트를 발생시킴takeLast
: onComplete
이벤트가 발생하기 전, n개의 아이템만 전파Flux.range(1, 10)
.take(5)
.doOnNext(v -> {
log.info("1 ~ 5 전파됨");
})
.subscribe();
Flux.range(1, 10)
.takeLast(5)
.doOnNext(v -> {
log.info("6 ~ 10 전파됨");
})
.subscribe();
skip
: 처음 n개의 onNext
이벤트는 무시하고, 그 이후 onNext
이벤트를 전파skipLast
: onComplete
가 발생하기 전, n개의 onNext
이벤트를 무시next 이벤트가 전달되면 내부에 값을 저장해두었다가 complete 이벤트가 전달되면 저장했던 값들을 List 형태로 만들어서 한번에 onNext
이벤트로 전파한다.
Flux
를 Mono
로 바꿀 때 유용하다.
Flux.range(1, 3)
.collectList()
.doOnNext(v -> {
log.info("doOnNext : " + v); // v를 List.of(1, 2, 3); 로 받게됨
})
.subscribe();