Future

appti·2024년 3월 4일
0

분석

목록 보기
5/25

서론

CompletableFuture를 분석하기 전에 Future를 분석하고자 합니다.

사전 지식

Runnable & Callable

Runnable과 Callable은 모두 함수형 인터페이스로, 특정 Task를 수행할 수 있는 인터페이스입니다.
둘 모두 별도의 스레드에서 Task를 실행시킬 수 있습니다.

다음과 같은 차이점이 있습니다.

종류RunnableCallable
메서드 시그니처run()call()
예외 처리checked exception 예외를 던질 수 없음checked exception 예외를 던질 수 있음
반환 값없음있음
용도간단한 작업 수행 시반환 값/예외 처리가 필요하거나 복잡한 작업 수행 시

Future는 반환 값이 존재하기 때문에, 내부적으로 Callable을 사용합니다.

Future

Future는 비동기 작업의 결과와 관련된 API를 제공하는 인터페이스입니다.
주로 멀티 스레드 환경에서 작업을 처리할 때 사용합니다.

위와 같은 API를 제공합니다.

  • get() : 비동기 작업의 결과를 조회하며, 이 때 메서드를 호출한 스레드는 대기합니다.
  • get(long time, TimeUnit unit) : 비동기 작업의 결과를 조회하며, 이 때 메서드를 호출한 스레드는 파라미터로 지정한 시간만큼만 대기합니다.
  • isCancelled() : 비동기 작업이 완료되기 전 취소되었는지 여부를 반환합니다.
  • isDone() : 비동기 작업이 완료되었는지 여부를 반환합니다.

Future & Callback

Future, Callback 모두 비동기 프로그래밍 시 사용하는 패턴으로써, 둘 모두 결과값을 반환할 수 있다는 공통점이 있습니다.

Future, Callback의 차이점은 다음과 같습니다.

종류FutureCallback
용도비동기 작업의 종료 시 결과 조회비동기 작업 완료 이후 수행할 동작
Blocking 여부OX

매우 큰 차이가 존재하기 때문에 상황에 맞춰서 사용해야 합니다.

RunnableFuture, FutureTask

Future는 인터페이스입니다.
그래서 런타임에서는 다양한 구현체를 사용하는데, 주로 사용되는 구현체는 바로 FutureTask입니다.

FutureTask는 RunnableFuture를 구현하며, RunnableFuture는 Runnable과 Future를 상속합니다.

이를 통해 FutureTask는 기본적인 Runnable에 더해 추가적으로 다양한 기능을 제공합니다.

AbstractExecutorService에서도 실제로는 FutureTask(RunnableFuture)를 사용하고 있음을 확인할 수 있습니다.
이는 ExecutorService를 통해 동작하고 있는 스레드 풀에서 수행하는 작업은 RunnableFuture라는 의미입니다.

그러므로 추후 Future 동작 방식과 내부 코드 구현을 분석할 때, FutureTask를 분석할 예정입니다.

Future 동작 방식

Future의 동작 방식은 다음과 같습니다.

메인 스레드는 동기적으로 동작하며, 나머지 영역은 모두 비동기적으로 동작합니다.

  1. 메인 스레드에서 ExecutorService.submit()를 호출합니다.
  2. ExecutorService 내부적으로 execute()를 호출합니다.
    2-1. 이 때 전달되는 Runnable은 RunnableFuture 입니다.
  3. execute() 호출 즉시 메인 스레드에 Future 반환합니다.
  4. ExecutorService 비동기적으로 Worker.run()를 호출합니다.
  5. Worker는 getTask()를 통해 수행할 작업을 조회합니다.
    5-1. 이 때 조회하는 Task은 RunnableFuture 입니다.
  6. Worker가 RunnableFuture.run() 호출해 작업을 수행합니다.

  1. Future는 내부적으로 run()을 수행하고 있습니다.
  2. 메인 스레드에서 반환받은 Future.get()을 호출합니다.
  3. Future는 get()을 호출한 스레드에 락을 획득해 메인 스레드를 블로킹시킵니다.
  4. Future.run() 작업이 끝나면 set(V)가 호출됩니다.
  5. set(V)의 수행이 끝났다면, 락을 해제합니다.
  6. 락이 해제된 메인 스레드가 Future.get()의 결과를 반환받습니다.

