Reactor Java 2. Mono와 Flux의 내부 데이터를 조작하는 방법

xellos·2022년 4월 22일
2

JAVA-Reactor

목록 보기
2/11

Reactor는 Reactive Streams 사양을 기반으로 JVM에서 반응성 비차단 애플리케이션을 생성하기 위한 Java 라이브러리이다.
이 글은 Mono 및 Flux 클래스를 통해 Reactor를 제공하는 Reactive Streams의 실행을 생성, 조작 및 관리하는 프로세스를 안내하는 것을 목표로 하는 시리즈의 두 번째 글이다. 여기서는 Mono 및 Flux의 값을 수정하고 변환하는 과정을 살펴봅니다.

Flux 또는 Mono에 수학적 연산 적용

1 ~ 100까지 모든 정수의 값을 제곱으로 계산한다고 해봅시다. 이 경우 먼저 1에서 100까지의 모든 정수 값을 포함하는 Flux를 생성해야 합니다. → Flux.range(1, 100)

그렇다면 각 값의 제곱은 어떻게 해야할까요?

Mono와 Flux는 인터페이스가 Flux<B> map(Function<A, B> mapper). 즉 유형 A의 요소를 B로 변환하는 map 메서드를 Flux<A>에 전달하면 Flux의 각 요소에 이 함수를 적용하고 그 결과 기존의 Flux<A> 대신에 Flux<B>를 결과로 가질 수 있게 됩니다.

  • 이는 Mono도 동일
Flux<Integer> squared = Flux.range(1, 100).map(x -> x * x);
squared.subscribe(x -> System.out.println(x + ", ");
//결과: 1, 4, 9, 16, ..., 10000

Flux 및 Mono에 여러 변환 적용

아래는 여러 map 호출을 연결하는 예제입니다.

public Disposable findFirstUsers(int count, Response resopnse) {
	return Flux.range(1, count)
    	.map(id -> UserApi.findUser(id))
        .map(user -> UserApi.getUserDetail(user))
        .collectList() //Flux<UserDeails>나 Mono<List<UserDetails>>로 변환
        .map(listUserDetails -> JsonWriter.toJson(listUserDetails))
        .subscribe(json -> response.status(HttpStatus.OK).send(json));
}

flatMap 메서드

Mono<T> flatMap 메서드는 map과 유사하지만 여러분이 제공하는 supplier가 Mono<T>나 Flux<T>를 반환해야 한다는 차이점이 있습니다. 따라서 map을 쓰는 경우는 Mono<Mono<T>>를 반환하는 반면 flatMap은 Mono<T>를 반환합니다.

예를 들어 Mono를 반환하는 Java api를 사용하여 데이터를 검색하기 위해 네트워크 호출을 수핸한 다음 첫 번째 네트워크 호출을 수행해야할 때 유용합니다.

//HttpClient.get 함수의 시그니처
Mono<JsonObjec> get(String url);
//호출을 위한 두 개의 URL
String firstUseUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details";
//map을 활용한 예제
//반환값은 Mono<Mono<...>>인데 그 이유는 HttpClient.get이 Mono를 반환하기 때문이다.
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl)
	.map(user -> HttpClient.get(userDetailsUrl + user.getId()));
//flatmap을 이용한 같은 예제
//이제야 우리가 예상한대로 결과를 얻을 수 있다.
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl)
	.flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));

flatMap 은 에러 처리 또한 확실히 할 수 있습니다.

public UserApi {
	private HttpClient httpClient;
    
    Mono<User> findUser(String username) {
    	String queryUrl = "http://my-api-address/users/" + username;
        
        return Mono.fromCallable(() -> httpClient.get(queryUrl))
        	.flatMap(response -> {
            	if(response.statusCode == 404)
                	return Mono.error(new NotFoundException("..."));
                else if(response.statusCode == 500)
                	return Mono.error(new InternalServerErrorException());
                else if(response.statuscode != 200)
                	return Mono.error(new Exception(""));
                    
             	return Mono.just(response.data);
            });
    }
}

Zip 메서드

사용자의 정보와 웹사이트에서 작성한 모든 댓글을 검색하는 메서드를 작성한다고 상상해봅시다.
flatMap을 사용하면 다음과 같이 작성할 수 있습니다.

public Mono<UserWithComments> getUserWithComments(String userId) {

	Mono<UserInfo> userInfo = Mono.fromCallable(
    	() -> BlockingUserApi.getUserInfo(userId));
        
	Mono<Comment> userComments = Mono.fromCallable(
    	() -> BlockingCommentsApi.getCommentsForUser(userId));
        
    Mono<UserWithComments> userWithComments = userInfo.flatMap(
    	info -> userComments.map(comments -> new UserWithComments(info, comments)));
        
    return userWithComments;
}

위 모드의 문제점은 프로그램이 userInfo 를 수신한 후 사용자 코멘트를 얻기위해 또 호출을 한다는 것입니다.

zip 메서드를 사용하면 여러 결과를 쉽게 결합할 수 있으며 zip 방법의 실행히 모든 실행 시간의 총합이 아니라 Mono가 가장 긴 시간만큼 걸린다는 장점이 있습니다.

public Mono<UserWithComments> getUserWithComments(String userId) {
	Mono<UserInfo> userInfo = Mono.fromCallable(
    	() -> BlockingUserApi.getUserInfo(userId));
        
    Mono<Comments> userComments = Mono.fromCallable(
    	() -> BlockingCommentsApi.getCommentsForUser(userId));
        
    Mono<UserWithComments> userWithComments = userInfo.zipWith(userComments)
    	.map(tuple -> {
        	UserInfo info = tuple.getT1();
            Comments comments = tuple.getT2();
            return new UserWithComments(info, comments);
        });
        
    return userWithComments;
}

Flux 또는 Mono 요소 필터링

Flux와 Mono가 가진 요소들에 대하여 filter를 적용할 수도 있습니다.

Flux<User> getAllAdmins() {
	List<User> allUses = Users.getAllUsers();
    
    return Flux.fromIterable(allUsers)
    			.filter(user -> user.getRoles().equals("ADMIN"));
}

Flux 의 하위 집합 선택

Flux를 사용하면서 사용자는 7개의 Flux Subset을 선택할 수 있습니다. 일반적으로는 첫 번째로 보이는 메서드가 가장 적합합니다. → 기본적으로는 take 메서드를 가장 많이 사용합니다.

public Flux<T> take(long n)
public Flux<T> take(Duration timespan)
public Flux<T> take(Duration timespan, Scheduler timer)
public Flux<T> takeLast(int n)
public Flux<T> takeUntil(Predicate<? super T> predicate)
public Flux<T> takeUntilOther(Publisher<?> other)
public Flux<T> takeWhile(Predicate<? super T> continuePredicate)

결론

두 번째 글은 Flux또는 Mono의 요소에 변형을 적용하는 방법을 알아보았습니다. 다음 글에서는 Mono와 Flux가 어떻게 동작하는지 살펴보겠습니다.

0개의 댓글