Future/CompletableFuture

のの·2021년 1월 5일
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class FutureExample {
    // 제곱을 계산하는 Callable 객체를 만든다.
    public Callable<Long> calSquare(long value) {
        Callable<Long> callable = () -> {
            Long returnValue = value * value;
            TimeUnit.SECONDS.sleep(3);
            System.out.println(value + "의 제곱근은 : " + returnValue);
            return returnValue;
        };
        return callable;
    }

    public void executeTest() {
        List<Long> sampleDataList = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
        List<Future<Long>> futureList = new ArrayList<>();

        // 스레드 풀을 생선. 고정 스레드 풀을 이용한다.
        int coreSize = Runtime.getRuntime().availableProcessors();
        ExecutorService executorService = Executors.newFixedThreadPool(coreSize);

        // Callable 객체를 생성한 후 스레드 풀에 등록
        // 등록된 스레드에 대해 Future 객체를 리턴받음
        for (Long sampleValue : sampleDataList) {
            Future<Long> future = executorService.submit(calSquare(sampleValue));
            futureList.add(future);
        }

        Long sumValue = 0L;

        // Future 목록의 결과를 확인

        for (Future<Long> future : futureList) {
            try {
                // 결과를 읽어 들일 때까지 대기
                // 대기하는 동안, 스레드가 계산을 하고 값을 리턴한다.
                sumValue += future.get();
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("최종 합계 :" + sumValue);
        executorService.shutdown();

    }

    public static void main(String[] args) {
        FutureExample futureExample = new FutureExample();
        futureExample.executeTest();
    }
}

Future 인터페이스를 이용해 비동기 연산을 실행하면 저수준의 스레드 프로그래밍을 하지 않아도 비동기 처리가 가능하며 처리 상태를 확인하고 결과를 쉽게 조회할 수 있다.

Future< Long > future = servicepool.submit(calSquare(sampleValue));

submit 메서드는 입력 파라미터로 Runnable과 Callable을 받을 수 있다. Runnable과 Callable은 멀티 스레드 기반으로 동작하기 위한 인터페이스라는 공통점이 있지만 Runnable은 메서드가 void형인 run 메서드만 있기 때문에 실행 결과를 리턴하지 않는다. 그에 비해 Callable 인터페이스는 제네릭으로 정의한 리턴타입을 가지는 call 메서드가 제공된다. 즉 비동기로 데이터를 처리한 이후에 그 결과를 리턴할 필요가 있다면 Callable을 이용해야 한다.

isDone, isCancelled 메서드를 사용해서 비동기 연산이 종료 혹은 취소됐는지 확인 가능
get 결괏값을 응답 받을 때까지 대기한다.

  • Callable 인터페이스의 구현체를 실행시키기 위한 스레드 풀을 정의하기 위해 ExecutorService 객체를 생성한다.
  • 스레드 풀인 ExecutorService에 Callable 구현체를 등록하고 Future를 리턴 받는다. 리턴받은 Future는 향후에 값을 확인하기 위해 List와 같은 컬렉션에 등록한다.
  • Future로 연산의 결과를 확인하기 위해 get 메서드를 호출한다. get 메서드는 비동기 연산이 종료될 때까지 대기한다.

CompletableFuture

  • 스레드의 선언 없이도 비동기 연산 작업을 구현할 수 있고 병렬 프로그래밍이 가능하다.
  • 람다 표현식과 함수형 프로그래밍을 사용할 수 있어서 코드의 양을 현저히 줄일 수 있다.
  • 파이프라인 형태로 작업들을 연결할 수 있어서 비동기 작업의 순서를 정의하고 관리할 수 있다.

CompletableFuture 클래스 메서드

  • runAsync
    Runnable 구현체를 이용해서 비동기 연산 작업을 하기 위한 새로운 Completable Future 객체를 리턴한다.
  • supplyAsync
    Supplier 함수형 인터페이스의 구현체를 이용해서 비동기 연산 작업을 위한 새로운 CompletableFuture 객체를 리턴한다
  • thenAccept
    현재 단계가 성공적으로 종료되었을 경우, 메서드의 파라미터로 전달된 Consumer 함수형 인터페이스의 구현체를 실행하기 위한 CompletionStage 객체를 리턴한다.
  • thenRun
    현재 단계가 성공적으로 종료되었을 경우, 메서드의 파라미터로 전달된 Runnable 구현체를 실행하기 위한 CompletionState 객체를 리턴한다.
  • complete
    현재 테스트를 종료하며 만일 테스크가 동작 중이라면 get 메서드와 동일하게 종료될 때까지 대기하고, 최종 테스트 결과를 리턴한다.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureExample {

    public static void main(String[] args) {
        Runnable mainTask = () -> {
            try{
                TimeUnit.SECONDS.sleep(2);
            } catch (Exception e){
                e.printStackTrace();
            }
            System.out.println("Main Task : " + Thread.currentThread().getName());
        };

        Runnable subTask = () -> System.out.println("Next Task : " + Thread.currentThread().getName());

        // ExecutorService

        ExecutorService executor = Executors.newFixedThreadPool(4);

        CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
        CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
        CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
        CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);
        CompletableFuture.runAsync(mainTask, executor).thenRun(subTask);

    }

}

Main Task : pool-1-thread-1
Main Task : pool-1-thread-4
Main Task : pool-1-thread-2
Next Task : pool-1-thread-4
Next Task : pool-1-thread-1
Main Task : pool-1-thread-3
Next Task : pool-1-thread-3
Next Task : pool-1-thread-2
Main Task : pool-1-thread-4
Next Task : pool-1-thread-4

runAsync: Runnable 인터페이스 구현체를 실행시킨다. run 메서드가 void 타입이기 때문에 값을 외부에 리턴할 수 없다.
supplyAsync: Supplier 인터페이스 구현체를 실행시킨다. Supplier 인터페이스는 자바에서 기본 제공하는 함수형 인터페이스이며, 입력 파라미터는 없고 리턴 값만 있다. 그러므로 runAsync와 달리 리턴 객체를 받아서 결과를 확인할 수 있다.

동기식 API를 비동기 API로 리팩토링

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class InsuranceCalculator {

    public int calculatePrice(Map condition){
        // 기본 가격
        int price = 10000;

        // 보험료 계산하는 로직 대신 10초 대기하는 것으로 대체
        try{
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return price;
    }

    public static void main(String[] args) {
        InsuranceCalculator cal = new InsuranceCalculator();

        for(int i=0; i<5; i++){
            System.out.println(String.format("계산 치수 %s : %s\n", (i+1), cal.calculatePrice(null)));
        }
    }
}

계산공식 대신 3초간 대기하는 코드를 이용했다. 하나의 계산 작업에 3초의 시간이 걸리므로 실행하는데 총 15초의 시간이 소요된다.
이 코드를 비동기 처리 방식으로 변경하려면 두 가지 선택 방법이 있다.

  1. calculatePrice 메서드는 그대로 둔 채 이를 호출하는 로직을 비동기 방식으로 전환
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InsuranceCalculator {

    public int calculatePrice(Map condition){
        // 기본 가격
        int price = 10000;

        // 보험료 계산하는 로직 대신 10초 대기하는 것으로 대체
        try{
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return price;
    }

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(4);
        List<Future<Integer>> futureList = new ArrayList<>();

        for(int i=0; i<8; i++){
            Future<Integer> future = executorService.submit(() -> new InsuranceCalculator().calculatePrice(null));
            futureList.add(future);
        }
        futureList.forEach((future) -> {
            try{
                System.out.println(String.format("계산 결과 : %s", future.get()));
            }catch (Exception e){
                e.printStackTrace();
            }
        });

    }
}

  1. calculatePrice 메서드가 비동기로 처리되도록 내부 구조를 변경한다. 이렇게 하면 이 메서드를 호출하는 부분의 영향을 최소화하면서 비동기로 계산 작업이 수행되도록 할 수 있다.
    public Future<Integer> calculateAsync(Map condition){
        CompletableFuture<Integer> future = new CompletableFuture<>();
        
        new Thread(()->{
            int price = calculatePrice(condition);
            // 처리 상태에 대한 래퍼런스를 등록한다.
            future.complete(price);
        }).start();
        
        return future;
    }

메서드 내부의 일부를 비동기 처리하기 위해 new Thread를 이용해서 스레드를 생성하는 것이다. 그리고 해당 스레드에 대한 참조를 얻기 위해 complete 메서드를 이용해서 작업을 등록한다.
그리고 메서드 호출자에 Future 인터페이스를 리턴하면 해당 메서드를 호출하는 개발자는 Future 인터페이스의 get 메소드를 호출해서 결과를 조회하면 된다.

Future< Integer > future = cal.calculatePrice(null);


스트림 병렬처리

import java.util.Arrays;
import java.util.List;

public class ParallelReduceMinMax {
    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(4, 2, 8, 1, 9, 6, 8, 3, 5);

        // 최댓값 구하기 - 병렬
        int max = intList.parallelStream().reduce(1,Integer::max);
        System.out.println("max value is : " + max);

        // 최솟값 구하기 - 병렬
        int min = intList.parallelStream().reduce(1,Integer::min);
        System.out.println("max value is : " + min);

    }
}

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class InsideParallelStream {

    // 병렬 스트림 API 사용 예
    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(1,2,3,4,5,6,7,8,9);
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:MM:ss");

        // 스트림 내부의 스레드 값을 구함

        intList.parallelStream().forEach(
                value -> {
                    String threadName = Thread.currentThread().getName();
                    // 스레드 이름과 데이터 값을 출력
                    LocalDateTime currentTime = LocalDateTime.now();
                    System.out.printf(currentTime.format(formatter) +
                            String.format(" -> Thread Name : %s, Stream Value : %s\n", threadName, value));
                    // 시간 확인을 위해 2초간 sleep 한다.
                    try{
                        TimeUnit.SECONDS.sleep(1);
                    } catch(InterruptedException e){
                        e.printStackTrace();
                    }
                }
        );
    }
}

