테스트로 CompletableFuture 학습하기

최창효·2025년 3월 1일
0
post-thumbnail

CompletableFuture란

docs에서는 CompletableFuture를 CompletionStage로도 사용될 수 있으며 완료 시점에 실행할 수 있는 종속적인 함수와 작업을 지원하는 명시적으로 완료될 수 있는 Future라고 설명합니다.

쉽게 얘기하면 CompletableFuture는 비동기 프로그래밍을 실행할 수 있게 해주는 객체라고 볼 수 있습니다.

예제

CompletableFuture를 이용해 어떻게 비동기 프로그래밍을 실행할 수 있는지 살펴보겠습니다.

비동기 작업 실행

runAsync

반환값이 없는 비동기 작업을 실행합니다

    @Test
    void runAsync() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis ", (endTime-startTime)));
    }

  • Thread.sleep(5000)은 비동기로 실행됐기 때문에 테스트 코드는 CompletableFuture의 작업을 기다리지 않고 1ms만에 종료됐습니다.
  • runAsync는 반환값이 없기 때문에 CompletableFuture의 Generic이 Void타입입니다.

supplyAsync

반환값이 존재하는 비동기 작업을 실행합니다

    @Test
    void runAsync() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "awaken";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        
        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis ", (endTime-startTime)));
    }

  • supplyAsync는 반환값이 있기 때문에 CompletableFuture의 Generic에 Void가 아닌 반환 타입을 선언했습니다.
  • 예제코드의 CompletableFuture는 "awaken"이라는 문자열을 반환합니다. 하지만 테스트 코드는 이 비동기 작업을 기다리지 않고 진행되기 때문에 지금은 "awaken"이라는 문자열을 활용하지 못했습니다. "awaken"을 활용하려면 CompletableFuture의 비동기 작업을 기다려야 합니다.

비동기 작업 기다리기

get

InterruptedException과 ExecutionException이라는 checked exception을 발생시킵니다

    @Test
    void get() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "awaken";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        try {
            String result = completableFuture.get();

            long endTime = System.currentTimeMillis();
            System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }

    }

  • get()으로 비동기 처리를 기다렸기 때문에 5010ms만에 종료됐습니다.
  • completableFuture의 결과값을 받아와 이를 활용했습니다.
  • checked exception인 InterruptedException과 ExecutionException에 대한 처리가 필요합니다.

join

예외가 발생하면 해당 예외를 unchecked exception인 CompletionException으로 감싸서 예외를 발생시킵니다

    @Test
    void join() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "awaken";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        String result = completableFuture.join();

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));

    }

  • join()으로 비동기 처리를 기다렸기 때문에 5012ms만에 종료됐습니다.
  • completableFuture의 결과값을 받아와 이를 활용했습니다.
  • unchecked exception이기 때문에 명시적으로 예외 처리를 하지 않아도 됩니다.

반환값이 없는 runAsync에 대한 작업도 get과 join을 통해 기다리면 됩니다.

콜백 처리

thenApply

CompletableFuture의 반환값을 받아 다른 값을 반환합니다

    @Test
    void thenApply() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "awaken";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        int result = completableFuture
                .thenApply(completableFutureResult -> 999999)
                .join();

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
    }

  • thenApply는 함수형 인터페이스 Function을 매개변수로 받습니다. (T->R)
  • CompletableFuture가 반환한 타입과 다른 타입의 결과를 반환할 수도 있습니다. awaken문자열 대신 999999 숫자를 반환했습니다.

thenAccept

CompletableFuture의 반환값을 받아 처리 후 값을 반환하지 않습니다

    @Test
    void thenAccept() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "awaken";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        completableFuture
                .thenAccept(completableFutureResult -> System.out.println(String.format("preProcessed : %s", completableFutureResult)))
                .join();

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis.", (endTime-startTime)));
    }

  • thenAccept는 함수형 인터페이스 Consumer를 매개변수로 받습니다. (T->void)
  • CompletableFuture가 반환한 결과값을 곧바로 소비하고 아무런 값도 반환하지 않습니다.

thenRun

