이전에 모든 실습에서 우리는 new Thread()로 하여금 스레드를 직접 생성해서 Thread에 task를 적재해서 start()를 통해 쓰레드가 할 일을 정의하고 실행했다.
가벼운 작업이라면 작업의 실행보다 스레드의 생성 시간이 더 오래 걸릴 수도 있다.
스레드는 무겁고 운영체제 스케줄링과도 직접적으로 맞닿아 있어 개발자가 이를 잘못다루게 되면 치명적인 이슈가 발생할 수 있다.
필요시 쓰레드를 사용하는 것이 아니라 쓰레드 풀(Thread Pool)이라는 스레드를 담아두는 공간을 만들어 필요할 때마다 이 쓰레드 풀에서 쓰레드를 꺼내어 사용 후 사용이 끝나면 반납하는 방식으로 사용할 수 있을 것이다.
Executor는 스레드 풀, 스레드 관리, Runnable 인터페이스의 문제점, 생산자 소비자 문제와 같은 다양한 문제들을 한번에 해결해주는 자바 멀티스레드의 최고의 도구이다.
Executor 인터페이스는 다음과 같은 기능 하나를 제공한다.
public interface Executor {
void execute(Runnable command);
}
ExecutorService 인터페이스는 Executor를 상속받으며 submit(), close()기능이 추가된다.
public interface ExecutorService extends Executor, AutoCloseable {
<T> Future<T> submit(Callable<T> task);
@Override
default void close(){...}
...
}
ThreadPoolExecutor는 Executor의 가장 대표적인 구현체이며 크게 2가지 요소(스레드 풀, BlockingQueue)로 구성되어있다.
ExecutorService es = new ThreadPoolExecutor(2,2,0,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
es.execute(new Runnable("taskA"))
위와 같이 (약간은 복잡한)생성자 파라미터를 만족시켜 ThreadPoolExecutor 구현체를 생성할 수 있다.
ThreadPoolExecutor 의 생성자는 다음 속성을 사용한다.
corePoolSize : 스레드 풀에서 관리되는 기본 스레드의 수maximumPoolSize : 스레드 풀에서 관리되는 최대 스레드 수keepAliveTime , TimeUnit unit : 기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기 시간이다. 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.BlockingQueue workQueue : 작업을 보관할 블로킹 큐
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
log("== 초기 상태 ==");
printState(es);
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("taskD"));
log("== 작업 수행 중 ==");
printState(es);
sleep(3000);
log("== 작업 수행 완료 ==");
printState(es);
es.close();
log("== shutdown 완료 ==");
printState(es);
}
기본, 최대 관리 스레드 수를 2개로 설정하고 task를 4개 생성했다. es.execute()를 통해 4개의 Runnable task를 주었다.
ThreadPoolExecutor는 생성시점에 스레드를 만들어 확보해놓지 않는다. task가 들어올때 마다 자신의 설정 한계값까지 스레드를 대응해서 만든다. 작업은 execute()를 통해 들어오고 BlockingQueue로 작업이 들어온다.
es.execute(task)를 호출하면 BlockingQueue에 작업을 보관한다(생산자) 스레드 풀에 존재하는 스레드들은 소비자로 이 작업을 꺼낼 것이다.
생산자-소비자 개념이 들어가므로, BlockingQueue를 사용한다고 이해하면 된다.
Runnable은 반환값이 없고, 예외를 밖으로 던질 수 없다는 점에서 불편함이 존재한다.
public interface Callable<V> {
V call() throws Exception;
}
자바는 이러한 문제를 보완한 Callable을 제공한다. throws Exception을 통해 예외를 던질 수 있으며 반환 형식을 갖춘 Runnable을 제공한다.
Callable 구현체 또한 es로 하여금 작업할당이 가능하며, 이는 submit()메서드로 가능하다. 이때 작업은 Future라는 특별한 인터페이스를 통해 반환된다.
미래라는 뜻을 가진 이 객체를 반환하는 이유는 무엇일까.
es.submit(Callable 구현체)를 호출할 경우 곧바로 Future 객체를 반환받아 사용 가능하다. 이 객체는 미래의 결과를 받을 수 있는 객체라는 점에서 Future라는 이름을 가지게 되었다.
Callable task를 ExecuteService로 하여금 스레드를 할당받아 실행하기 위해 다음과 같은 과정이 존재한다.
submit은 2번 과정에 해당한다. 2~3번 과정에는 시간이 필요하다. 우리는 2~3번에 해당하는 시간을 직렬적으로 보내지 않고 병렬적으로 하여금 빠르게 결과를 얻기 위해 ExecuteService를 이용중이다.
Future에 결과가 생성될 때까지(3번까지 완료될 때 까지) Future의 내부는 task에 대한 결과가 들어있지 않으며 3번 과정이 끝이 나야 Future에 결과가 들어온다.
Future의 메서드 .get()은 Boolean 타입이다. 이 get()은 결과가 Future에 들어왔다면 true 그렇지 않으면 false이다. 또, 이 get()메서드는 결과가 들어올 때 까지 코드의 진행을 멈추어 기다리게 된다. 즉 Blocking 메서드이다.
Future이 존재하는 이유
// 1(정상). Future<Integer> future = es.submit(new MyCallable()); Integer result = future.get(); // 2(틀림). Integer result = es.submit(new MyCallable());어차피 .get()이 기다린다면 2처럼 해서 굳이 Future을 거치지 않아도 되지 않을까? 이에 대한 물음은 다음 코드로 하여금 반박이 가능하다.
Future<Integer> future1 = es.submit(new MyCallable()); Future<Integer> future2 = es.submit(new MyCallable()); Integer result1 = future.get(); Integer result2 = future.get();new MyCallable()이 약 3초간의 작업이라고 가정해보자. 위는 es.submit()을 통해 3초간의 작업을 두 개 곧바로 적재해놓고 첫번째 get()을 실행하여 result1에 3초정도 지나면 데이터가 들어오고 두번째 get()을 실행할 것이다. 이때 두번째 get()은 거의 곧바로 데이터가 들어올 것이다. es.submit()으로 두 태스크를 거의 동시에 실행해놓았기 때문이다.
만약 Future가 없이 다음처럼 코드가 형성된다면 우리는 스레드는 두 개 이용하지만 스레드를 하나 사용해서 절차적으로 코드를 짠 것과 차이가 없어진다.Integer result1 = es.submit(new MyCallable()); Integer result2 = es.submit(new MyCallable()); // 이것은 아래의 코드와 같을 것이다. Future<Integer> future1 = es.submit(new MyCallable()); Integer result1 = future.get(); Future<Integer> future2 = es.submit(new MyCallable()); Integer result2 = future.get();자바 스크립트의 비동기 async, await를 잘 이해하고 있다면 이것이 어떤 뜻인지 쉽게 이해할 수 있을 것이다. 비동기를 쓰려고 짠 메서드의 모든 부분에 await를 걸어버리는 것은 사실상의 절차적 코드이기 때문이다.
Future에는 cancel 기능이 존재한다. 아직 완료되지 않은 작업을 취소할 수 있으며 true일 경우 interrupt()로 작업을 종료한다.(강제종료와 같은 느낌) false일 경우 이미 실행중인 작업을 중단하지는 않는다.
Callable 작업을 3개 만들었다고 가정해보자. 이를 모두 es.submit()하기엔 코드가 너무 복잡해진다. es.submit()이 너무 코드상에 많아진다.
작업들을 Collection에 넣어 한번에 submit할 수 있는 invokeAll()을 자바에서 제공한다.
List<Callable<Boolean>> taskWorks = List.of(taskInventoryWork, taskShippingWork, taskAccountingWork);
List<Future<Boolean>> futures = es.invokeAll(taskWorks);
for (Future<Boolean> future : futures) {
if (future.get()) {
continue;
} else {
log("일부 작업이 실패했습니다.");
return;
}
}
log("모든 주문 처리가 성공적으로 완료되었습니다.");
invokeAll()과 비슷하게 한번에 작업을 컬렉션을 통해 여러개 적재해서 사용하는 것이지만 하나가 완료되어 리턴될 경우 나머지는 모두 취소된다.