FutureTask

state

FutureTask는 state라는 값을 통해 상태를 파악하고 그 상태에 맞게 로직을 수행합니다.

스레드 풀에서 동작하는 Task이기 때문에 가시성이 필요해 volatile 키워드를 사용한 것을 확인할 수 있습니다.

각 상태의 의미는 다음과 같습니다.

  • NEW : 작업 시작
  • COMPLETING : 작업 진행 중
  • NORMAL : 작업 완료
  • EXCEPTIONAL : 예외 발생
  • CANCELLED : 작업 취소
  • INTERRUPTING : 인터럽트 중
  • INTERRUPTED : 인터럽트 완료

다음과 같이 값이 변경될 수 있습니다.

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

필드

상태 외에도 FutureTask는 다음과 같은 필드를 가지고 있습니다.

  • callable : FutureTask가 수행할 Task입니다.
  • outcome : 결과가 저장되는 변수입니다.
  • runner : Task(callable)을 수행하는 스레드입니다. 이는 FutureTask.run()을 수행하는 스레드이라는 것과 동일한 의미입니다.
  • waiters : future.get()을 호출해 블로킹 된 상태의 스레드입니다.

STATE, RUNNER, WAITERS

FutureTask는 비동기적으로 실행되므로, 중요한 정보는 동기화 기법을 통해 관리합니다.
이 때 사용되는 클래스는 VarHandle로, Unsafe보다 안정적으로 CAS 연산을 수행할 수 있습니다.

각각의 필드는 다음과 같은 의미를 가집니다.

  • STATE : 현재 FutureTask의 상태 값을 관리합니다.
  • RUNNER : 현재 해당 FutureTask를 실행하고 있는 스레드를 관리합니다.
  • WAITERS : 현재 FutureTask.get()을 호출해 블로킹 된 스레드를 관리합니다.

FutureTask 코드 분석

public class FutureExample {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<Integer> task = () -> {
            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        };

        Future<Integer> result = executorService.submit(task);