2021-01-05 17:01:52 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Value : 8
2021-01-05 17:01:52 -> Thread Name : ForkJoinPool.commonPool-worker-5, Stream Value : 3
2021-01-05 17:01:52 -> Thread Name : ForkJoinPool.commonPool-worker-7, Stream Value : 2
2021-01-05 17:01:52 -> Thread Name : main, Stream Value : 6
2021-01-05 17:01:53 -> Thread Name : main, Stream Value : 5
2021-01-05 17:01:53 -> Thread Name : ForkJoinPool.commonPool-worker-5, Stream Value : 4
2021-01-05 17:01:53 -> Thread Name : ForkJoinPool.commonPool-worker-7, Stream Value : 1
2021-01-05 17:01:53 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Value : 9
2021-01-05 17:01:54 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Value : 7

main 스레드는 스트림을 처리하기 위한 기본 스레드를 의미하고, 이 스레드가 3개의 ForkJoinPool의 스레드를 생성한 것이다. 이렇게 코어 수 기반의 스레드 생성은 스트림에서 제어한 것이 아니라 컨커런트 API의 ForkJoinPool의 기본값이며 여기에 영향을 받은 것이다.


스레드 수 조정

  1. ForkJoinPool의 기본 스레드 값을 조정
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class InsideParallelStream2 {
    public static void main(String[] args) {
        // 병렬 스트림의 예

        List<Integer> intList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:MM:ss");

        // 스레드 수 2개로 설정
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
        System.out.printf("## Thread Pool Size : %s\n", ForkJoinPool.getCommonPoolParallelism());

        intList.parallelStream().forEach( value -> {
            // 현재 스레드 이름을 구함
            String threadName = Thread.currentThread().getName();
            LocalDateTime currentTime = LocalDateTime.now();
            System.out.println(currentTime.format(formatter) +
                    String.format(" -> Thread Name : %s, Stream Name : %s", threadName, value));

            // 시간 확인을 위해 2초간 sleep 함

            try{
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        });


    }

}

## Thread Pool Size : 2
2021-01-05 18:01:06 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Name : 3
2021-01-05 18:01:06 -> Thread Name : main, Stream Name : 7
2021-01-05 18:01:06 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Name : 9
2021-01-05 18:01:08 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Name : 10
2021-01-05 18:01:08 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Name : 5
2021-01-05 18:01:08 -> Thread Name : main, Stream Name : 6
2021-01-05 18:01:10 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Name : 4
2021-01-05 18:01:10 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Name : 8
2021-01-05 18:01:12 -> Thread Name : ForkJoinPool.commonPool-worker-3, Stream Name : 2
2021-01-05 18:01:12 -> Thread Name : ForkJoinPool.commonPool-worker-1, Stream Name : 1

메인 스레드가 2개의 스레드를 fork 하여 총 3개의 스레드가 동작하였음. 만약 2개의 스레드로 실행하려면 값을 1로 설정해야한다.
설정한 스레드 값(ForkjoinPool의 기본 값을 변경하는 것이기 때문에 자바 가상 머신 전체에 영향을 미친다. 특정한 프로세스 혹은 스레드에 대해서만 변경할 수 없으며 가상 머신이 종료될 때까지 설정이 유효하다.

  1. ForkJoinPool이 아닌 다른 스레드 풀을 사용
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class InsideParallelStream3 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 병렬 스트림의 예
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:MM:ss");

        // 별도의 스레드 풀 2개 생성
        ForkJoinPool customPool = new ForkJoinPool(2);
        customPool.submit(() -> {
            // 스레드 풀 크기를 구한다.
            System.out.printf("## Thread Pool Size : %s\n", customPool.getParallelism());

            intList.parallelStream().forEach(value -> {
                // 현재 스레드 이름을 구함
                String threadName = Thread.currentThread().getName();
                LocalDateTime currentTime = LocalDateTime.now();
                System.out.println(currentTime.format(formatter) +
                        String.format(" -> Thread Name : %s, Stream Name : %s", threadName, value));

                // 시간 확인을 위해 2초간 sleep 함
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }).get();
    }
}

별도의 스레드 풀 2개를 생성한 후 submit 메서드를 구현하였다. commonPool을 사용했을 때와 달리 main 스레드가 없다. 이처럼 별도의 스레드풀을 생성하면 정확히 원하는 만큼의 스레드 풀을 생성할 수 있고 원하는 업무 혹은 스트림에 한정해서 스레드 풀의 개수를 변경할 수 있다.

profile
wannabe developer

0개의 댓글