CompletableFuture 정리2

Bruce Han·2023년 2월 15일
1

Java8-정리

목록 보기
17/20
post-thumbnail

이 포스팅의 코드 및 정보들은 강의를 들으며 정리한 내용을 토대로 작성한 것입니다.

Future만 가지고는 여러 작업들을 이어서 처리하는 게 힘들었다.

비동기적인 작업 여러개를 처리하는 거 자체가 콜백을 줄 수 없기 때문에 쉽지 않았다.

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
	System.out.println("[Hello Thread] " + Thread.currentThread().getName());
    return "Hello";
});

    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
    System.out.println("[World Thread] " + Thread.currentThread().getName());
    return "World";
});

이렇게 되면 Hello 끝난 다음에 World가 와야 한다. get()해서 기다려야 한다.
그 다음에 World의 get()을 해야 한다.

여러 작업들을 조합하는 방법

thenCompose()

CompletableFuture

이렇게 get()을 2개 쓰는 것보다 thenCompose()를 사용하여 앞의 작업과 뒤의 작업을 이어서 작업을 추가적으로 사용할 수 있다.

CompletableFuture1

world를 hello 다음에 바로 실행시키기 위해 hello의 결과를 참조할 수 있는 파라미터를 참고하도록 메서드를 따로 만든다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    	System.out.println("[Hello Thread] " + Thread.currentThread().getName());
    	return "Hello";
	});

	// hello.thenCompose(s -> getWorld(s));
    hello.thenCompose(Main::getWorld); // Method Reference
}

private static CompletableFuture<String> getWorld(String message) {
	return CompletableFuture.supplyAsync(() -> {
    	System.out.println("[World Thread] " + Thread.currentThread().getName());
        return message + " World";
	});
}

이 둘을 연결한 하나의 Future가 나오고, Future를 get()하면 Hello World를 받을 수 있다.

CompletableFuture2

이렇게 이어가는 방식은 두 Future간에 의존성이 있는 경우에 사용되고, 둘이 서로 연관관계가 없는 경우에 사용되는 메서드도 있다.

둘이 서로 연관관계는 없지만, 둘이 동시에 비동기적으로(따로) 실행하는 방법이 있다.

예를 들어, 주식의 경우에는 어느 한쪽이 먼저 와야지 다른 한쪽을 가져오는 구조가 아니라 따로 보내고 둘 다 결과가 왔을 때를 활용하고 싶은 경우이다.

thenCombine()

그때는 thenCombine()을 쓰면 된다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        System.out.println("[Hello Thread] " + Thread.currentThread().getName());
        return "Hello";
    });

    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
        System.out.println("[World Thread] " + Thread.currentThread().getName());
        return "World";
    });

    CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
}

CompletableFuture3

hello.thenCombine(world, 뭐시기)에서 뭐시기는 world랑 hello를 합친 것이고, hello의 결과랑 world의 결과가 둘 다 왔을 때 입력값은 2개, 결과값은 1개인 BiFunction 타입을 반환하는 것이다.

CompletableFuture4

이렇게 하면 둘의 결과가 다 왔을 때 뒤에 있는 BiFunction에 해당하는 람다가 실행된다.

모든 sub task를 다 합쳐서 실행하는 방법 - allOf()

CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
	System.out.println("[World Thread] " + Thread.currentThread().getName());
	return "World";
});

이런 sub task들이 두 개 이상일 때 여러 task를 다 합쳐서 실행할 수 있다. allOf()에 넘긴 모든 task가 다 끝났을 때 thenApply(), thenAccept() 등의 추가적인 callback을 실행할 수 있는 것이다.

하지만, 문제는 어떤 특정한 결과를 가져올 수 없다.

CompletableFuture5

이 노란색 형광으로 칠한 모든 task의 결과가 동일한 type이라는 보장도 없고, 그 중에 어떤 것들은 에러가 났을 수도 있다.

이게 결과값이라는 것 자체가 무의미하다.

CompletableFuture6

실행하면 thenAccept()의 print문에서 null을, future.get()의 print문에서 null이 출력될 것이다.

이거를 제대로(?) 받고 싶다면

예를 들어, 모든 task의 결과값을 Collection으로 만들어서 받고 싶으면

List<CompletableFuture> futures = Arrays.asList(hello, world);

이런 식으로 모든 task를 뭉쳐놓고