CompletableFuture작업이 끝난 뒤 다른 작업을 실행합니다. CompletableFuture의 반환값을 받지 않습니다

    @Test
    void thenRun() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "awaken";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        completableFuture
                .thenRun(() -> {
                    try {
                        long middleTime = System.currentTimeMillis();
                        System.out.println(String.format("now we spent %d millis", (middleTime - startTime)));
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                })
                .join();

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis.", (endTime-startTime)));
    }

  • thenRun은 함수형 인터페이스 Runnable을 매개변수로 받습니다. (() -> void)
  • completableFuture에서의 작업이 끝나고 뒤이어 다음 작업을 실행합니다. 이때 completableFuture의 결과값인 "awaken"을 받아올 수 없습니다.
  • thenRun을 시작하기 전에 5012ms가 흘러있었고, thenRun에서 3000ms를 추가로 소비해 모든 작업이 끝나기까지 총 8019ms의 시간이 걸렸습니다.

작업 결합

thenCompose

두 개의 CompletableFuture작업을 이어서 연속적으로 처리합니다

    @Test
    void thenCompose() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "one";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        String result = f1.thenCompose(f1Result -> { // 앞선 CompletableFuture의 결과를 받음
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return CompletableFuture.supplyAsync(() -> { // CompletableFuture를 반환
                return f1Result + "two";
            });
        }).join();

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
    }

  • thenCompose는 함수형 인터페이스 Function을 매개변수로 받습니다. (T->R)
    이때 매개변수 T는 앞선 CompletableFuture의 결과값이며 반환값 R은 CompletableFuture(정확히는 CompletionStage의 하위타입)입니다.
  • 람다 표현식을 이용하면 String result = f1.thenCompose(f1Result -> CompletableFuture.supplyAsync(() -> f1Result + "two")).join();으로 간단하게 나타낼 수 있습니다.
  • f1이 먼저 실행된 후 thenCompose의 CompletableFuture가 실행되기 때문에(순차적) 8019ms가 걸렸습니다.

thenCombine

두 개의 CompletableFuture작업을 각자 실행하고 둘 다 완료됐을 때 그 결과를 활용한 처리를 진행합니다

        long startTime = System.currentTimeMillis();

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "one";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                return "two";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        String result = f1
                .thenCombine(f2, (f1Result, f2Result) -> f1Result + f2Result)
                .join();

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));

  • thenCombine은 CompletableFuture(정확히는 CompletionStage의 하위타입)와 BiFunction을 매개변수로 받습니다.
    앞선 CompletableFuture의 결과값과 매개변수로 받은 CompletableFuture의 결과값을 이용해 BiFunction을 실행합니다. (T,U->R)
  • f1이 먼저 실행된 후 f2를 실행하는 게 아니라, f1과 f2를 독립적으로 실행했기 때문에 8000ms가 아닌 5008ms가 걸렸습니다.

allOf

여러 CompletableFuture작업을 각자 실행하고 모두 완료됐을 때 그 결과를 활용한 처리를 진행합니다

    @Test
    void allOf() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "one";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                return "two";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(4000);
                return "three";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        List<CompletableFuture<String>> completableFutures = List.of(f1, f2, f3);

        CompletableFuture<Void> allDone = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]));

        String result = allDone.thenApply(v -> completableFutures.stream()
                .map(CompletableFuture::join) // 각 CompletableFuture에서 결과를 가져옴
                .collect(Collectors.joining()) // 각 CompletableFuture의 결과를 합침
                )
                .join();

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
    }

  • allOf를 통해 f1, f2, f3작업이 모두 끝났는지를 확인할 수 있습니다.
    allOf.thenApply를 통해 모든 작업이 끝났을 때 각각의 CompletableFuture에서 join()을 실행해 결과를 가져오고 이를 합쳤습니다.
  • 모든 작업이 끝날 때까지 기다렸기 때문에 가장 느린 작업인 5000ms정도의 시간이 소요됐습니다.

anyOf

여러 CompletableFuture작업을 각자 실행하고 가장 빠른 하나가 완료됐을 때 그 결과를 활용한 처리를 진행합니다

    @Test
    void anyOf() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                return "one";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                return "two";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(4000);
                return "three";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });


        CompletableFuture<Object> anyDone = CompletableFuture.anyOf(f1,f2,f3);

        String result = anyDone.thenApply(v -> "fastest work" + v)
                .join();

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
    }

  • anyOf를 통해 f1, f2, f3작업 중 가장 빠른 하나가 끝났을 때 동작할 수 있습니다.

예외 처리

예외를 받아 로직을 처리합니다

