[Java] ThreadPool과 Executor 프레임워크(1)

MEUN·2024년 12월 15일
0

Runnable의 한계

  • 반환 값이 없음
  • 체크 예외 throw 불가


Thread를 직접 다루게 되면..

아래 문제들을 겪게 된다.

  1. Thread 생성 시간으로 인한 성능 문제
  2. Thread 관리 필요
  3. Runnable 인터페이스의 한계

이러한 문제들을 해결하기 위해 생성한 Thread를 재사용하는 자바에서는 java.util.concurrent 패키지를 통해 Executor 프레임워크를 제공한다.

이를 이용하면 매우 편리하게 Thread를 사용할 수 있게 된다.



Executor 프레임워크

개요

멀티 스레딩 및 병렬 처리를 쉽게 사용할 수 있도록 돕는 기능의 모음

주요 구성 요소

Executor

Runnable 작업을 제출하고 반환 값을 받지 않는 execute(Runnable command)가 정의되어 있다.

public interface Executor {

    void execute(Runnable command);
}

ExecutorService

Executor 인터페이스를 확장하여 작업 진행과 제어 기능을 추가로 제공한다.

public interface ExecutorService extends Executor, AutoCloseable {

    void shutdown();
	
    List<Runnable> shutdownNow();
	
    boolean isShutdown();
	
    boolean isTerminated();
	
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);
	
    Future<?> submit(Runnable task);
	
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
		
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
		
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    @Override
    default void close() {
        boolean terminated = isTerminated();
        if (!terminated) {
            shutdown();
            boolean interrupted = false;
            while (!terminated) {
                try {
                    terminated = awaitTermination(1L, TimeUnit.DAYS);
                } catch (InterruptedException e) {
                    if (!interrupted) {
                        shutdownNow();
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

주요 메서드

<T> Future<T> submit(Callable<T> task)
  • Callable 작업을 제출하고 결과를 반환 받음

Future<?> submit(Runnable task);
  • Runnable 작업을 제출하고 결과를 반환 받음
  • Runnable은 반환 값이 없어 리턴 받은 Future를 통해 get() 수행 시 null 반환

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
  • 모든 Callable 작업을 제출하고, 모든 작업이 완료될 때까지 대기
  • 대기 시간을 지정하기 위해 long timeout, TimeUnit unit을 추가로 전달 받는 메서드도 오버로딩 되어 있음

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
  • 하나의 Callable 작업이 완료될 때까지 기다리고 가장 먼저 완료된 작업의 결과를 반환, 완료되지 않은 나머지 작업은 모두 취소
  • 대기 시간을 지정하기 위해 long timeout, TimeUnit unit을 추가로 전달 받는 메서드도 오버로딩 되어 있음

ThreadPoolExecutor

ExecutorService 인터페이스의 대표적인 구현체다.
살펴보면 크게 Thread PoolBlockingQueue가 존재하며,
내부 클래스인 Worker를 정의 및 Runnable를 구현하여 멤버 변수로 가지고 있는 Thread의 작업을 수행하는 Worker로써 동작하도록 한다.

  • Thread Pool : Thread 관리 (=workers)
  • BlockingQueue : Thread가 처리할 작업 보관 (=workQueue)

생성자의 주요 매개변수로는 아래와 같다.

  • corePoolSize : Thread Pool에서 관리되는 기본 스레드의 수
  • maximumPoolSize : Thread Pool에서 관리되는 최대 스레드의 수
  • keepAliveTime, TimeUnit unit : 기본 스레드를 초과하여 만들어진 Thread가 생존 가능한 대기 시간이며, 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거
  • BlockingQueue : 작업을 보관할 블로킹 큐
public class ThreadPoolExecutor extends AbstractExecutorService {
    
	.. 중략 ..
    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock = new ReentrantLock();
    private final HashSet<Worker> workers = new HashSet<>();
    private final Condition termination = mainLock.newCondition();
    private final SharedThreadContainer container;
	
    private int largestPoolSize;
    private long completedTaskCount;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
	
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable 

        @SuppressWarnings("serial") // Unlikely to be serializable
        final Thread thread;
		
        @SuppressWarnings("serial") // Not statically typed as Serializable
        Runnable firstTask;
		
        volatile long completedTasks;
		
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
		
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
	
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;

        String name = Objects.toIdentityString(this);
        this.container = SharedThreadContainer.create(name);
    }
    
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
			
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
	.. 중략 ..
}

Thread Pool에 미리 지정한 수만큼 Thread를 생성하지 않으며,
execute(Runnable command)를 통해 작업 시작 시 Worker의 수가 corePoolSize보다 적은 경우 Worker가 생성되도록 한다.

corePoolSize보다 많은 Worker가 수행되고 있지만, maximumPoolSize보다 적은 수의 Worker가 수행되고 있는 경우 아래와 같이 동작한다.

  • Queue가 가득 차지 않은 경우: 즉시 실행하지 않고 Queue에 작업 추가
  • Queue가 가득 찬 경우: maxPoolSize까지 Worker 생성하여 수행

Executors

ThreadPoolExecutor 객체 생성을 static 메서드로 아래와 같이 편리하게 사용할 수 있게 한다.

public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    ... 중략 ...

Callable

Generic을 통해 원하는 타입의 값을 반환할 수 있으며, call() 메서드 선언에 예외가 선언되어 있어 Runnable의 한계를 극복할 수 있게 한다.

public interface Callable<V> {

    V call() throws Exception;
}

Future

resultNow(), exceptionNow(), state()의 경우 JDK 19 이상부터 사용 가능하다.

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

    /**
     * @since 19
     */
    default V resultNow() {
        if (!isDone())
            throw new IllegalStateException("Task has not completed");
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return get();
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Task completed with exception");
                } catch (CancellationException e) {
                    throw new IllegalStateException("Task was cancelled");
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt();
        }
    }

    /**
     * @since 19
     */
    default Throwable exceptionNow() {
        if (!isDone())
            throw new IllegalStateException("Task has not completed");
        if (isCancelled())
            throw new IllegalStateException("Task was cancelled");
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    get();
                    throw new IllegalStateException("Task completed with a result");
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (ExecutionException e) {
                    return e.getCause();
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt();
        }
    }

    /**
     * @since 19
     */
    enum State {
        /**
         * The task has not completed.
         */
        RUNNING,
        /**
         * The task completed with a result.
         * @see Future#resultNow()
         */
        SUCCESS,
        /**
         * The task completed with an exception.
         * @see Future#exceptionNow()
         */
        FAILED,
        /**
         * The task was cancelled.
         * @see #cancel(boolean)
         */
        CANCELLED
    }

    /**
     * @since 19
     */
    default State state() {
        if (!isDone())
            return State.RUNNING;
        if (isCancelled())
            return State.CANCELLED;
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    get();  // may throw InterruptedException when done
                    return State.SUCCESS;
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (ExecutionException e) {
                    return State.FAILED;
                }
            }
        } finally {
            if (interrupted) Thread.currentThread().interrupt();
        }
    }
}

