Java - Future

INHEES·2025년 1월 13일

금일은 Java 의 비동기 작업의 결과를 나타내는 인터페이스에 대해 알아보겠습니다.

목차

  • Future의 개념
  • Future의 분석 및 활용
  • Future의 취소와 예외
  • ExecutorService 작업 컬렉션 처리

Future 의 개념

앞서 시간에 알아본 Callable 인터페이스는 java.utuil.concurrent 에서 제공하는 기능으로 Runnable 과는 다르게 Exception 을 포함한 예외를 던질 수 있으며 반환 타입은 제네릭 V 입니다.

우선 Callable 과 Future 의 사용에 대해 알아보겠습니다.

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        Future<Integer> future = es.submit(new MyCallable());
        Integer result = future.get();
        log("result value = " + result);
        es.close();
    }

    static class MyCallable implements Callable<Integer> {
        @Override
        public Integer call() {
            log("Callable 시작");
            sleep(2000);
            int value = new Random().nextInt(10);
            log("create value = " + value);
            log("Callable 완료");
            return value;
        }
    }

해당 코드를 살펴보면 java.util.concurrent.Executors 가 제공하는 newFixedThreadPool(size) 를 사용하면 편리하게 Executors 를 생성가능합니다.

Runnable 과 차이점은 결과를 반환한다는 점으로 결과를 보관할 별도의 필드를 만들지 않아도 된다는 것이다.

MyCallable 인스턴스가 블로킹 큐에 전달되고 스레드가 실행되며 처리 결과가 직접 반환되는 것이아니라 Future 라는 인터페이스를 통해 반환되는 것이다.

단순하게 ExecutorService 에 필요한 작업을 요청하고 결과를 받아 쓸 수 있게되는 것이다.


Future 분석

Future<Integer> future = es.submit(new MyCallable());

결과를 바로 받환하지 않고 불편하게 Future 라는 객체를 대신 반환하여 future.get() 함수를 호출하는 이유에 대해 알아 보겠습니다.

MyCallable.call() 메서드는 호출 스레드가 실행하는것이 아니라 스레드 풀의 다른 스레드가 실행하기 때문에 언제 실행이 완료되어 결과를 반환할 지 알 수 없다 때문에 즉시 결과값을 받는 것이 불가능하며

Future 객체가 전달한 작업의 미래 결과를 담고 있다고 생각 할 수 있습니다.

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        log("submit() 호출");
        Future<Integer> future = es.submit(new MyCallable());
        log("future 즉시 반환, future = " + future);

        log("future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING");
        Integer result = future.get();
        log("future.get() [블로킹] 메서드 호출 완료 -> main 스레드 RUNNABLE");

        log("result value = " + result);
        log("future 완료, future = " + future);
        es.close();
    }
  • 해당 코드 실행을 통해 요청 스레드가 es.submit() 를 호출하며 ExecutorService 는 전달한 task 의 결과를 알 수있는 Future 객체를 생성합니다.

  • es.submit(new MyCallable()) 함수 호출을 통해 Future 의 구현체인 FutureTaskRunnable 인터페이스를 구현하며 내부의 run() 메서드가 task 의 call() 메서드를 호출하게 됩니다.

  • future.get() 함수가 실행하게 되면 Future 가 완료 상태라면 결과가 포함되고 아니라면 요청 스래드는 결과를 받기 위해 대기합니다. 즉 main thread 는 Waiting 상태가 되는 것이다. (Blocking)

    es.submit() 함수는 future 객체를 즉시 반환하기 때문에 블로킹 되지 않습니다.

    블로킹 메서드 : Thread.join(), Future.get() 과 같은 메서드는
    스레드가 작업을 바로 수행하지 않고
    다른 작업이 완료될 때까지 기다리는 메서드이다. 
  • Future 의 구현체인 FutureTaskRunnable 인터페이스를 구현하며 내부의 run() 메서드가 task 의 call() 메서드를 호출하게 됩니다.

  • 작업이 완료된 thread 는 다음 WAITING 상태의 thread 를 깨우고 스레들 풀에 들어가게 됩니다. (main thread x)


Future 활용