exceptionally

    @ParameterizedTest
    @ValueSource(booleans =  {true, false})
    void exceptionally(boolean isError) {
        String result = CompletableFuture
                .supplyAsync(() -> {
                    if (isError) {
                        throw new RuntimeException("error");
                    }
                    return "hello";
                })
                .exceptionally(e -> {
                    return e.getMessage();
                })
                .join();

        System.out.println("test result = " + result);
    }

  • exceptionally는 CompletableFuture에서 예외가 발생했을 때 해당 예외를 인자로 받아 CompletableFuture가 원래 반환하려는 타입의 값을 반환합니다

handle

결과값과 예외를 함께 받아 로직을 처리합니다

    @ParameterizedTest
    @ValueSource(booleans =  {true, false})
    void handle(boolean isError) {
        String result = CompletableFuture
                .supplyAsync(() -> {
                    if (isError) {
                        throw new RuntimeException("error");
                    }
                    return "hello";
                })
                .handle((v,e) -> {
                    return v + "||" + ((e == null) ? "error not occurred" : e.getMessage());
                })
                .join();

        System.out.println("test result = " + result);
    }

  • handle은 CompletableFuture의 결과값과 예외를 모두 인자로 받아 CompletableFuture가 원래 반환하려는 타입의 값을 반환합니다.
    exceptionally와 달리 CompletetableFuture에서 예외가 발생하지 않았어도 실행됩니다.

기타

CompletableFuture가 사용하는 Thread

CompletableFuture가 사용하는 스레드는 기본적으로 ForkJoinPool.commonPool()에서 관리되는 스레드입니다.

commonPool은 PC의 논리 프로세서 개수 - 1개의 스레드를 관리하고 있습니다.

이 값은 VM옵션을 통해 변경할 수 있습니다
-Djava.util.concurrent.ForkJoinPool.common.parallelism=?

제 PC의 논리 프로세서는 12개입니다.

그래서 commonPool의 스레드의 개수는 11개인걸 확인할 수 있습니다.

    @Test
    void ForkJoinPoolThreadCount() {
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        System.out.println("ForkJoinPool common pool parallelism: " + parallelism);
    }

따라서 동시에 실행되는 작업이 스레드 개수(11개)를 초과하면, 초기에 스레드를 할당받지 못한 작업은 대기하다가 스레드를 획득한 후에야 실행될 수 있습니다.

    @Test
    void threadTest() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5000);
                Thread thread = Thread.currentThread();
                System.out.println("threadName = " + thread.getName());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        
        /*
         f1과 동일하게 f2~f11 정의
         */
        
        CompletableFuture<Void> f12 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5000);
                Thread thread = Thread.currentThread();
                System.out.println("threadName = " + thread.getName());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        
        List<CompletableFuture<Void>> completableFutures = List.of(f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12);
        
        CompletableFuture<Void> allDone = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]));
        allDone.join();

        long endTime = System.currentTimeMillis();

        System.out.println(endTime - startTime);
	}        

  • 사용 가능한 스레드가 없어 대기하고 있다가 가장 빨리 반납된 스레드(worker-3)를 사용해 작업을 진행했습니다.
  • 이로 인해 모든 작업이 끝나기까지 5000ms가 아닌 10013ms가 걸렸습니다.

CompletableFuture의 Executor 지정하기

commonPool이 아닌 다른 Pool의 스레드를 사용하는 것도 가능합니다.

    @Test
    void threadTest() {
        long startTime = System.currentTimeMillis();

        CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5000);
                Thread thread = Thread.currentThread();
                System.out.println("threadName = " + thread.getName());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        
        /*
         f1과 동일하게 f2~f10 정의
         */

        CompletableFuture<Void> f11 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5000);
                Thread thread = Thread.currentThread();
                System.out.println("threadName = " + thread.getName());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        
		Executor executor = Executors.newFixedThreadPool(10);
        CompletableFuture<Void> f12 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5000);
                Thread thread = Thread.currentThread();
                System.out.println("threadName = " + thread.getName());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, executor); // 직접 정의한 Executor 사용
        
        List<CompletableFuture<Void>> completableFutures = List.of(f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12);
        
        CompletableFuture<Void> allDone = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]));
        allDone.join();

        long endTime = System.currentTimeMillis();

        System.out.println(endTime - startTime);
	}  

  • f1~f11까지의 작업은 commonPool에서 스레드를 얻어오고, f12는 제가 정의한 Executor에서 스레드를 얻어옵니다.
  • 모든 작업이 즉시 스레드를 할당받을 수 있어 5018ms만에 모든 작업이 종료됩니다.

References

profile
기록하고 정리하는 걸 좋아하는 백엔드 개발자입니다.

0개의 댓글