        try {
            int integer = result.get();

            System.out.println("integer = " + integer);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

다음과 같은 기본 예제를 기준으로 코드 분석과 그 결과를 출력할 예정입니다.

run()

스레드 풀에 의해 호출되는 메서드로, 실제 주어진 Task를 수행합니다.

실행 가능 여부 확인

FutureTask의 상태가 NEW라는 것은 종료/실행 취소/예외/인터럽트 등의 상황이 아닌 아무 동작도 수행하지 않았다는 의미이기 때문에 동작합니다.

NEW가 아닐 시 RUNNER VarHandle을 활용해 runner 필드를 세팅합니다.

정상적으로 동작했다면 FutureTask의 상태가 NEW가 아니지만 컨텍스트 스위칭이 발생했을 때 runner을 null로 항상 초기화하므로 runner 필드는 null입니다.
이를 활용해 FutureTask.runner가 null일 때 현재 동작중인 스레드로 업데이트하고, 성공 여부를 반환합니다.
실패했다는 것은 다른 스레드가 실행 중인 FutureTask거나 예외적인 상황이기 때문에 바로 early return을 수행합니다.

Task 실행

이후 Task, Callable이 존재하면서 상태가 NEW일 때 Task를 수행합니다.
컨텍스트 스위칭이 발생하더라도 실행중인 코드에서 멈출 것이기 때문에, 정상적으로 동작했다면 상태는 항상 NEW일 수 밖에 없습니다.

이후 Callable을 수행하며, 다음처럼 동작합니다.

  • 정상적으로 동작했다면 result에 결과를 저장하고 ran을 true로 세팅
  • 예외가 발생했다면 result을 null로 초기화하고 ran을 false로 세팅, 예외 세팅

정상 동작 시 - 결과 처리

정상 동작했다면 ran이 true가 되어 set(result)가 동작합니다.

  1. CAS로 FutureTask.state가 NEW라면 COMPLETING으로 변환합니다.
  2. outcome을 result, v로 세팅합니다.
    2-1. outcome을 별도로 volatile로 관리하지 않는 이유는 이미 CAS 연산을 통해 state로 관리하기 때문입니다.
  3. 현재 state를 NORMAL로 변경합니다.
  4. finishCompletion()을 호출합니다.

finishCompletion()은 waiters로 등록한, future.get()으로 인해 블로킹된 스레드를 삭제합니다.

CAS 연산으로 FutureTask.waiters에 등록한 값과 WaitNode q가 일치하면 null로 세팅합니다.

내부 for문으로 무한 반복하며 모든 WaitNode를 처리합니다.

WaitNode에 세팅한 스레드 변수를 null로 처리하고, 락을 해제하고, q.next를 null로 설정해 GC를 유도합니다.

이렇게 반복하면 마지막 노드일 때 next = null이 되므로 break 됩니다.

이후 done()이 호출됩니다.

기본적으로 아무것도 하지 않지만, 오버라이딩을 통해 후처리가 가능합니다.

마지막으로 Task, callable을 null 처리합니다.

get(), get(time, unit)

get() 메서드에서 핵심적인 규약은 get()을 호출했을 때 반드시 결과 값을 조회할 수 있어야 한다는 점입니다.

이 점을 명시하고 코드를 살펴보겠습니다.

두 메서드 모두 내부적으로 awaitDone()을 호출하고 있습니다.
차이점으로는, 시간 관련된 파라미터를 추가한 get()의 경우 예외 처리가 추가되어 있다는 점 뿐입니다.

awitDone()은 이와 같이 for문을 활용한 무한 루프로 동작합니다.
반환 값은 int로, FutureTask의 상태 값을 반환합니다.

WaitNode의 경우 WAITERS에 저장되는 것으로, 현재 future.get()을 호출한 스레드를 의미합니다.

WaitNode 생성 및 예외 상황 처리

가장 먼저 수행되는 코드입니다.

첫 번째 조건문은 get() 호출 시 시간 제한을 걸었지만 시간을 0ns 이하로 지정한 경우입니다.
get() 호출 시 0ns를 기다린다는 것은 기다리지 않겠다는 것과 동일하므로 바로 상태를 반환합니다.

생성자 호출 시 Thread.currentThread()로 현재 실행중인 스레드를 지정하고 있음을 확인할 수 있습니다.
가시성을 고려해 volatile 키워드가 추가된 것을 확인할 수 있습니다.

또한 WaitNode의 구성은 Linked List와 유사한 형식임을 확인할 수 있습니다.

FutureTask.waiters 초기화

해당 코드는 현재 스레드를 WAITERS에 등록하기 위한 코드입니다.

이제 WaitNode를 만들었기 때문에, FutureTask의 필드 waiters는 아직 초기화되지 않은 상태입니다.

이 때 WAITERS VarHandle을 활용해, 지금 실행중인 FutureTask에 대해 q.next = waiters인 경우 q를 초기화합니다.

방금 WaitNode q를 초기화했으므로 q.next는 null이며, waiters도 null이므로 FutureTask.waiters에 WaitNode q가 초기화됩니다.

락 획득

LockSupport를 활용해 락을 획득합니다.

위의 else if(timed)의 경우 get() 호출 시 시간 관련 파라미터를 전달한 경우입니다.

예제에서는 시간 관련 파라미터를 지정하지 않았으므로, LockSupport.park(this)가 호출됩니다.

COMPLETING 상태 시

COMPLETING 상태라면 Thread.yield()를 호출합니다.
그 이유는 주석에서 볼 수 있는 것처럼, get()의 규약을 지키기 위함입니다.

run()에서 확인했던 set() 메서드입니다.
상태를 NEW -> COMPLETING으로 변경한 직후 get()을 수행하는 스레드가 CPU를 할당받아 COMPLETING일 때 for문이 동작할 수 있습니다.

이 때 outcome이 null임에도 불구하고 outcome을 반환할 수 있기 때문에 이를 방지하고자 다른 스레드에게 CPU 할당을 양보하는 것입니다.

run()이 정상적으로 수행된 경우

상태가 COMPLETING보다 큰 경우에 동작합니다.

COMPLETING보다 큰 경우는 NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTING, INTERRUPTED 입니다.

그러므로 run()이 정상적으로 수행된 NORMAL도 이 때 동작합니다.

run()이 정상적으로 수행되었다면 finishCompletion()이 수행되었고, 이로 인해 future.get()을 호출한 스레드의 락을 해제해 정상적으로 수행할 수 있게 됩니다.

report()

값이 NORMAL, 정상적으로 수행된 경우 그 값을 명시적으로 캐스팅해서 반환합니다.
명시적으로 캐스팅하다보니 @SuppressWarnings로 unchecked를 무시하고 있음을 확인할 수 있습니다.

cancel(mayInterruptIfRunning)

cancel()은 수행하고 있는 Task를 취소하는 메서드입니다.
파라미터는 취소할 때 인터럽트를 발생시킬지 유무입니다.

취소하기 위해 상태를 세팅합니다.
취소하기 위해서는 상태가 NEW일 때만 가능하며, 인터럽트 발생 여부에 따라 INTERRUPTING, CANCELLED로 세팅합니다.

try 블록의 경우 인터럽트를 통해 취소할 경우에만 동작합니다.
단순하게 t.interrupt()를 호출하고 finally에서 무조건 INTERRUPTED로 상태를 변경하는걸 확인할 수 있습니다.

이를 통해 인터럽트를 호출했지만 Task에 따라 실제로는 인터럽트가 발생하지 않을 수 있다는 것을 확인할 수 있습니다.

이후 finishCompletion()을 수행합니다.
이 finishCompletion()은 get()에서 확인한 것처럼 waiters가 GC 처리될 수 있도록 후처리하고, 락을 해제하는 과정을 수행합니다.

cancel() 시 get() 동작 방식

메인 스레드에서 FutureTask.cancel()을 호출한다고 하더라도 Task는 그대로 수행됩니다.

cancel()을 수행하는 스레드(= 메인 스레드)와 FutureTask.callable을 수행하는 스레드(= 스레드 풀의 스레드, runner)가 다르기 때문입니다.

또한 인터럽트도 발생하지 않을 수 있는데, 인터럽트를 발생시키려면 Thread.sleep() 등 InterruptedException이 발생할 수 있어야 하기 때문입니다.

이를 처리하기 위해 report()에서 CANCELLED보다 크거나 같은 INTERRUPTING, INTERRUPTED 모두 위와 같은 예외가 발생하게 됩니다.

run() 도중 인터럽트 발생 시

run()에서 Task를 수행하다가 InterruptedException이 발생하면 catch 블록에서 setException()을 호출하게 됩니다.

상태를 CAS 연산을 통해 COMPETING으로 바꾸고, 결과에는 예외를 담은 뒤 상태를 EXCEPTIONAL로 변경합니다.

run()에서 Task를 수행하다가 InterruptedException이 발생하면 finally 부분에서 handlePossibleCancellationInterrupt()를 호출합니다.

해당 메서드는 인터럽트를 활용해 FutureTask의 동작을 중지시킬 때, 인터럽트로 인해 중지하는 작업이 run() 혹은 runAndReset()일 때만 동작하도록 제한하는 용도입니다.

isCancelled(), isDone()

isCancelled()는 CANCELLED보다 크거나 같은 경우 true를 반환합니다.
즉, 취소한 상태와 인터럽트 관련 상태인 경우 모두 true를 반환합니다.
인터럽트도 결국 취소하는 방식 중에 하나이므로 자연스럽다고 볼 수 있습니다.

isDone()은 상태가 NEW가 아니라면 true를 반환합니다.

결론

  • Future는 비동기 작업의 결과와 관련된 API를 제공하는 인터페이스이며, 주로 멀티 스레드 환경에서 작업을 처리할 때 사용합니다.
  • Future.get() 호출 시 해당 메서드를 호출된 스레드는 블로킹됩니다.
  • Future의 작업을 취소(중단)할 수 있습니다. 단, 비동기로 동작중이기 때문에 실제로 작업이 중단되지 않을 수도 있습니다.
  • isDone()은 상태가 NEW가 아니라면 모두 true를 반환하기 때문에, 좀 더 자세한 정보가 필요할 경우 isCancelled()를 사용해야 합니다.
profile
안녕하세요

0개의 댓글