CompletableFuture<Void> future = CompletableFuture
.allOf(hello, world) // allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenAccept(System.out::println);

CompletableFuture<Void> future = CompletableFuture.allOf(futuresArray)
	.thenApply(v -> {
    	return futures.stream()
        			.map(f -> f.get());
	});

배열로 만든 futuresArray를 allOf()에 인수로 넘긴 후, 넘겨줬던 모든 future에 stream의 map()을 적용한다. thenApply()가 호출되는 이 시점에는 이미 모든 future의 작업이 다 끝났다. 즉, map()에서 get()을 호출해도 된다.

CompletableFuture7

그런데, get()에는 Checked Exception이 발생할 수 있으며, try~catch문을 걸어놓으면 코드가 너무 길어진다. 이를 대신하며 join()을 사용하면 try~catch를 걸어놓을 필요는 없지만 Unchecked Exception이 발생한다.

.map(CompletableFuture::join)

일단 get() 대신에 join()을 사용할텐데, 이 join()을 사용하면 future에서 반환하는 최종 결과값이 나올 것이다. 그 결과값을 stream의 collect()로 모아서 List로 만든다.

CompletableFuture8

CompletableFuture<List<Object>> results = CompletableFuture.allOf(futuresArray)
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

이렇게 하면 결과를 가져올 수 있다.

CompletableFuture9

results.get()을 출력하면 배열안에 인덱스 목록처럼 나온다.

CompletableFuture10

결과값 받아서 전부다 끝났을 때 그 결과값(results)으로 다른 작업을 처리할 수도 있다.
그리고 이때는 아무것도 blocking이 되지 않는다.

아무거나 하나! - anyOf()

anyOf()는 여러 작업 중 빨리 끝나는 것의 결과를 받아서 작업을 하는 거니까, allOf()처럼 List가 오는 것이 아니고 결과 하나가 온다.

CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
        future.get();

CompletableFuture11

CompletableFuture12

둘 중 아무거나 하나 먼저 오는 게 출력된다.

예외처리하는 방법

exceptionally(Function)

public static void main(String[] args) throws ExecutionException, InterruptedException {
    boolean throwError = true;

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        if(throwError) {
            throw new IllegalArgumentException();
        }
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });
}

비동기적으로 실행한 어떤 task안에서 에러가 발생한다면 exceptionally()에서 에러 타입을 받아서 무언가를 반환한다. 이때 Function을 넘겨줄 수 있는데, 이 Function으로 어떤 default값을 넘겨주면 된다.

값을 넘겨주게 되면, hello의 get()으로 결과를 얻다가 에러로 던져지고

.exceptionally(ex -> {
	System.out.println(ex); // ex는 Exception Type
    return "Error!!!";
});

이 부분으로 들어오게 된다.

CompletableFuture13

결과값을 Error로 반환했으니까 if(throwError) {} if문으로 들어오게 되는 것이다.

CompletableFuture14

에러가 없으면 에러를 던지지 않고 정상적으로 값을 출력한다.

exceptionally()보다 일반적으로 쓰이는 handle()

정상적으로 종료됐을 때와 에러가 발생했을 때 두 경우 모두 사용할 수가 있으며, BiFunction이 들어온다.

.handle((result, ex) -> {
    return result;
});

첫 번째 파라미터는 정상적인 경우의 결과값, 두 번째 파라미터는 Exception이 발생했을 때 error를 받는다.

정상적인 경우에는 result를 반환하면 되지만 에러가 있다면 "Error!!!"라고 반환하면 아까 전에 봤던 exceptionally()와 같은 결과가 나오게 된다.

CompletableFuture15

CompletableFuture16

정리

  • 조합하기
    • thenCompose() : 두 작업이 서로 이어서 실행하도록 조합
    • thenCombine() : 두 작업을 독립적으로 실행하고, 둘 다 종료했을 때 callback 실행
    • allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 callback 실행
    • anyOf() : 여러 작업 중에서 가장 빨리 끝난 하나의 결과에 callback 실행
  • 예외처리
    • exceptionally(Function)
    • handle(BiFunction)

Reference

profile
만 가지 발차기를 한 번씩 연습하는 사람은 두렵지 않다. 내가 두려워 하는 사람은 한 가지 발차기를 만 번씩 연습하는 사람이다. - Bruce Lee

0개의 댓글