import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class FutureExample {
// 제곱을 계산하는 Callable 객체를 만든다.
public Callable<Long> calSquare(long value) {
Callable<Long> callable = () -> {
Long returnValue = value * value;
TimeUnit.SECONDS.sleep(3);
System.out.println(value + "의 제곱근은 : " + returnValue);
return returnValue;
};
return callable;
}
public void executeTest() {
List<Long> sampleDataList = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
List<Future<Long>> futureList = new ArrayList<>();
// 스레드 풀을 생선. 고정 스레드 풀을 이용한다.
int coreSize = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(coreSize);
// Callable 객체를 생성한 후 스레드 풀에 등록
// 등록된 스레드에 대해 Future 객체를 리턴받음
for (Long sampleValue : sampleDataList) {
Future<Long> future = executorService.submit(calSquare(sampleValue));
futureList.add(future);
}
Long sumValue = 0L;
// Future 목록의 결과를 확인
for (Future<Long> future : futureList) {
try {
// 결과를 읽어 들일 때까지 대기
// 대기하는 동안, 스레드가 계산을 하고 값을 리턴한다.
sumValue += future.get();
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("최종 합계 :" + sumValue);
executorService.shutdown();
}
public static void main(String[] args) {
FutureExample futureExample = new FutureExample();
futureExample.executeTest();
}
}
Future 인터페이스를 이용해 비동기 연산을 실행하면 저수준의 스레드 프로그래밍을 하지 않아도 비동기 처리가 가능하며 처리 상태를 확인하고 결과를 쉽게 조회할 수 있다.
Future< Long > future = servicepool.submit(calSquare(sampleValue));
submit 메서드는 입력 파라미터로 Runnable과 Callable을 받을 수 있다. Runnable과 Callable은 멀티 스레드 기반으로 동작하기 위한 인터페이스라는 공통점이 있지만 Runnable은 메서드가 void형인 run 메서드만 있기 때문에 실행 결과를 리턴하지 않는다. 그에 비해 Callable 인터페이스는 제네릭으로 정의한 리턴타입을 가지는 call 메서드가 제공된다. 즉 비동기로 데이터를 처리한 이후에 그 결과를 리턴할 필요가 있다면 Callable을 이용해야 한다.
isDone, isCancelled 메서드를 사용해서 비동기 연산이 종료 혹은 취소됐는지 확인 가능
get 결괏값을 응답 받을 때까지 대기한다.
CompletableFuture
CompletableFuture 클래스 메서드
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CompletableFutureExample {
public static void main(String[] args) {
Runnable mainTask = () -> {
try{
TimeUnit.SECONDS.sleep(2);
} catch (Exception e){
e.printStackTrace();
}
System.out.println("Main Task : " + Thread.currentThread().getName());
};
Runnable subTask = () -> System.out.println("Next Task : " + Thread.currentThread().getName());
// ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
}
}
Main Task : pool-1-thread-1
Main Task : pool-1-thread-4
Main Task : pool-1-thread-2
Next Task : pool-1-thread-4
Next Task : pool-1-thread-1
Main Task : pool-1-thread-3
Next Task : pool-1-thread-3
Next Task : pool-1-thread-2
Main Task : pool-1-thread-4
Next Task : pool-1-thread-4
runAsync: Runnable 인터페이스 구현체를 실행시킨다. run 메서드가 void 타입이기 때문에 값을 외부에 리턴할 수 없다.
supplyAsync: Supplier 인터페이스 구현체를 실행시킨다. Supplier 인터페이스는 자바에서 기본 제공하는 함수형 인터페이스이며, 입력 파라미터는 없고 리턴 값만 있다. 그러므로 runAsync와 달리 리턴 객체를 받아서 결과를 확인할 수 있다.
동기식 API를 비동기 API로 리팩토링
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class InsuranceCalculator {
public int calculatePrice(Map condition){
// 기본 가격
int price = 10000;
// 보험료 계산하는 로직 대신 10초 대기하는 것으로 대체
try{
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return price;
}
public static void main(String[] args) {
InsuranceCalculator cal = new InsuranceCalculator();
for(int i=0; i<5; i++){
System.out.println(String.format("계산 치수 %s : %s\n", (i+1), cal.calculatePrice(null)));
}
}
}
계산공식 대신 3초간 대기하는 코드를 이용했다. 하나의 계산 작업에 3초의 시간이 걸리므로 실행하는데 총 15초의 시간이 소요된다.
이 코드를 비동기 처리 방식으로 변경하려면 두 가지 선택 방법이 있다.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class InsuranceCalculator {
public int calculatePrice(Map condition){
// 기본 가격
int price = 10000;
// 보험료 계산하는 로직 대신 10초 대기하는 것으로 대체
try{
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return price;
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future<Integer>> futureList = new ArrayList<>();
for(int i=0; i<8; i++){
Future<Integer> future = executorService.submit(() -> new InsuranceCalculator().calculatePrice(null));
futureList.add(future);
}
futureList.forEach((future) -> {
try{
System.out.println(String.format("계산 결과 : %s", future.get()));
}catch (Exception e){
e.printStackTrace();
}
});
}
}
public Future<Integer> calculateAsync(Map condition){
CompletableFuture<Integer> future = new CompletableFuture<>();
new Thread(()->{
int price = calculatePrice(condition);
// 처리 상태에 대한 래퍼런스를 등록한다.
future.complete(price);
}).start();
return future;
}
메서드 내부의 일부를 비동기 처리하기 위해 new Thread를 이용해서 스레드를 생성하는 것이다. 그리고 해당 스레드에 대한 참조를 얻기 위해 complete 메서드를 이용해서 작업을 등록한다.
그리고 메서드 호출자에 Future 인터페이스를 리턴하면 해당 메서드를 호출하는 개발자는 Future 인터페이스의 get 메소드를 호출해서 결과를 조회하면 된다.
Future< Integer > future = cal.calculatePrice(null);
스트림 병렬처리
import java.util.Arrays;
import java.util.List;
public class ParallelReduceMinMax {
public static void main(String[] args) {
List<Integer> intList = Arrays.asList(4, 2, 8, 1, 9, 6, 8, 3, 5);
// 최댓값 구하기 - 병렬
int max = intList.parallelStream().reduce(1,Integer::max);
System.out.println("max value is : " + max);
// 최솟값 구하기 - 병렬
int min = intList.parallelStream().reduce(1,Integer::min);
System.out.println("max value is : " + min);
}
}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class InsideParallelStream {
// 병렬 스트림 API 사용 예
public static void main(String[] args) {
List<Integer> intList = Arrays.asList(1,2,3,4,5,6,7,8,9);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:MM:ss");
// 스트림 내부의 스레드 값을 구함
intList.parallelStream().forEach(
value -> {
String threadName = Thread.currentThread().getName();
// 스레드 이름과 데이터 값을 출력
LocalDateTime currentTime = LocalDateTime.now();
System.out.printf(currentTime.format(formatter) +
String.format(" -> Thread Name : %s, Stream Value : %s\n", threadName, value));
// 시간 확인을 위해 2초간 sleep 한다.
try{
TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException e){
e.printStackTrace();
}
}
);
}
}
2021-01-05 17:01:52 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Value : 8
2021-01-05 17:01:52 -> Thread Name : ForkJoinPool.commonPool-worker-5, Stream Value : 3
2021-01-05 17:01:52 -> Thread Name : ForkJoinPool.commonPool-worker-7, Stream Value : 2
2021-01-05 17:01:52 -> Thread Name : main, Stream Value : 6
2021-01-05 17:01:53 -> Thread Name : main, Stream Value : 5
2021-01-05 17:01:53 -> Thread Name : ForkJoinPool.commonPool-worker-5, Stream Value : 4
2021-01-05 17:01:53 -> Thread Name : ForkJoinPool.commonPool-worker-7, Stream Value : 1
2021-01-05 17:01:53 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Value : 9
2021-01-05 17:01:54 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Value : 7
main 스레드는 스트림을 처리하기 위한 기본 스레드를 의미하고, 이 스레드가 3개의 ForkJoinPool의 스레드를 생성한 것이다. 이렇게 코어 수 기반의 스레드 생성은 스트림에서 제어한 것이 아니라 컨커런트 API의 ForkJoinPool의 기본값이며 여기에 영향을 받은 것이다.
스레드 수 조정
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class InsideParallelStream2 {
public static void main(String[] args) {
// 병렬 스트림의 예
List<Integer> intList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:MM:ss");
// 스레드 수 2개로 설정
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
System.out.printf("## Thread Pool Size : %s\n", ForkJoinPool.getCommonPoolParallelism());
intList.parallelStream().forEach( value -> {
// 현재 스레드 이름을 구함
String threadName = Thread.currentThread().getName();
LocalDateTime currentTime = LocalDateTime.now();
System.out.println(currentTime.format(formatter) +
String.format(" -> Thread Name : %s, Stream Name : %s", threadName, value));
// 시간 확인을 위해 2초간 sleep 함
try{
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e){
e.printStackTrace();
}
});
}
}
## Thread Pool Size : 2
2021-01-05 18:01:06 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Name : 3
2021-01-05 18:01:06 -> Thread Name : main, Stream Name : 7
2021-01-05 18:01:06 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Name : 9
2021-01-05 18:01:08 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Name : 10
2021-01-05 18:01:08 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Name : 5
2021-01-05 18:01:08 -> Thread Name : main, Stream Name : 6
2021-01-05 18:01:10 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Name : 4
2021-01-05 18:01:10 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Name : 8
2021-01-05 18:01:12 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Name : 2
2021-01-05 18:01:12 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Name : 1
메인 스레드가 2개의 스레드를 fork 하여 총 3개의 스레드가 동작하였음. 만약 2개의 스레드로 실행하려면 값을 1로 설정해야한다.
설정한 스레드 값(ForkjoinPool의 기본 값을 변경하는 것이기 때문에 자바 가상 머신 전체에 영향을 미친다. 특정한 프로세스 혹은 스레드에 대해서만 변경할 수 없으며 가상 머신이 종료될 때까지 설정이 유효하다.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class InsideParallelStream3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 병렬 스트림의 예
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:MM:ss");
// 별도의 스레드 풀 2개 생성
ForkJoinPool customPool = new ForkJoinPool(2);
customPool.submit(() -> {
// 스레드 풀 크기를 구한다.
System.out.printf("## Thread Pool Size : %s\n", customPool.getParallelism());
intList.parallelStream().forEach(value -> {
// 현재 스레드 이름을 구함
String threadName = Thread.currentThread().getName();
LocalDateTime currentTime = LocalDateTime.now();
System.out.println(currentTime.format(formatter) +
String.format(" -> Thread Name : %s, Stream Name : %s", threadName, value));
// 시간 확인을 위해 2초간 sleep 함
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}).get();
}
}
별도의 스레드 풀 2개를 생성한 후 submit 메서드를 구현하였다. commonPool을 사용했을 때와 달리 main 스레드가 없다. 이처럼 별도의 스레드풀을 생성하면 정확히 원하는 만큼의 스레드 풀을 생성할 수 있고 원하는 업무 혹은 스트림에 한정해서 스레드 풀의 개수를 변경할 수 있다.