Http통신을 위한 기존의 RestTemplate
은 비동기를 지원하지 않을 뿐더러 너무 성능이 비효율적이다.
이제 비동기 지원 + 효율적인 WebFlux
를 공부하자.
WebFlux를 훑어보면 리턴값이 Mono<T>
또는 Flux<T>
이다.
이들은 Reactive Stream의 Publisher<T>
를 상속받고 있다.
Mono는 단일값, Flux는 복수값을 리턴한다.
값을 담기 위해 기본적으로 Just()
를 사용한다.
@Test
void justString () {
Mono<String> monoString = Mono.just("hello");
monoString.subscribe(System.out::println);
}
subscribe()안에 액션을 집어넣는다. 테스트로 출력을 위해 printout을 넣었다.
난 어떻게 이 subscribe()가 동작하는 지 로그를 찍고 싶다면 just()
뒤에 log()
를 붙인다.
Mono<String> monoString = Mono.just("hello").log();
onSubscribe
, request(unbounded)
, onNext(hello)
등의 실행과정 중의 메서드들을 출력한다.
just()
는 값이 null일 때 NullPointerException
을 발생시키지만, justOrEmpty()
는 값이 null일 경우 빈 Mono 객체를 리턴하기 때문에 예외가 발생하지 않는다.
Mono<String> mono = Mono.justOrEmpty(value);
단순히 just() 안에 "hello world"
와 같은 고정값이 들어간다면 just()로 편하게 가면 되지만, 값이 비동기 작업 등을 통해서 만들어진 값
이라면?
Mono<String> mono = Mono.fromSupplier(() -> {
// 카페 주문서에서 손님 이름을 조회하고 가져옴
String customerName = fetchCustomerNameFromCafeOrder();
return customerName;
});
mono.subscribe(customerName -> {
System.out.println("손님 이름: " + customerName);
});
카페라고 가정하고 메뉴가 만들어지면 주문서에 들어간 이름을 리턴하고, 그 값을 출력하게 했다.
fromSupplier는 Supplier 함수의 결과를 받아와서 Mono/Flux를 즉시 생성한다.
그러나 defer는 subscribe() 함수로 호출하기 전까지는 실행이 되지 않는다고 한다.
defer는 값을 즉시 생성하지 않고 비용이 많이 드는 작업을 처리할 때, 매번 결과값이 다를 때(랜덤숫자?) 사용한다.
강제로 예외를 던지고 싶다면 Mono.error()
를 사용한다.
@Test
void justStringError () {
Mono<?> monoString = Mono.just("hello world")
.then(Mono.error(new RuntimeException("Exception occured")))
.log();
monoString.subscribe(System.out::println, (e) -> System.out.println(e.getMessage()));
}
에러 메세지가 출력되었고, Excepton
이 발생함에 따라 hello
가 실행되지 않았다.
@Test
void justStringList () {
Flux<String> fluxString = Flux.just("hello", "world", "AWS").log()
.concatWithValues("AWS") // fluxString 다음으로 concat할 값을 적을 때 (마지막에 출력된다.)
.concatWith(Flux.error(new RuntimeException("error in flux"))) // 에러를 강제로 발생, 이후 따라오는 concatWith들은 사용되지 않는다.
.log();
fluxString.subscribe(System.out::println, (e) -> System.out.println(e.getMessage()));
}
기존 값들에 또 값을 추가할 때 사용한다.
특이점은 concatWith()
안에 새로운 예외를 붙여서 에러 발생을 시킨 것. 실행 도중 예외가 발생한 상황이라고 보면 되겠다.
//배열
String[] colorsArray = {"Red", "Green", "Blue"};
Flux<String> colorFlux = Flux.fromArray(colorsArray);
//이터러블
List<String> colorsList = Arrays.asList("Red", "Green", "Blue");
Flux<String> colorFlux = Flux.fromIterable(colorsList);
start와 end를 받아서 end까지 출력한다.
1부터 5까지 출력하는 것을 볼 수 있다.
@Test
void range () {
Flux<Integer> fluxInteger = Flux.range(1, 5).log();
fluxInteger.subscribe(System.out::println);
}
이게 리소스 처리라고 해서 처음에 감이 안 잡혔다
특정 액션 이후 정리가 필요할 때
이걸 쓰는 것이다.
사용이 끝난 리소스를 정리하거나 해제하는 함수를 세번째 인자로 넣을 수 있다.
프로미스의 .then
처럼 첫번째 함수의 리턴값이 두번째 함수로 전달되고, 마지막으로 세번째 클린업 함수를 실행하고 종료된다.
@Test
void using () {
Mono<String> resourceMono = Mono.using(
() -> "jinvicky load...",
resource -> Mono.just(resource + " data"),
// resource -> Mono.error(new RuntimeException("Exception occured")), // 예외 발생시 error -> 호출
resource -> Mono.just(resource + " cleaned")
);
resourceMono.subscribe(
result -> System.out.println("리소스 결과값: " + result), // 두번째 함수 결과 출력
error -> System.err.println("에러 발생: " + error),
() -> System.out.println("클린업 함수 완료, 파람으로 resource를 받는 건 안되더라")
);
}
예를 들어서 카페 메뉴 중에서 가장 저렴한 메뉴 2개만 가져오고 싶다면,
@Test
void take () {
Flux<Map<String, Integer>> coffeeAndPrice = Flux.just(
Map.of("juice", 5500, "icecream", 3500, "coffee", 1000, "latte", 2000, "americano", 1500)).log();
coffeeAndPrice
.flatMapIterable(menu -> menu.entrySet()) // Map을 메뉴 항목으로 변환
.collectSortedList(Map.Entry.comparingByValue()) // 가격으로 정렬
.flatMapMany(entries -> Flux.fromIterable(entries).take(2)) // 처음 2개 항목 선택
.subscribe(entry -> System.out.println(entry.getKey() + ": " + entry.getValue()));
}
Map.Entry.comparingByValue()
를 사용하면 가격이 가장 낮은 순으로 정렬된다.
take(number)
로 가져올 값의 개수를 지정할 수 있다.
Mono와 Flux 자주 쓰는 operator (정리 굿)
https://hoons-dev.tistory.com/119
Reactor 공식 가이드
https://projectreactor.io/docs/core/release/reference/index.html#which-operator
기타
webFlux tutorial 유튜브