Runnable
의 한계throw
불가Thread
를 직접 다루게 되면..아래 문제들을 겪게 된다.
Runnable
인터페이스의 한계이러한 문제들을 해결하기 위해 생성한 Thread를 재사용하는 자바에서는 java.util.concurrent
패키지를 통해 Executor
프레임워크를 제공한다.
이를 이용하면 매우 편리하게 Thread를 사용할 수 있게 된다.
Executor
프레임워크멀티 스레딩 및 병렬 처리를 쉽게 사용할 수 있도록 돕는 기능의 모음
Executor
Runnable
작업을 제출하고 반환 값을 받지 않는 execute(Runnable command)
가 정의되어 있다.
public interface Executor {
void execute(Runnable command);
}
ExecutorService
Executor
인터페이스를 확장하여 작업 진행과 제어 기능을 추가로 제공한다.
public interface ExecutorService extends Executor, AutoCloseable {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
@Override
default void close() {
boolean terminated = isTerminated();
if (!terminated) {
shutdown();
boolean interrupted = false;
while (!terminated) {
try {
terminated = awaitTermination(1L, TimeUnit.DAYS);
} catch (InterruptedException e) {
if (!interrupted) {
shutdownNow();
interrupted = true;
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
<T> Future<T> submit(Callable<T> task)
Callable
작업을 제출하고 결과를 반환 받음Future<?> submit(Runnable task);
Runnable
작업을 제출하고 결과를 반환 받음Runnable
은 반환 값이 없어 리턴 받은 Future
를 통해 get()
수행 시 null
반환<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
Callable
작업을 제출하고, 모든 작업이 완료될 때까지 대기long timeout, TimeUnit unit
을 추가로 전달 받는 메서드도 오버로딩 되어 있음<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
Callable
작업이 완료될 때까지 기다리고 가장 먼저 완료된 작업의 결과를 반환, 완료되지 않은 나머지 작업은 모두 취소long timeout, TimeUnit unit
을 추가로 전달 받는 메서드도 오버로딩 되어 있음ThreadPoolExecutor
ExecutorService
인터페이스의 대표적인 구현체다.
살펴보면 크게 Thread Pool
과 BlockingQueue
가 존재하며,
내부 클래스인 Worker
를 정의 및 Runnable
를 구현하여 멤버 변수로 가지고 있는 Thread의 작업을 수행하는 Worker로써 동작하도록 한다.
Thread Pool
: Thread
관리 (=workers
)BlockingQueue
: Thread
가 처리할 작업 보관 (=workQueue
)생성자의 주요 매개변수로는 아래와 같다.
corePoolSize
: Thread Pool에서 관리되는 기본 스레드의 수maximumPoolSize
: Thread Pool에서 관리되는 최대 스레드의 수keepAliveTime
, TimeUnit unit
: 기본 스레드를 초과하여 만들어진 Thread가 생존 가능한 대기 시간이며, 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거BlockingQueue
: 작업을 보관할 블로킹 큐public class ThreadPoolExecutor extends AbstractExecutorService {
.. 중략 ..
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<>();
private final Condition termination = mainLock.newCondition();
private final SharedThreadContainer container;
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
@SuppressWarnings("serial") // Unlikely to be serializable
final Thread thread;
@SuppressWarnings("serial") // Not statically typed as Serializable
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
String name = Objects.toIdentityString(this);
this.container = SharedThreadContainer.create(name);
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
.. 중략 ..
}
Thread Pool에 미리 지정한 수만큼 Thread를 생성하지 않으며,
execute(Runnable command)
를 통해 작업 시작 시 Worker의 수가 corePoolSize
보다 적은 경우 Worker가 생성되도록 한다.
corePoolSize
보다 많은 Worker가 수행되고 있지만, maximumPoolSize
보다 적은 수의 Worker가 수행되고 있는 경우 아래와 같이 동작한다.
maxPoolSize
까지 Worker 생성하여 수행Executors
ThreadPoolExecutor
객체 생성을 static
메서드로 아래와 같이 편리하게 사용할 수 있게 한다.
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
... 중략 ...
Callable
Generic을 통해 원하는 타입의 값을 반환할 수 있으며, call()
메서드 선언에 예외가 선언되어 있어 Runnable
의 한계를 극복할 수 있게 한다.
public interface Callable<V> {
V call() throws Exception;
}
Future
resultNow()
, exceptionNow()
, state()
의 경우 JDK 19 이상부터 사용 가능하다.
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
/**
* @since 19
*/
default V resultNow() {
if (!isDone())
throw new IllegalStateException("Task has not completed");
boolean interrupted = false;
try {
while (true) {
try {
return get();
} catch (InterruptedException e) {
interrupted = true;
} catch (ExecutionException e) {
throw new IllegalStateException("Task completed with exception");
} catch (CancellationException e) {
throw new IllegalStateException("Task was cancelled");
}
}
} finally {
if (interrupted) Thread.currentThread().interrupt();
}
}
/**
* @since 19
*/
default Throwable exceptionNow() {
if (!isDone())
throw new IllegalStateException("Task has not completed");
if (isCancelled())
throw new IllegalStateException("Task was cancelled");
boolean interrupted = false;
try {
while (true) {
try {
get();
throw new IllegalStateException("Task completed with a result");
} catch (InterruptedException e) {
interrupted = true;
} catch (ExecutionException e) {
return e.getCause();
}
}
} finally {
if (interrupted) Thread.currentThread().interrupt();
}
}
/**
* @since 19
*/
enum State {
/**
* The task has not completed.
*/
RUNNING,
/**
* The task completed with a result.
* @see Future#resultNow()
*/
SUCCESS,
/**
* The task completed with an exception.
* @see Future#exceptionNow()
*/
FAILED,
/**
* The task was cancelled.
* @see #cancel(boolean)
*/
CANCELLED
}
/**
* @since 19
*/
default State state() {
if (!isDone())
return State.RUNNING;
if (isCancelled())
return State.CANCELLED;
boolean interrupted = false;
try {
while (true) {
try {
get(); // may throw InterruptedException when done
return State.SUCCESS;
} catch (InterruptedException e) {
interrupted = true;
} catch (ExecutionException e) {
return State.FAILED;
}
}
} finally {
if (interrupted) Thread.currentThread().interrupt();
}
}
}
boolean cancel(boolean mayInterruptIfRunning);
mayInterruptIfRunning
의 값에 따라 동작 상이true
: 취소 상태로 변경 후 작업이 실행중인 경우 인터럽트 발생시켜 중단false
: 취소 상태로 변경, 실행중인 작업 미중단Future
의 get()
을 호출하면 CancellationException
발생boolean isCancelled();
boolean isDone();
true
)V get() throws InterruptedException, ExecutionException;
아래와 같이 Callable
의 작업을 수행 요청하고, 값을 가져오는 데에 사용할 수 있다.
interface ArchiveSearcher {
String search(String target);
}
class App {
ExecutorService executor = ...;
ArchiveSearcher searcher = ...;
void showSearch(String target) throws InterruptedException {
Callable<String> task = () -> searcher.search(target);
Future<String> future = executor.submit(task);
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) { cleanup(); return; }
}
}
FutureTask
Future
의 대표적인 구현체로 RunnableFuture
를 구현하며, 내부적으로 수행해야 할 작업과 상태 및 반환 값을 가지고 있다.
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable;
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
... 중략 ...
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
... 중략 ...
}
Future
의 필요성왜 ExecutorService
에서 바로 반환 값을 받지 않고 Future
의 get()
을 통하여 반환 값을 받을까?
ExecutorService
에서 작업 요청 후 결과 반환 시점까지 대기한다면 요청 Thread에서 Blocking이 발생하여 성능 저하 발생 가능하다.
Future
는 요청 스레드를 Blocking 상태로 만들지 않도록 하고, get()
호출 전까지 필요한 작업을 진행할 수 있도록 한다.
2편에 계속...