스레드를 사용할 때 생기는 문제
Runnable
인터페이스의 불편함1,2번 문제를 해결하기 위해 스레드 풀이라는 기술이 도입됨
Executor 인터페이스
package java.util.concurrent;
public interface Executor {
void execute(Runnable command);
}
주요 메서드
public interface ExecutorService extends Executor, AutoCloseable {
<T> Future<T> submit(Callable<T> task);
@Override
default void close(){...}
... }
Runnable 인터페이스 구현
public class RunnableTask implements Runnable {
private final String name;
private int sleepMs = 1000;
public RunnableTask(String name) {
this.name = name;
}
public RunnableTask(String name, int sleepMs) {
this.name = name;
this.sleepMs = sleepMs;
}
@Override
public void run() {
log(name + " 시작"); sleep(sleepMs);
// 작업 시간 시뮬레이션
log(name + " 완료");
}
}
ExecutorService 구현
public class ExecutorBasicMain {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(2,2,0,TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
log("== 초기 상태 ==");
printState(es);
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("taskD"));
log("== 작업 수행 중 =="); printState(es);
sleep(3000);
log("== 작업 수행 완료 =="); printState(es);
es.close();
log("== shutdown 완료 =="); printState(es);
}
}
ExectorService
의 가장 대표적인 구현체는 ThredPoolExecutor
이다ThreadPoolExecutor(ExecutorService)
는 크게 2가지 요소가 있다BlockingQueue
:작업을 보관한다ThreadPool
: 스레드를 보관한다ThreadPoll Executor 생성자
corePoolSize
: 스레드 풀에서 관리하는 기본적인 스레드의 수maximunPoolSoze
: 스레드 풀에서 관리하는 최대 스레드의 수keepAlivetime, TimeUnit unit
:기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기시간 이시간동안 처리할 작업이 없으면 초과 스레드는 제거된다BlockingQueue workQueue
: 작업을 보관할 블로킹 큐Runnable의 불편함
run()
메서드는 반환값을 가지지 않는다.따라서 실행결과를 얻기 위해서는 별도의 메카니즘을 실행해야한다. run()
메서드는 체크예외를 던질 수 없다. 체크 예외의 처리는 메서드 내부에서 처리해야한다.ExecutorService
는 이런 문제를 깔끔하게 해결해준다Callable & Future
public class CallableMainV1 {
public static void main(String[] args) throws ExecutionException,InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<Integer> future = es.submit(new MyCallable());
Integer result = future.get();
log("result value = " + result);
es.close();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() {
log("Callable 시작");
sleep(2000);
int value = new Random().nextInt(10);
log("create value = " + value);
log("Callable 완료");
return value;
}
}
newFixedThredPool(size)
을 사용하면 편리하게 ExectorService
를 구현할 수 있다MyCallable
은 숫자를 반환하므로 반환할 제네릭 타입을 <Integer>
로 선언했다Runnable
과 비슷하지만 반환값을 가진다는 점이 차이점이다 따라거 결과를 보관할 별도의 필드를 만들지 않아도된다Future<Integer> future = es.submit(new MyCallable());
submit()
메서드를 통해 Callable
을 작업으로 전달 할 수 있다MyCallable
의 인스턴스가 블로킹 큐에 전달되고, 스레드 풀의 스레드 중 하나가 이 작업을 실행할 것이다. 이때 작업의 처리 결과는 직접 반환되는 것이 아닌 Future
라는 특별한 인터페이스로 전달된다.Integer result = future.get();
future.get()
을 호출하면 MyCallable
의 call()
이 반환한 결과를 받을 수 있다.Future<Integer> future = es.submit(new MyCallable());
submit()
의 호출로 MyCallable
의 인스턴스를 전달한다submit()
은 MyCallable.call()
이 반환하는 무작위 숫자 대신에 Future
을 반환MyCallable
이 즉시 실행되어 결과를 반환하는 것은 불가능하다. 그 이유는 MyCallable
은 즉시 실행되는 것이 아닌 스레드풀의 스레드가 미래의 어떤 시점에 이 코드를 대신 실행해야하기 때문이다.es.submit()
은 MyCallable
의 결과를 반환하는 대 신에 MyCallable
의 결과를 나중에 받을 수 있는 Futurue
라는 객체를 대신 제공한다.submit()
을 호출받은 경우 Future
가 만들어지고 전달한 작업인 taskA
가 바로 블로킹 쿠에 담기는 것이 아니라 그림처럼 taskA
를 감싼 Future
가 대신 블로킹 큐에 담긴다.Furute
를 꺼내서 스레드 풀의 스레드1이 작업을 시작한다FutureTask
의 run()
메서드를 수행한다run()
메서드가 taskA
의 call()
메서드를 호출하고 그 결과를 받아서 처리한다future.get()
을 호출한다.taskA
의 작업이 완료되면 Future
의 상태도 완료로 변경future.get()
을 호출하면 Future
가 완료 상태가 될 때 까지 대기한다. 이때 요청 스레드의 상태는 RUNNABLE
->WAITING
이 된다.taskA
가 아직 수행되지 않았거나 또는 수행 중이라는 뜻이다. 이때는 어쩔 수 없이 요청 스레드가 결과를 받기 위해 대기해야 한다. 요청 스레드가 마치 락을 얻을 때처럼, 결과를 얻기 위해 대기한 다. 이처럼 스레드가 어떤 결과를 얻기 위해 대기하는 것을 블로킹(Blocking)이라 한다.Future 잘못 사용
Future<Integer> future1 = es.submit(task1); //non-blocking
Integer sum1 = future1.get(); // blocking, 2초 대기
Future<Integer> future2 = es.submit(task2); // non-blocking
Integer sum2 = future2.get(); // blocking, 2초 대기
Integer sum1 = es.submit(task1).get(); // get()에서 블로킹
Integer sum2 = es.submit(task2).get(); // get()에서 블로킹
boolean cancel(boolean mayInterruptIfRunning)
mayInterruptIfRunning
cancel(true)
: Future
를 취소 상태로 변경한다. 이때 작업이 실행중이라면 Thread.interrupt()
를 호출해서 작업을 중단한다.cancel(false)
: Future
를 취소 상태로 변경한다. 단 작업중인 작업을 중단하지는 않는다true
, 이미 완료되었거나 취소할 수 없는 경우 false
boolean isCancelled()
true
, 그렇지 않은 경우 false
cancel()
메서드에 의해 취소된 경우에 true
를 반환한다.boolean isDone()
true
, 그렇지 않은 경우 false
true
를 반환한다.Future
의 상태를 반환한다. 자바 19부터 지원한다.RUNNING
: 작업 실행 중 SUCCESS
: 성공 완료 FAILED
: 실패 완료 CANCELLED
: 취소 완료V get()
InterruptedException
: 대기 중에 현재 스레드가 인터럽트된 경우 발생ExecutionException
: 작업 계산 중에 예외가 발생한 경우 발생get()
을 호출한 현재 스레드를 대기(블록킹)한다. 작업이 완료되면 결과를 반환한 다.public class FutureCancelMain {
private static boolean mayInterruptIfRunning = true; // 변경
//private static boolean mayInterruptIfRunning = false; // 변경
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<String> future = es.submit(new MyTask());
log("Future.state: " + future.state());
// 일정 시간 후 취소 시도
sleep(3000);
// cancel() 호출
log("future.cancel(" + mayInterruptIfRunning + ") 호출");
boolean cancelResult1 = future.cancel(mayInterruptIfRunning);
log("Future.state: " + future.state());
log("cancel(" + mayInterruptIfRunning + ") result: " + cancelResult1);
// 결과 확인
try {
log("Future result: " + future.get());
} catch (CancellationException e) { // 런타임 예외
log("Future는 이미 취소 되었습니다.");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// Executor 종료
es.close();
}
static class MyTask implements Callable<String> {
@Override
public String call() {
try {
for (int i = 0; i < 10; i++) {
log("작업 중: " + i);
Thread.sleep(1000); // 1초 동안 sleep }
} catch (InterruptedException e) { log("인터럽트 발생");
return "Interrupted";
}
return "Completed";
}
}
}
cancel(true)
: Future
를 취소 상태로 변경한다. 이때 작업이 실행중이라면 Thread.interrupt()
를cancel(false)
: Future
를 취소 상태로 변경한다. 단 이미 실행 중인 작업을 중단하지는 않는다.ExecutorService
는 여러 작업을 한 번에 편리하게 처리하는 invokeAll()
, invokeAny()
기능을 제공한다.
public class InvokeAllMain {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask task1 = new CallableTask("task1", 1000);
CallableTask task2 = new CallableTask("task2", 2000);
CallableTask task3 = new CallableTask("task3", 3000);
List<CallableTask> tasks = List.of(task1, task2, task3);
List<Future<Integer>> futures = es.invokeAll(tasks);
for (Future<Integer> future : futures) {
Integer value = future.get();
log("value = " + value);
}
es.close();
}
}
invokeAny()
는 한 번에 여러 작업을 제출하고, 가장 먼저 완료된 작업의 결과를 반환한다. 이때 완료되지 않은 나머 지 작업은 인터럽트를 통해 취소한다.
public class InvokeAnyMain {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask task1 = new CallableTask("task1", 1000);
CallableTask task2 = new CallableTask("task2", 2000);
CallableTask task3 = new CallableTask("task3", 3000);
List<CallableTask> tasks = List.of(task1, task2, task3);
Integer value = es.invokeAny(tasks);
log("value = " + value);
es.close();
}
}
서비스 종료
void shutdown()
List<Runnable> shutdownNow()
서비스 상태 확인
boolean isShutdown()
boolean isTerminated()
shutdown(),shutdownNow()
호출후 모든 작업이 완료되었는지 확인작업 완료 대기
boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException
shutdown() - 처리중인 작업이 없는 경우
ExecutorService
에 아무런 작업이 없고 스레드만 대기중인 상태shutdown()
을 호출 시 ExecutorService
는 새로운 요청을 거절한다java.util.concurrent.RejectedExecutionException
예외가 발생한다shutdown() - 처리중인 작업이 있는 경우
shutdown()
을 호출 시 ExecutorService
는 새로운 요청을 거절한다shutdownNow() - 처리중인 작업이 있는 경우
shutdown()
을 호출 시 ExecutorService
는 새로운 요청을 거절한다List<Runnable> runnables = es.shutdownNow()
ExecutorService
의 기본 구현체인 ThreadPoolExecutor
생성자의 속성
corePoolSize
: 스레드 풀에서 관리되는 기본 스레드의 수maximumPoolSize
: 스레드 풀에서 관리되는 최대 스레드 수keepAliveTime
, TimeUnit unit
: 기본 스레드 수를 초과해서 만들어진 초과 스레드가 생존할 수 있는 대기 시간, 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다. BlockingQueue workQueue
: 작업을 보관할 블로킹 큐그림으로 스레드 풀관리 알아보자
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ExecutorService es = new ThreadPoolExecutor(2, 4, 3000,
TimeUnit.MILLISECONDS, workQueue);
public class RunnableTask implements Runnable {
private final String name;
private int sleepMs = 1000;
public RunnableTask(String name) {
this.name = name;
}
public RunnableTask(String name, int sleepMs) {
this.name = name;
this.sleepMs = sleepMs;
}
@Override
public void run() {
log(name + " 시작");
sleep(sleepMs); // 작업 시간 시뮬레이션
log(name + " 완료");
}
}
corePoolSize=2
, maximumPoolSize=4
를 사용해서 기본 스레드는 2개 최대 스레드는 4개로 설정했다Runnable
구현체를 7개 만들어서 동작 시켰다waiting
상태로 기다리고 있다Executor
는 max(maximumPoolSize)사이즈까지 초과스레드를 만들어서 작업을 수행한다Executor
는 초과스레드인 스레드3을 만들어서 바로 작업을 지시한다RejectedExecutionException
이 발생한다자바는 Executors
클래스를 통해 3가지 기본 전력을 제공한다.
newFixedThreadPool(nThreads)
nThreads
만큼의 기본 스레드를 생성하며 초과 스레드는 생성하지 않는다public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(2);
og("pool 생성");
printState(es);
for (int i = 1; i <= 6; i++) {
String taskName = "task" + i;
es.execute(new RunnableTask(taskName));
printState(es, taskName);
}
es.close();
log("== shutdown 완료 ==");
}
newCachedThreadPool()