Reactor는 Reactive Streams 사양을 기반으로 JVM에서 반응성 비차단 애플리케이션을 생성하기 위한 Java 라이브러리이다.
이 글은 Mono 및 Flux 클래스를 통해 Reactor를 제공하는 Reactive Streams의 실행을 생성, 조작 및 관리하는 프로세스를 안내하는 것을 목표로 하는 시리즈의 두 번째 글이다. 여기서는 Mono 및 Flux의 값을 수정하고 변환하는 과정을 살펴봅니다.
1 ~ 100까지 모든 정수의 값을 제곱으로 계산한다고 해봅시다. 이 경우 먼저 1에서 100까지의 모든 정수 값을 포함하는 Flux를 생성해야 합니다. → Flux.range(1, 100)
그렇다면 각 값의 제곱은 어떻게 해야할까요?
Mono와 Flux는 인터페이스가 Flux<B> map(Function<A, B> mapper). 즉 유형 A의 요소를 B로 변환하는 map 메서드를 Flux<A>에 전달하면 Flux의 각 요소에 이 함수를 적용하고 그 결과 기존의 Flux<A> 대신에 Flux<B>를 결과로 가질 수 있게 됩니다.
Flux<Integer> squared = Flux.range(1, 100).map(x -> x * x);
squared.subscribe(x -> System.out.println(x + ", ");
//결과: 1, 4, 9, 16, ..., 10000
아래는 여러 map 호출을 연결하는 예제입니다.
public Disposable findFirstUsers(int count, Response resopnse) {
return Flux.range(1, count)
.map(id -> UserApi.findUser(id))
.map(user -> UserApi.getUserDetail(user))
.collectList() //Flux<UserDeails>나 Mono<List<UserDetails>>로 변환
.map(listUserDetails -> JsonWriter.toJson(listUserDetails))
.subscribe(json -> response.status(HttpStatus.OK).send(json));
}
Mono<T> flatMap 메서드는 map과 유사하지만 여러분이 제공하는 supplier가 Mono<T>나 Flux<T>를 반환해야 한다는 차이점이 있습니다. 따라서 map을 쓰는 경우는 Mono<Mono<T>>를 반환하는 반면 flatMap은 Mono<T>를 반환합니다.
예를 들어 Mono를 반환하는 Java api를 사용하여 데이터를 검색하기 위해 네트워크 호출을 수핸한 다음 첫 번째 네트워크 호출을 수행해야할 때 유용합니다.
//HttpClient.get 함수의 시그니처
Mono<JsonObjec> get(String url);
//호출을 위한 두 개의 URL
String firstUseUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details";
//map을 활용한 예제
//반환값은 Mono<Mono<...>>인데 그 이유는 HttpClient.get이 Mono를 반환하기 때문이다.
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl)
.map(user -> HttpClient.get(userDetailsUrl + user.getId()));
//flatmap을 이용한 같은 예제
//이제야 우리가 예상한대로 결과를 얻을 수 있다.
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl)
.flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));
flatMap 은 에러 처리 또한 확실히 할 수 있습니다.
public UserApi {
private HttpClient httpClient;
Mono<User> findUser(String username) {
String queryUrl = "http://my-api-address/users/" + username;
return Mono.fromCallable(() -> httpClient.get(queryUrl))
.flatMap(response -> {
if(response.statusCode == 404)
return Mono.error(new NotFoundException("..."));
else if(response.statusCode == 500)
return Mono.error(new InternalServerErrorException());
else if(response.statuscode != 200)
return Mono.error(new Exception(""));
return Mono.just(response.data);
});
}
}
사용자의 정보와 웹사이트에서 작성한 모든 댓글을 검색하는 메서드를 작성한다고 상상해봅시다.
flatMap을 사용하면 다음과 같이 작성할 수 있습니다.
public Mono<UserWithComments> getUserWithComments(String userId) {
Mono<UserInfo> userInfo = Mono.fromCallable(
() -> BlockingUserApi.getUserInfo(userId));
Mono<Comment> userComments = Mono.fromCallable(
() -> BlockingCommentsApi.getCommentsForUser(userId));
Mono<UserWithComments> userWithComments = userInfo.flatMap(
info -> userComments.map(comments -> new UserWithComments(info, comments)));
return userWithComments;
}
위 모드의 문제점은 프로그램이 userInfo 를 수신한 후 사용자 코멘트를 얻기위해 또 호출을 한다는 것입니다.
zip 메서드를 사용하면 여러 결과를 쉽게 결합할 수 있으며 zip 방법의 실행히 모든 실행 시간의 총합이 아니라 Mono가 가장 긴 시간만큼 걸린다는 장점이 있습니다.
public Mono<UserWithComments> getUserWithComments(String userId) {
Mono<UserInfo> userInfo = Mono.fromCallable(
() -> BlockingUserApi.getUserInfo(userId));
Mono<Comments> userComments = Mono.fromCallable(
() -> BlockingCommentsApi.getCommentsForUser(userId));
Mono<UserWithComments> userWithComments = userInfo.zipWith(userComments)
.map(tuple -> {
UserInfo info = tuple.getT1();
Comments comments = tuple.getT2();
return new UserWithComments(info, comments);
});
return userWithComments;
}
Flux와 Mono가 가진 요소들에 대하여 filter를 적용할 수도 있습니다.
Flux<User> getAllAdmins() {
List<User> allUses = Users.getAllUsers();
return Flux.fromIterable(allUsers)
.filter(user -> user.getRoles().equals("ADMIN"));
}
Flux를 사용하면서 사용자는 7개의 Flux Subset을 선택할 수 있습니다. 일반적으로는 첫 번째로 보이는 메서드가 가장 적합합니다. → 기본적으로는 take 메서드를 가장 많이 사용합니다.
public Flux<T> take(long n)
public Flux<T> take(Duration timespan)
public Flux<T> take(Duration timespan, Scheduler timer)
public Flux<T> takeLast(int n)
public Flux<T> takeUntil(Predicate<? super T> predicate)
public Flux<T> takeUntilOther(Publisher<?> other)
public Flux<T> takeWhile(Predicate<? super T> continuePredicate)
두 번째 글은 Flux또는 Mono의 요소에 변형을 적용하는 방법을 알아보았습니다. 다음 글에서는 Mono와 Flux가 어떻게 동작하는지 살펴보겠습니다.