
The docs describe CompletableFuture as a Future that may be explicitly completed, and that may also be used as a CompletionStage, supporting dependent functions and actions that run upon its completion.
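To make "explicitly completed" concrete, here is a minimal sketch. The class and method names (ExplicitCompletionSketch, completeManually) are made up for illustration; only the CompletableFuture calls come from the JDK. It creates a future that has no value yet, attaches a dependent function, and then completes the future by hand.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical class name, used only for this illustration.
class ExplicitCompletionSketch {

    // Creates an incomplete future, attaches a dependent stage, then completes it manually.
    static String completeManually() {
        CompletableFuture<String> future = new CompletableFuture<>();

        // The dependent function is registered now but runs only once `future` completes.
        CompletableFuture<String> dependent = future.thenApply(value -> value + "!");

        if (future.isDone()) {
            throw new IllegalStateException("the future should not be complete yet");
        }

        // Explicit completion: this supplies the value and triggers the dependent stage.
        future.complete("done");
        return dependent.join();
    }

    public static void main(String[] args) {
        System.out.println(completeManually());
    }
}
```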

Put simply, CompletableFuture is an object that lets you run code asynchronously.
Let's look at how to do asynchronous programming with CompletableFuture.
runAsync runs an asynchronous task that has no return value.
@Test
void runAsync() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    // runAsync returns immediately, so the elapsed time printed here is far below 5000 ms
    long endTime = System.currentTimeMillis();
    System.out.println(String.format("this test ends %d millis ", (endTime-startTime)));
}

supplyAsync runs an asynchronous task that returns a value.
@Test
void supplyAsync() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
            return "awaken";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    long endTime = System.currentTimeMillis();
    System.out.println(String.format("this test ends %d millis ", (endTime-startTime)));
}

get returns the result of the task, so the caller has to wait until the task completes. It declares the checked exceptions InterruptedException and ExecutionException.
@Test
void get() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
            return "awaken";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    try {
        String result = completableFuture.get();
        long endTime = System.currentTimeMillis();
        System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}
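Because get blocks, it can help to bound the wait. The following is a sketch using the overload get(long, TimeUnit), which additionally throws TimeoutException. The class name, the helper getOrFallback, and the fallback strings are assumptions made for this example, not part of the tests above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class, used only for this illustration.
class GetWithTimeoutSketch {

    // Waits at most timeoutMillis for the result and maps each failure mode to a string.
    static String getOrFallback(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The task did not finish within the given time.
            return "timed out";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            // The task itself failed; the original exception is available as the cause.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(getOrFallback(CompletableFuture.completedFuture("awaken"), 100));
        System.out.println(getOrFallback(new CompletableFuture<>(), 100));
    }
}
```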

join also waits for the result, but if the task fails it wraps the exception in the unchecked CompletionException and throws that instead.
@Test
void join() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
            return "awaken";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    String result = completableFuture.join();
    long endTime = System.currentTimeMillis();
    System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
}
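The difference between the two wrappers is easiest to see with a task that fails. In this sketch the class and method names are hypothetical, and the IllegalStateException is simply an arbitrary failure chosen for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical class name, used only for this illustration.
class JoinVsGetSketch {

    // A task that always fails with an unchecked exception.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // join: no checked exceptions; a failure surfaces as an unchecked CompletionException.
    static String describeJoin(CompletableFuture<String> future) {
        try {
            return future.join();
        } catch (CompletionException e) {
            return "CompletionException <- " + e.getCause().getClass().getSimpleName();
        }
    }

    // get: a failure surfaces as a checked ExecutionException.
    static String describeGet(CompletableFuture<String> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            return "ExecutionException <- " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeJoin(failing()));
        System.out.println(describeGet(failing()));
    }
}
```

In both cases the original exception is reachable through getCause().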

A task started with runAsync, which has no return value, can be awaited with get or join in the same way.
thenApply receives the result of the CompletableFuture and returns a different value.
@Test
void thenApply() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
            return "awaken";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    int result = completableFuture
            .thenApply(completableFutureResult -> 999999)
            .join();
    long endTime = System.currentTimeMillis();
    System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
}

thenAccept receives the result of the CompletableFuture, processes it, and returns no value.
@Test
void thenAccept() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
            return "awaken";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    completableFuture
            .thenAccept(completableFutureResult -> System.out.println(String.format("preProcessed : %s", completableFutureResult)))
            .join();
    long endTime = System.currentTimeMillis();
    System.out.println(String.format("this test ends %d millis.", (endTime-startTime)));
}

thenRun runs another task after the CompletableFuture has finished. It does not receive the CompletableFuture's result.
@Test
void thenRun() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
            return "awaken";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    completableFuture
            .thenRun(() -> {
                try {
                    long middleTime = System.currentTimeMillis();
                    System.out.println(String.format("now we spent %d millis", (middleTime - startTime)));
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            })
            .join();
    long endTime = System.currentTimeMillis();
    System.out.println(String.format("this test ends %d millis.", (endTime-startTime)));
}

thenCompose chains two CompletableFuture tasks so that they run one after the other.
@Test
void thenCompose() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
            return "one";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    String result = f1.thenCompose(f1Result -> { // receives the result of the preceding CompletableFuture
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return CompletableFuture.supplyAsync(() -> { // returns a CompletableFuture
            return f1Result + "two";
        });
    }).join();
    long endTime = System.currentTimeMillis();
    System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
}
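It may help to contrast thenCompose with thenApply when the function itself returns a CompletableFuture. In the sketch below (class and method names are invented for the example), thenApply leaves a nested future that has to be joined twice, while thenCompose flattens it into a single stage.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical class name, used only for this illustration.
class ComposeVsApplySketch {

    // An asynchronous step that depends on the previous result.
    static CompletableFuture<String> appendAsync(String previous) {
        return CompletableFuture.supplyAsync(() -> previous + "two");
    }

    // thenCompose flattens the future returned by the function.
    static String viaCompose() {
        CompletableFuture<String> flat = CompletableFuture
                .supplyAsync(() -> "one")
                .thenCompose(ComposeVsApplySketch::appendAsync);
        return flat.join();
    }

    // thenApply wraps the returned future, producing a future of a future.
    static String viaApply() {
        CompletableFuture<CompletableFuture<String>> nested = CompletableFuture
                .supplyAsync(() -> "one")
                .thenApply(ComposeVsApplySketch::appendAsync);
        return nested.join().join(); // two joins are needed to reach the value
    }

    public static void main(String[] args) {
        System.out.println(viaCompose());
        System.out.println(viaApply());
    }
}
```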

Without the intermediate sleep, the thenCompose call can be written more concisely as String result = f1.thenCompose(f1Result -> CompletableFuture.supplyAsync(() -> f1Result + "two")).join();

thenCombine runs two CompletableFuture tasks independently and, once both have completed, processes their results together.
@Test
void thenCombine() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
            return "one";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
            return "two";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    String result = f1
            .thenCombine(f2, (f1Result, f2Result) -> f1Result + f2Result)
            .join();
    long endTime = System.currentTimeMillis();
    System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
}

allOf runs several CompletableFuture tasks independently and, once all of them have completed, processes their results.
@Test
void allOf() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
            return "one";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
            return "two";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(4000);
            return "three";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    List<CompletableFuture<String>> completableFutures = List.of(f1, f2, f3);
    CompletableFuture<Void> allDone = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]));
    String result = allDone.thenApply(v -> completableFutures.stream()
                    .map(CompletableFuture::join) // fetch the result of each CompletableFuture
                    .collect(Collectors.joining()) // concatenate the results
            )
            .join();
    long endTime = System.currentTimeMillis();
    System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
}
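Since allOf itself only yields a CompletableFuture<Void>, the "wait for all, then collect" step tends to be repeated. One possible way to package it is a small generic helper; the name allAsList is an assumption for this sketch and is not a JDK method.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper class, used only for this illustration.
class AllOfSketch {

    // Returns a future that completes with all results, in the order of the input list.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // By the time this function runs every future is complete, so join() does not block.
        return allDone.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.supplyAsync(() -> "one"),
                CompletableFuture.supplyAsync(() -> "two"),
                CompletableFuture.supplyAsync(() -> "three"));
        System.out.println(allAsList(futures).join());
    }
}
```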

anyOf runs several CompletableFuture tasks independently and, as soon as the fastest one completes, processes its result.
@Test
void anyOf() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(5000);
            return "one";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
            return "two";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(4000);
            return "three";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    CompletableFuture<Object> anyDone = CompletableFuture.anyOf(f1,f2,f3);
    String result = anyDone.thenApply(v -> "fastest work" + v)
            .join();
    long endTime = System.currentTimeMillis();
    System.out.println(String.format("this test ends %d millis. result : %s ", (endTime-startTime), result));
}

exceptionally receives the exception and handles it.
@ParameterizedTest
@ValueSource(booleans = {true, false})
void exceptionally(boolean isError) {
    String result = CompletableFuture
            .supplyAsync(() -> {
                if (isError) {
                    throw new RuntimeException("error");
                }
                return "hello";
            })
            .exceptionally(e -> {
                return e.getMessage();
            })
            .join();
    System.out.println("test result = " + result);
}
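One detail to watch: the Throwable handed to exceptionally may be a CompletionException wrapping the exception thrown inside the task, in which case getMessage() does not return the original message alone. A defensive sketch is to unwrap the cause first. The class name and the rootCause helper are assumptions introduced for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical class name, used only for this illustration.
class ExceptionallySketch {

    // Returns the wrapped cause of a CompletionException, or the throwable itself otherwise.
    static Throwable rootCause(Throwable e) {
        return (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
    }

    // Falls back to the original exception's message when the task fails.
    static String run(boolean isError) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (isError) {
                        throw new RuntimeException("error");
                    }
                    return "hello";
                })
                .exceptionally(e -> rootCause(e).getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```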


handle receives both the result and the exception and handles them together.
@ParameterizedTest
@ValueSource(booleans = {true, false})
void handle(boolean isError) {
    String result = CompletableFuture
            .supplyAsync(() -> {
                if (isError) {
                    throw new RuntimeException("error");
                }
                return "hello";
            })
            .handle((v,e) -> {
                return v + "||" + ((e == null) ? "error not occurred" : e.getMessage());
            })
            .join();
    System.out.println("test result = " + result);
}


By default, CompletableFuture runs its tasks on threads managed by ForkJoinPool.commonPool().
By default, the commonPool manages (number of logical processors on the machine - 1) threads.
This value can be changed with a VM option:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=?
My PC has 12 logical processors.

So you can confirm that the commonPool has 11 threads.
@Test
void forkJoinPoolThreadCount() {
    int parallelism = ForkJoinPool.getCommonPoolParallelism();
    System.out.println("ForkJoinPool common pool parallelism: " + parallelism);
}

As a result, when the number of concurrently running tasks exceeds the number of threads (11), the tasks that were not assigned a thread at the start have to wait and can only run once they acquire a thread.
@Test
void threadTest() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(5000);
            Thread thread = Thread.currentThread();
            System.out.println("threadName = " + thread.getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    /*
    f2~f11 are defined the same way as f1
    */
    CompletableFuture<Void> f12 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(5000);
            Thread thread = Thread.currentThread();
            System.out.println("threadName = " + thread.getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    List<CompletableFuture<Void>> completableFutures = List.of(f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12);
    CompletableFuture<Void> allDone = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]));
    allDone.join();
    long endTime = System.currentTimeMillis();
    System.out.println(endTime - startTime);
}
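The same queueing effect can be reproduced on any machine, independent of its processor count, by using a small fixed pool in place of the commonPool. The following is a sketch under that assumption; the class name, the runTasks helper, and the chosen numbers (2 threads, 3 tasks, 200 ms) are all made up for the demonstration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical class name, used only for this illustration.
class PoolSaturationSketch {

    // Runs `tasks` sleeping tasks on a pool of `threads` threads and returns the elapsed millis.
    static long runTasks(int threads, int tasks, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<Void>> futures = IntStream.range(0, tasks)
                    .mapToObj(i -> CompletableFuture.runAsync(() -> sleep(sleepMillis), pool))
                    .collect(Collectors.toList());
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            pool.shutdown(); // release the pool's threads once all tasks are done
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // 3 tasks on 2 threads: the third task waits for a free thread, so roughly two rounds.
        System.out.println("2 threads, 3 tasks: " + runTasks(2, 3, 200) + " ms");
        // 3 tasks on 3 threads: all tasks start at once, so roughly one round.
        System.out.println("3 threads, 3 tasks: " + runTasks(3, 3, 200) + " ms");
    }
}
```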

It is also possible to use threads from a pool other than the commonPool.
@Test
void customExecutorTest() {
    long startTime = System.currentTimeMillis();
    CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(5000);
            Thread thread = Thread.currentThread();
            System.out.println("threadName = " + thread.getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    /*
    f2~f10 are defined the same way as f1
    */
    CompletableFuture<Void> f11 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(5000);
            Thread thread = Thread.currentThread();
            System.out.println("threadName = " + thread.getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    ExecutorService executor = Executors.newFixedThreadPool(10);
    CompletableFuture<Void> f12 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(5000);
            Thread thread = Thread.currentThread();
            System.out.println("threadName = " + thread.getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }, executor); // use the Executor we defined ourselves
    List<CompletableFuture<Void>> completableFutures = List.of(f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12);
    CompletableFuture<Void> allDone = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]));
    allDone.join();
    long endTime = System.currentTimeMillis();
    System.out.println(endTime - startTime);
    executor.shutdown(); // release the custom pool's threads
}
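To verify which pool a task actually ran on, one option is to give the custom pool recognizable thread names. This sketch assumes a made-up name prefix ("custom-pool-") and a hypothetical class name; newFixedThreadPool(int, ThreadFactory) and the supplyAsync overload taking an Executor are standard JDK methods.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this illustration.
class CustomExecutorSketch {

    // Runs a task on a custom pool and returns the name of the thread that executed it.
    static String threadNameOnCustomPool() {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
            // Name each thread so its origin shows up in logs and thread dumps.
            Thread thread = new Thread(runnable, "custom-pool-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threadName = " + threadNameOnCustomPool());
    }
}
```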
