public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
/**
 * Immutable holder of a single name, used by the method-reference demo.
 * The one-argument constructor and the {@code getName()} accessor are
 * generated by the Lombok annotations on the class and the field.
 */
@RequiredArgsConstructor
public static class Person {
    @Getter
    private final String name;

    /**
     * Tells whether {@code o}'s name sorts strictly after this person's name
     * in {@link String#compareTo(String)} order.
     *
     * <p>Despite its name this is a predicate, not a {@code Comparable}-style
     * three-way comparison.
     *
     * @param o the person whose name is compared against this one
     * @return {@code true} when {@code o.name} is greater than this name
     */
    public Boolean compareTo(Person o) {
        // "this before other" is the same fact as "other after this".
        final int order = name.compareTo(o.name);
        return order < 0;
    }
}
/**
 * Writes {@code name} to standard output followed by a line terminator.
 *
 * @param name the text to print
 */
public static void print(String name) {
System.out.println(name);
}
/**
 * Runs one stream pipeline that uses each flavour of method reference:
 * constructor, bound instance method, unbound instance method and static method.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final Person pivot = new Person("f");
    // static method reference
    final Consumer<String> printer = MethodReferenceExample::print;

    // constructor reference: each string becomes a Person
    Stream<Person> people = Stream.of("a", "b", "g", "h").map(Person::new);
    // method reference bound to the pivot instance, used as the filter predicate
    Stream<Person> afterPivot = people.filter(pivot::compareTo);
    // instance method reference invoked on each stream element
    Stream<String> names = afterPivot.map(Person::getName);

    names.forEach(printer);
}
/**
 * Handle to the result of an asynchronous computation (excerpt of
 * {@code java.util.concurrent.Future}, reproduced here for reference).
 *
 * @param <V> type of the result returned by {@code get}
 */
public interface Future<V> {
/** Attempts to cancel the task; {@code mayInterruptIfRunning} says whether a thread already running it may be interrupted. */
boolean cancel(boolean mayInterruptIfRunning);
/** Returns {@code true} if the task was cancelled before it completed normally. */
boolean isCancelled();
/** Returns {@code true} once the task has ended, whether normally, exceptionally or by cancellation. */
boolean isDone();
/** Blocks the calling thread until the task completes, then returns its result. */
V get() throws InterruptedException, ExecutionException;
/** Like {@link #get()}, but waits at most {@code timeout} in the given {@code unit}; a {@link TimeoutException} signals that the wait elapsed first. */
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
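get()
: 결과가 준비될 때까지 caller 쓰레드를 block 하고, 완료되면 결과를 반환
get(long timeout, TimeUnit unit)
: 최대 timeout 만큼만 기다리고, 그 안에 완료되지 않으면 TimeoutException 발생
isDone(), isCancelled()
: future의 완료 여부, 취소 여부를 반환
cancel(boolean mayInterruptIfRunning)
: future의 작업 취소를 시도. mayInterruptIfRunning이 true이면 실행 중인 쓰레드를 interrupt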
/**
 * Submits a task that yields {@code 1} to a fresh single-thread executor and
 * hands back its {@link Future}.
 *
 * <p>The executor is shut down before returning; {@code shutdown()} stops
 * new submissions but lets the already-submitted task run.
 *
 * @return a future that completes with {@code 1}
 */
public static Future<Integer> getFuture() {
    var singleWorker = Executors.newSingleThreadExecutor();
    try {
        return singleWorker.submit(() -> 1);
    } finally {
        singleWorker.shutdown();
    }
}
/**
 * Submits a task that sleeps for one second and then yields {@code 1} to a
 * fresh single-thread executor, and hands back its {@link Future}.
 *
 * <p>The executor is shut down before returning; the submitted task is
 * still allowed to run to completion.
 *
 * @return a future that completes with {@code 1} about one second later
 */
public static Future<Integer> getFutureCompleteAfter1s() {
    var singleWorker = Executors.newSingleThreadExecutor();
    try {
        var delayed = singleWorker.submit(() -> {
            Thread.sleep(1000);
            return 1;
        });
        return delayed;
    } finally {
        singleWorker.shutdown();
    }
}
getFuture()
: 새로운 쓰레드를 생성하여 1을 반환
getFutureCompleteAfter1s()
: 새로운 쓰레드를 생성하고 1초 대기 후 1을 반환
public interface ExecutorService extends Executor {
void execute(Runnable command);
<T> Future<T> submit(Callable<T> task);
void shutdown();
}
execute(Runnable command)
: Runnable 인터페이스를 구현한 작업을 쓰레드 풀에서 비동기적으로 실행
<T> Future<T> submit(Callable<T> task)
: Callable 인터페이스를 구현한 작업을 쓰레드 풀에서 비동기적으로 실행하고, 해당 작업의 결과를 Future<T>
로 반환
shutdown()
: ExecutorService를 종료. 더 이상 task를 받지 않는다.
/**
 * Excerpt of the chaining operations of
 * {@code java.util.concurrent.CompletionStage}, reproduced for reference.
 * Each method registers a follow-up action and returns the stage that
 * represents that action.
 *
 * @param <T> type of the value this stage produces
 */
public interface CompletionStage<T> {
    /** Transforms this stage's value with {@code fn}. */
    <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);

    /** Asynchronous variant of {@link #thenApply(Function)}. */
    <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn);

    /** Consumes this stage's value; the returned stage carries no value. */
    CompletionStage<Void> thenAccept(Consumer<? super T> action);

    /** Asynchronous variant of {@link #thenAccept(Consumer)}. */
    CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

    /** Runs {@code action} without receiving this stage's value. */
    CompletionStage<Void> thenRun(Runnable action);

    /** Asynchronous variant of {@link #thenRun(Runnable)}. */
    CompletionStage<Void> thenRunAsync(Runnable action);

    /** Chains a function that itself returns a stage, yielding a flat {@code CompletionStage<U>}. */
    <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

    /** Asynchronous variant of {@link #thenCompose(Function)}. */
    <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);

    /** Maps a failure to a replacement value of type {@code T}. */
    CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
}
// Overview chain: each call registers a follow-up action on the stage
// returned by the previous call.
Helper.completionStage()
// Function step: logs the incoming value and passes value + 1 downstream.
.thenApplyAsync(value -> {
log.info("thenApplyAsync: {}", value);
return value + 1;
// Consumer step: logs the value and returns nothing.
}).thenAccept(value -> {
log.info("thenAccept: {}", value);
// Runnable step: receives no value at all.
}).thenRunAsync(() -> {
log.info("thenRun");
// Failure handler: logs the exception message and supplies null as the replacement value.
}).exceptionally(e -> {
log.info("exceptionally: {}", e.getMessage());
return null;
});
// NOTE(review): presumably keeps the calling thread alive until the async steps have run.
Thread.sleep(100);
/** Takes an argument of type {@code T} and returns a result of type {@code R}. */
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}
/** Takes an argument of type {@code T} and returns nothing. */
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}
/** Takes no argument and returns a value of type {@code T}. */
@FunctionalInterface
public interface Supplier<T> {
T get();
}
/** Takes no argument and returns nothing. */
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
/** Shown again as the parameter type of the {@code thenAccept} / {@code thenAcceptAsync} signatures that follow. */
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}
CompletionStage<Void> thenAccept(Consumer<? super T> action);
CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
finishedStage
: 1을 반환하는 완료된 CompletableFuture 반환 (future는 항상 종료시킨 다음에 반환함 == 무조건 Done 상태)
runningStage
: 1초를 sleep한 후 1을 반환하는 CompletableFuture (future는 무조건 진행중인 상태)
@SneakyThrows
public static CompletionStage<Integer> finishedStage() {
var future = CompletableFuture.supplyAsync(() > {
log.info("supplyAsync");
return 1;
});
Thread.sleep(100);
return future;
}
public static CompletionStage<Integer> runningStage() {
return CompletableFuture.supplyAsync(() > {
try {
Thread.sleep(1000);
log.info("I'm running!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
});
}
CompletionStageThenAcceptExample.java
// thenAccept on a stage that is already done: per the sample output, both
// consumers run on the calling (main) thread, before "after thenAccept".
log.info("start main");
CompletionStage<Integer> stage = Helper.finishedStage();
stage.thenAccept(i -> {
    log.info("{} in thenAccept", i);
}).thenAccept(i -> {
    // The previous thenAccept yields CompletionStage<Void>, so i is null here.
    log.info("{} in thenAccept2", i);
});
log.info("after thenAccept");
Thread.sleep(100);
[main] - start main
[ForkJoinPool.commonPool-worker-19] - return in future
[main] - 1 in thenAccept
[main] - null in thenAccept2
[main] - after thenAccept
CompletionStageThenAcceptAsyncExample.java
// thenAcceptAsync on a stage that is already done: per the sample output, the
// consumers run on ForkJoinPool worker threads, after main logs "after thenAccept".
log.info("start main");
CompletionStage<Integer> stage = Helper.finishedStage();
stage.thenAcceptAsync(i -> {
    log.info("{} in thenAcceptAsync", i);
}).thenAcceptAsync(i -> {
    // The previous thenAcceptAsync yields CompletionStage<Void>, so i is null here.
    log.info("{} in thenAcceptAsync2", i);
});
log.info("after thenAccept");
// Keep main alive long enough for the asynchronous consumers to log.
Thread.sleep(100);
[main] - start main
[ForkJoinPool.commonPool-worker-19] - return in future
[main] - after thenAccept
[ForkJoinPool.commonPool-worker-19] - 1 in thenAcceptAsync
[ForkJoinPool.commonPool-worker-5] - null in thenAcceptAsync2
done 상태에서 thenAccept는 caller(main)의 쓰레드에서 실행된다.
done 상태의 completionStage에 thenAccept를 사용하는 경우, caller 쓰레드를 block 할 수 있다.
done 상태가 아닌 thenAccept는 callee(forkJoinPool)의 쓰레드에서 실행된다.
done 상태가 아닌 completionStage에 thenAccept를 사용하는 경우, callee를 block 할 수 있다.
thenAccept
CompletionStageThenAcceptRunningExample.java
// thenAccept on a stage that is still running: per the sample output, both
// consumers run on the same ForkJoinPool worker that logged "I'm running!".
log.info("start main");
CompletionStage<Integer> stage = Helper.runningStage();
stage.thenAccept(i -> {
    log.info("{} in thenAccept", i);
}).thenAccept(i -> {
    // The previous thenAccept yields CompletionStage<Void>, so i is null here.
    log.info("{} in thenAccept2", i);
});
// runningStage needs about one second; wait long enough to see the logs.
Thread.sleep(2000);
[main] INFO - start main
[ForkJoinPool.commonPool-worker-19] INFO - I'm running!
[ForkJoinPool.commonPool-worker-19] INFO - 1 in thenAccept
[ForkJoinPool.commonPool-worker-19] INFO - null in thenAccept2
thenAcceptAsync
// thenAcceptAsync on a stage that is still running: per the sample output, the
// consumers run on a different ForkJoinPool worker than the one that logged
// "I'm running!".
log.info("start main");
CompletionStage<Integer> stage = Helper.runningStage();
stage.thenAcceptAsync(i -> {
    log.info("{} in thenAcceptAsync", i);
}).thenAcceptAsync(i -> {
    // The previous thenAcceptAsync yields CompletionStage<Void>, so i is null here.
    // Message suffixed with "2" so it matches the sample output of this example.
    log.info("{} in thenAcceptAsync2", i);
});
// runningStage needs about one second; wait long enough to see the logs.
Thread.sleep(2000);
output
[main] INFO - start main
[ForkJoinPool.commonPool-worker-19] INFO - I'm running!
[ForkJoinPool.commonPool-worker-5] INFO - 1 in thenAcceptAsync
[ForkJoinPool.commonPool-worker-5] INFO - null in thenAcceptAsync2
CompletionStageThenAcceptAsyncExecutorExample.java
// thenAcceptAsync with an explicit executor: the first consumer is handed to
// the fixed pool and the second to the single-thread executor, instead of the
// default pool used by the one-argument overload.
var single = Executors.newSingleThreadExecutor();
var fixed = Executors.newFixedThreadPool(10);
try {
    log.info("start main");
    CompletionStage<Integer> stage = Helper.completionStage();
    stage.thenAcceptAsync(i -> {
        log.info("{} in thenAcceptAsync", i);
    }, fixed).thenAcceptAsync(i -> {
        // The previous thenAcceptAsync yields CompletionStage<Void>, so i is null here.
        log.info("{} in thenAcceptAsync2", i);
    }, single);
    log.info("after thenAccept");
    // Keep main alive long enough for both consumers to log.
    Thread.sleep(200);
} finally {
    // Shut the executors down even if the sleep is interrupted, so their
    // threads do not keep the JVM alive.
    single.shutdown();
    fixed.shutdown();
}
[main] - start main
[ForkJoinPool.commonPool-worker-19] - return in future
[main] - after thenAccept
[pool-3-thread-1] - 1 in thenAcceptAsync
[pool-2-thread-1] - null in thenAcceptAsync2
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}
<U> CompletionStage<U>
thenApply(Function<? super T,? extends U> fn);
<U> CompletionStage<U>
thenApplyAsync(Function<? super T,? extends U> fn);
CompletionStageThenApplyAsyncExample.java
// Chains three thenApplyAsync steps that change the value type each time:
// Integer -> Integer -> String -> Boolean.
CompletionStage<Integer> stage = Helper.completionStage();
stage.thenApplyAsync(value -> {
var next = value + 1;
// logging omitted in this listing; the sample output shows "in thenApplyAsync: 2" here
return next;
}).thenApplyAsync(value -> {
var next = "result: " + value;
// logging omitted in this listing; the sample output shows "in thenApplyAsync2: result: 2" here
return next;
}).thenApplyAsync(value -> {
var next = value.equals("result: 2");
// logging omitted in this listing; the sample output shows "in thenApplyAsync3: true" here
return next;
}).thenAcceptAsync(value -> log.info("{}", value));
// Keep the calling thread alive long enough for the asynchronous steps to log.
Thread.sleep(100);
[ForkJoinPool.commonPool-worker-19] - return in future
[ForkJoinPool.commonPool-worker-19] - in thenApplyAsync: 2
[ForkJoinPool.commonPool-worker-19] - in thenApplyAsync2: result: 2
[ForkJoinPool.commonPool-worker-19] - in thenApplyAsync3: true
[ForkJoinPool.commonPool-worker-19] - true
/**
 * Excerpt of {@code java.util.function.Function} showing {@code compose}.
 *
 * @param <T> argument type
 * @param <R> result type
 */
@FunctionalInterface
public interface Function<T, R> {
    /**
     * Applies this function to {@code t}.
     *
     * @param t the argument
     * @return the result
     */
    R apply(T t);

    /**
     * Builds a function that first applies {@code before} to its input and
     * then applies this function to that intermediate result.
     *
     * @param before the function to run first; must not be {@code null}
     * @param <V> input type of {@code before} and of the composed function
     * @return the composed function
     * @throws NullPointerException if {@code before} is {@code null}
     */
    default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
        Objects.requireNonNull(before);
        return (V v) -> apply(before.apply(v));
    }
}
<U> CompletionStage<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn);
<U> CompletionStage<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn);
CompletionStageThenComposeAsyncExample.java
// thenComposeAsync takes a function that itself returns a CompletionStage and
// hands the next step that inner stage's value rather than the stage itself.
CompletionStage<Integer> stage = Helper.completionStage();
stage.thenComposeAsync(value -> {
var next = Helper.addOne(value);
// next is a stage, not a number: the sample output prints the future object ("Not completed").
log.info("in thenComposeAsync: {}", next);
return next;
}).thenComposeAsync(value -> {
var next = Helper.addResultPrefix(value);
// Same here: what gets logged is the pending future returned by the helper.
log.info("in thenComposeAsync2: {}", next);
return next;
}).thenAcceptAsync(value -> {
// The sample output shows the unwrapped value "result: 2" arriving here.
log.info("{} in thenAcceptAsync", value);
});
// Keep the calling thread alive while the composed stages finish.
Thread.sleep(1000);
/**
 * Asynchronously computes {@code value + 1} after a 100 ms pause.
 *
 * @param value the number to increment
 * @return a stage that completes with {@code value + 1}
 */
public static CompletionStage<Integer> addOne(int value) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool thread's owner can still see it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return value + 1;
    });
}
19:14:48 [main] - start main
19:14:48 [ForkJoinPool.commonPool-worker-19] - I'm running!
19:14:48 [ForkJoinPool.commonPool-worker-19] - in thenComposeAsync:
java.util.concurrent.CompletableFuture@37b05857[Not completed]
19:14:48 [ForkJoinPool.commonPool-worker-19] - in thenComposeAsync2:
java.util.concurrent.CompletableFuture@6398d2c5[Not completed]
19:14:48 [ForkJoinPool.commonPool-worker-5] - result: 2 in thenAcceptAsync
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
CompletionStageThenRunAsyncExample.java
// thenRunAsync takes a Runnable, so the actions neither receive the previous
// value nor produce one.
log.info("start main");
CompletionStage<Integer> stage = Helper.completionStage();
stage.thenRunAsync(() -> {
log.info("in thenRunAsync");
}).thenRunAsync(() -> {
log.info("in thenRunAsync2");
}).thenAcceptAsync(value -> {
// thenRunAsync yields CompletionStage<Void>; the sample output shows null here.
log.info("{} in thenAcceptAsync", value);
});
// Keep main alive long enough for the asynchronous actions to log.
Thread.sleep(100);
48:32 [main] - start main
48:32 [ForkJoinPool.commonPool-worker-19] - return in future
48:32 [ForkJoinPool.commonPool-worker-19] - in thenRunAsync
48:32 [ForkJoinPool.commonPool-worker-19] - in thenRunAsync2
48:32 [ForkJoinPool.commonPool-worker-19] - null in thenAcceptAsync
CompletionStage<T> exceptionally(
Function<Throwable, ? extends T> fn);
CompletionStageExceptionallyExample.java
// exceptionally turns a failure from an earlier step into a replacement value
// so the rest of the chain can continue.
Helper.completionStage()
.thenApplyAsync(i -> {
log.info("in thenApplyAsync");
// Integer division by zero: this step fails with ArithmeticException.
return i / 0;
}).exceptionally(e -> {
log.info("{} in exceptionally", e.getMessage());
// Replacement value handed to the next step.
return 0;
}).thenAcceptAsync(value -> {
// The sample output shows the replacement value 0 arriving here.
log.info("{} in thenAcceptAsync", value);
});
// Keep the calling thread alive long enough for the asynchronous steps to log.
Thread.sleep(1000);
55:36 [ForkJoinPool.commonPool-worker-19] - return in future
55:36 [ForkJoinPool.commonPool-worker-5] - in thenApplyAsync
55:36 [ForkJoinPool.commonPool-worker-5] -
java.lang.ArithmeticException: / by zero in exceptionally
55:36 [ForkJoinPool.commonPool-worker-5] - 0 in thenAcceptAsync
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { … }
public static CompletableFuture<Void> runAsync(Runnable runnable) { … }
public boolean complete(T value) { … }
public boolean isCompletedExceptionally() { … }
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { … }
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { … }
}
@FunctionalInterface
public interface Supplier<T> {
T get();
}
public static <U> CompletableFuture<U> supplyAsync(
Supplier<U> supplier) { … }
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
public static CompletableFuture<Void> runAsync(
Runnable runnable) { … }
CompletableFutureSupplyAsyncExample.java
// supplyAsync: runs a Supplier asynchronously; the future carries its return value.
var future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
});
// The supplier sleeps 100 ms, so the future is expected to be unfinished at this point.
assert !future.isDone();
// Wait well past the supplier's 100 ms sleep.
Thread.sleep(1000);
assert future.isDone();
assert future.get() == 1;
CompletableFutureRunAsyncExample.java
// runAsync: runs a Runnable asynchronously; the future is a CompletableFuture<Void>.
var future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// The task sleeps 100 ms, so the future is expected to be unfinished at this point.
assert !future.isDone();
// Wait well past the task's 100 ms sleep.
Thread.sleep(1000);
assert future.isDone();
// A Runnable produces no value, so get() yields null.
assert future.get() == null;
// complete(value): finishes a future by hand. Only the call that actually
// completes it returns true; later calls return false and leave the value as is.
CompletableFuture<Integer> future = new CompletableFuture<>();
assert !future.isDone();
var triggered = future.complete(1);
assert future.isDone();
assert triggered;
assert future.get() == 1;
// Second attempt on an already completed future: rejected, value stays 1.
triggered = future.complete(2);
assert future.isDone();
assert !triggered;
assert future.get() == 1;
// isCompletedExceptionally: a supplier that throws still finishes the future,
// but as a failure.
var futureWithException = CompletableFuture.supplyAsync(() -> {
// Integer division by zero throws ArithmeticException inside the supplier.
return 1 / 0;
});
// Give the asynchronous supplier time to run and fail.
Thread.sleep(100);
assert futureWithException.isDone();
assert futureWithException.isCompletedExceptionally();
public static CompletableFuture<Void> allOf(
CompletableFuture<?>... cfs) { … }
// allOf: the combined future completes only once every given future has
// completed, so the elapsed time follows the slowest one (1000 ms here).
var startTime = System.currentTimeMillis();
var firstFuture = Helper.waitAndReturn(100, 1);
var secondFuture = Helper.waitAndReturn(500, 2);
var thirdFuture = Helper.waitAndReturn(1000, 3);
CompletableFuture.allOf(firstFuture, secondFuture, thirdFuture)
    .thenAcceptAsync(v -> {
        log.info("after allOf");
        try {
            // allOf yields CompletableFuture<Void>; read each result from its own future.
            log.info("first: {}", firstFuture.get());
            log.info("second: {}", secondFuture.get());
            log.info("third: {}", thirdFuture.get());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }).join();
var endTime = System.currentTimeMillis();
// Report the total wait, as the sample output ("elapsed: ...ms") shows.
log.info("elapsed: {}ms", endTime - startTime);
11:18 [ForkJoinPool.commonPool-worker-5] - waitAndReturn: 500ms
11:18 [ForkJoinPool.commonPool-worker-23] - waitAndReturn: 1000ms
11:18 [ForkJoinPool.commonPool-worker-19] - waitAndReturn: 100ms
11:19 [ForkJoinPool.commonPool-worker-5] - after allOf
11:19 [ForkJoinPool.commonPool-worker-5] - first: 1
11:19 [ForkJoinPool.commonPool-worker-5] - second: 2
11:19 [ForkJoinPool.commonPool-worker-5] - third: 3
11:19 [main] - elapsed: 1014ms
public static CompletableFuture<Object> anyOf(
CompletableFuture<?>... cfs) { … }
// anyOf: the combined future completes as soon as one of the given futures
// completes; the sample output shows about 114 ms, matching the 100 ms future.
var startTime = System.currentTimeMillis();
var firstFuture = Helper.waitAndReturn(100, 1);
var secondFuture = Helper.waitAndReturn(500, 2);
var thirdFuture = Helper.waitAndReturn(1000, 3);
CompletableFuture.anyOf(firstFuture, secondFuture, thirdFuture)
.thenAcceptAsync(v -> {
log.info("after anyOf");
// v is typed Object because anyOf returns CompletableFuture<Object>.
log.info("first value: {}", v);
}).join();
var endTime = System.currentTimeMillis();
log.info("elapsed: {}ms", endTime - startTime);
14:18 [ForkJoinPool.commonPool-worker-23] - waitAndReturn: 1000ms
14:18 [ForkJoinPool.commonPool-worker-19] - waitAndReturn: 500ms
14:18 [ForkJoinPool.commonPool-worker-5] - waitAndReturn: 100ms
14:18 [ForkJoinPool.commonPool-worker-9] - after anyOf
14:18 [ForkJoinPool.commonPool-worker-9] - first value: 1
14:18 [main] - elapsed: 114ms