조금 더 복잡한 예제 확인을 통해 Future 객체를 활용해 보겠습니다.

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        SumTask task1 = new SumTask(1, 50);
        SumTask task2 = new SumTask(51, 100);

        ExecutorService es = Executors.newFixedThreadPool(2);

        Future<Integer> future1 = es.submit(task1);
        Future<Integer> future2 = es.submit(task2);

        Integer sum1 = future1.get();
        Integer sum2 = future2.get();

        log("task1.result=" + sum1);
        log("task2.result=" + sum2);

        int sumAll = sum1 + sum2;
        log("task1 + task2 = " + sumAll);
        log("End");

        es.close();
    }

    static class SumTask implements Callable<Integer> {
        int startValue;
        int endValue;

        public SumTask(int startValue, int endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }

        @Override
        public Integer call() throws InterruptedException {
            log("작업 시작");
            Thread.sleep(2000);
            int sum = 0;
            for (int i = startValue; i <= endValue; i++) {
                sum += i;
            }
            log("작업 완료 result=" + sum);
            return sum;
        }
    }

해당 코드를 살펴보면 ExecutorService, Callable 를 활용하여 기존의 Runnable, start, join 함수를 제거하며 Callable 이 Exception 을 던지기 때문에 체크 예외도 가능하다는 것을 확인할 수 있습니다.


Future 사용 이유

Future<Integer> future1 = es.submit(task1); // 여기는 블로킹 아님
Future<Integer> future2 = es.submit(task2); // 여기는 블로킹 아님
Integer sum1 = future1.get(); // 여기서 블로킹
Integer sum2 = future2.get(); // 여기서 블로킹

두개의 작업 task1, 2가 있다고 하면 2개의 요청 스레드는 즉시 Future를 반환 받습니다.

요청스레드는 task1, task2 를 동시에 요청하며 두 작업은 동시에 수행됩니다.

future1.get() 을 호출하며 작업 스레드는 약 2초가 소모가 된다고 하면 future2.get() 도 2초동안 수행되고 task1 의 2초 를 기다릴 필요없이 거의 즉시 task1, task2 가 거의 동시에 결과값을 반환한다.

만약 아래와 같이 코드를 작성하게 되면 총 작업의 시간이 4초가 걸리게 되므로 유의해야 합니다.

Integer sum1 = es.submit(task1).get(); // get 에서 블로킹
Integer sum2 = es.submit(task2).get();

submit 함수를 통해 병렬 처리를 만들고 future.get 을 통해 결과 값만 받으면 되는 것입니다.

Future 는 요청 스레드를 블로킹(대기) 상태로 만들지 않고 필요한 요청을 모두 수행할 수 있게 해줍니다.

이러한 이유로 ExecutorService 는 결과를 직접 반환하지 않고, Future 를 반환합니다.


Future 정리

Future Interface 는 다양한 메서드르 가지고 있습니다.

주요 메서드

boolean cancel(boolean mayInterruptIfRunning)

  • 아직 완료되지 않은 작업을 취소한다.
  • 작업이 실행 중이 아니거나 아직 시작되지 않았으면 취소하고, 실행 중인 작업의 경우
    mayInterruptIfRunning 이 true 이면 중단을 시도한다

boolean isCancelled()

  • 이 메서드는 작업이 cancel() 메서드에 의해 취소된 경우에 true 를 반환한다

boolean isDone()

  • 작업이 정상적으로 완료되었거나, 취소되었거나, 예외가 발생하여 종료된 경우에 true 를 반환한다
  • 그렇지 않은 경우 false 를 반환한다.

State state()

  • Future 의 상태를 반환한다. 자바 19부터 지원한다.

V get()

  • 작업이 완료될 때까지 get() 을 호출한 현재 스레드를 대기(블록킹)한다. 작업이 완료되면 결과를 반환한
  • InterruptedException : 대기 중에 현재 스레드가 인터럽트된 경우 발생
  • ExecutionException : 작업 계산 중에 예외가 발생한 경우 발생

V get(long timeout, TimeUnit unit)

  • get() 과 같은데, 시간 초과되면 예외를 발생시킨다.
  • TimeoutException : 주어진 시간 내에 작업이 완료되지 않은 경우 발생

Future 취소