주요 메서드

boolean cancel(boolean mayInterruptIfRunning);
  • 아직 완료되지 않은 작업을 취소
  • mayInterruptIfRunning의 값에 따라 동작 상이
    • true : 취소 상태로 변경 후 작업이 실행중인 경우 인터럽트 발생시켜 중단
    • false : 취소 상태로 변경, 실행중인 작업 미중단
  • 취소 상태의 Futureget()을 호출하면 CancellationException 발생

boolean isCancelled();
  • 취소 여부 리턴

boolean isDone();
  • 작업 완료 여부 리턴 (완료/취소/예외 발생하여 중단된 경우 true)

V get() throws InterruptedException, ExecutionException;
  • 작업이 완료될 때까지 대기 후 결과 값 반환
  • 대기하는 과정에서 메서드 호출 Thread의 Blocking 발생

사용 예제

아래와 같이 Callable의 작업을 수행 요청하고, 값을 가져오는 데에 사용할 수 있다.

 interface ArchiveSearcher { 
 	String search(String target); 
 }
 
 class App {
 
   ExecutorService executor = ...;
   ArchiveSearcher searcher = ...;
   
   void showSearch(String target) throws InterruptedException {
     Callable<String> task = () -> searcher.search(target);
     Future<String> future = executor.submit(task);
     displayOtherThings(); // do other things while searching
     try {
       displayText(future.get()); // use future
     } catch (ExecutionException ex) { cleanup(); return; }
   }
 }

FutureTask

Future의 대표적인 구현체로 RunnableFuture를 구현하며, 내부적으로 수행해야 할 작업과 상태 및 반환 값을 가지고 있다.

public class FutureTask<V> implements RunnableFuture<V> {

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable<V> callable;
    private Object outcome;
    private volatile Thread runner;
    private volatile WaitNode waiters;


	... 중략 ...
	
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }
	... 중략 ...
}


Future의 필요성

ExecutorService에서 바로 반환 값을 받지 않고 Futureget()을 통하여 반환 값을 받을까?

ExecutorService에서 작업 요청 후 결과 반환 시점까지 대기한다면 요청 Thread에서 Blocking이 발생하여 성능 저하 발생 가능하다.
Future는 요청 스레드를 Blocking 상태로 만들지 않도록 하고, get() 호출 전까지 필요한 작업을 진행할 수 있도록 한다.



참고 자료



2편에 계속...

0개의 댓글