Future interface 의 cancel() 함수 예제에 대해 알아보겠습니다.

    //private static boolean mayInterruptIfRunning = true; // 변경
    private static boolean mayInterruptIfRunning = false; // 변경

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1);
        Future<String> future = es.submit(new MyTask());
        log("Future.state: " + future.state());

        // 일정 시간 후 취소 시도
        sleep(3000);

        // cancel() 호출
        log("future.cancel(" + mayInterruptIfRunning + ") 호출");
        boolean cancelResult = future.cancel(mayInterruptIfRunning);
        log("cancel(" + mayInterruptIfRunning + ") result: " + cancelResult);

        // 결과 확인
        try {
            log("Future result: " + future.get());
        } catch (CancellationException e) {
            log("Future는 이미 취소 되었습니다.");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        es.close();
    }

    static class MyTask implements Callable<String> {
        @Override
        public String call() {
            try {
                for (int i = 0; i < 10; i++) {
                    log("작업 중: " + i);
                    Thread.sleep(1000); // 1초 동안 sleep
                }
            } catch (InterruptedException e) {
                log("인터럽트 발생");
                return "Interrupted";
            }
            return "Completed";
        }

    }

해당 코드를 살펴보면 mayInterruptIfRunning = true 일 때는 call() method 를 호출할때 InterruptedException 이 반환되며 main() 함수에서 CalcellationException 이 발생합니다.

mayInterruptIfRunning = false 일 경우에는 이미 실행중인 작업을 중단하지는 않으며 Callable 에서 결과값이 반환되지만 main method 에서 future.get() 함수 실행시 CalcellationException 이 발생합니다.

Future 예외

Future.get() 을 호출하면 작업의 결과값 뿐만 아니라, 작업 중에 발생한 예외도 받을 수 있다

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1);
        log("작업 전달");
        Future<Integer> future = es.submit(new ExCallable());
        sleep(1000); // 잠시 대기

        try {
            log("future.get() 호출 시도, future.state(): " + future.state());
            Integer result = future.get();
            log("result value = " + result);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            log("e = " + e);
            Throwable cause = e.getCause(); // 원본 예외
            log("cause = " + cause);
        }
        es.close();
    }

    static class ExCallable implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            log("Callable 실행, 예외 발생");
            throw new IllegalStateException("ex!");
        }
    }

다음 코드를 살펴보면

  • 요청 스레드 : es.submit(new ExCallable()) 을 호출해서 작업을 전달한다
    결과를 얻기 위해 future.get() 을 호출한다

    Future 의 상태가 FAILED 면 ExecutionException 예외를 던진다.
    이 예외는 내부에 앞서 Future 에 저장해둔 IllegalStateException 을 포함하고 있다.

    e.getCause() 을 호출하면 작업에서 발생한 원본 예외를 받을 수 있다. (IllegalStateException)

  • 작업 스레드 : ExCallable 을 실행하는데, IllegalStateException 예외가 발생한다

    작업 스레드는 Future 에 발생한 예외를 담아둔다


ExecutorService 작업 컬렉션 처리

ExecutorService 는 여러 작업을 한 번에 편리하게 처리하는 invokeAll() , invokeAny() 기능을 제공한다.

invokeAll(), invokeAll(timeout)

  • 모든 Callable 작업을 제출하고, 모든 작업이 완료될 때까지 기다린다
  • 지정된 시간 내에 모든 Callable 작업을 제출하고 완료될 때까지 기다린다.

invokeAny(), invokeAny(timeout)

  • 하나의 Callable 작업이 완료될 때까지 기다리고, 가장 먼저 완료된 작업의 결과를 반환한다.

  • 완료되지 않은 나머지 작업은 취소한다.

  • 지정된 시간 내에 하나의 Callable 작업이 완료될 때까지 기다리고, 가장 먼저 완료된 작업의 결과를 반
    환한다.

        [invokeAll]
        List<CallableTask> tasks = List.of(task1, task2, task3);

        List<Future<Integer>> futures = es.invokeAll(tasks);
        for (Future<Integer> future : futures) {
            Integer value = future.get();
            log("value = " + value);
        }
        
        ===========================================
        
        [invokeAny]
        List<CallableTask> tasks = List.of(task1, task2, task3);

        Integer value = es.invokeAny(tasks);
        log("value = " + value);

참고

inflearn

profile
이유를 찾아보자

0개의 댓글