스레드 관리 문제
서버의 CPU, 메모리 자원은 한정 -> 스레드르 무한정 생성 할 수는 없음
Runnable의 불편함
위 3가지 방법을 해결하기 위해선 Thread Pool 필요



멀티스래딩 및 병렬 처리를 쉽게 돕는 기능.
Executor interface
public interface Executor {
void execute(Runnable command);
}
주요 메서드
인터페이스를 확장해서 작업 제출과 제어 기능을 추가로 제공
로그 출력 유틸리티 만들기
package chap50;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import static chap41.util.MyLogger.log;
public abstract class ExecutorUtils {
public static void printState(ExecutorService executorService) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) { //구현체로 ThreadPoolExecutor 넘어옴 (casting)
int pool = poolExecutor.getPoolSize(); //현재 생성된 스레드 개수
int active = poolExecutor.getActiveCount(); //현재 일하고 있는 스래드
int queueTasks = poolExecutor.getQueue().size(); //스레드가 작업을 던지면 queue의 작업을 당김(큐에 작업이 얼마나 들어있는지)
long completedTasks = poolExecutor.getCompletedTaskCount();//완료된 작업은??
log("[pool= " + pool + ", active=" + active + ", queueTasks=" + queueTasks + ", completedTasks=" + completedTasks + "]");
} else {
log(executorService);
}
}
//추가
public static void printState(ExecutorService executorService, String taskName) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) { //구현체로 ThreadPoolExecutor 넘어옴 (casting)
int pool = poolExecutor.getPoolSize(); //현재 생성된 스레드 개수
int active = poolExecutor.getActiveCount(); //현재 일하고 있는 스래드
int queueTasks = poolExecutor.getQueue().size(); //스레드가 작업을 던지면 queue의 작업을 당김(큐에 작업이 얼마나 들어있는지)
long completedTasks = poolExecutor.getCompletedTaskCount();//완료된 작업은??
log("[pool= " + pool + ", active=" + active + ", queueTasks=" + queueTasks + ", completedTasks=" + completedTasks + "]");
} else {
log(executorService);
}
}
}
pool : 스레드 풀에서 관리되는 스레드의 숫자active : 작업을 수행하는 스레드의 숫자queuedTasks : 큐에 대기중인 작업의 숫자completedTask : 완료된 작업의 숫자ExecutorService 인터페이스는 getPoolSize(), getActiveCount() 같은 자세한 기능은 제공 X package chap49.executor;
import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
public class RunnableTask implements Runnable {
private final String name;
private int sleepMs = 1000;
public RunnableTask(String name) {
this.name = name;
}
public RunnableTask(String name, int sleepMs) {
this.name = name;
this.sleepMs = sleepMs;
}
@Override
public void run() {
log(name + " 시작");
sleep(sleepMs);
log(name + " 완료");
}
}
package chap49.executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
import static chap49.executor.ExecutorUtils.printState;
public class ExecutorBasicMain {
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor
(2,2,0, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>());
//LinkedBlockingQueue -> 무제한으로 꺼낼 수 있음, 없으면 대기
log("== 초기 상태 ==");
printState(es);
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
//재사용 구간
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("taskD"));
log("== 작업 수행 중 ==");
printState(es);
sleep(3000);
//두개의 스레드가 있는데 둘다 쉬는 중
log("== 작업 수행 완료 ==");
printState(es);
es.shutdown();
log("== shutdown 완료 ==");
printState(es);
}
}
ThreadPoolExecutor(ExecutorService)
ThreadPoolExecutor 생성자
corePoolSize : 스레드 풀에서 관리되는 기본 스레드의 수
maximumPoolSize : 스레드 풀에서 관리되는 최대 스레드 수
keepAliveTime, TimeUnit unit : 기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기시간이다. 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.
BlockingQueue workQueue : 작업을 보관할 블로킹 큐
Runnable 사용
package chap49.executor;
import java.util.Random;
import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
public class RunnableMain {
public static void main(String[] args) throws InterruptedException {
MyRunnable task = new MyRunnable();
Thread thread = new Thread(task, "T1");
thread.start();
thread.join();
int result = task.value;
log("result Value = " + result);
}
static class MyRunnable implements Runnable {
int value;
@Override
public void run() {
log("Runnable 시작");
sleep(2000);
value = new Random().nextInt(10);
log("create Value = " + value);
log("Runnable 완료 ");
}
}
}
Callable
public interface Callable<V> {
V call() throws Exception;
}
Callable과 Future 사용
package chap49.executor.future;
import java.util.Random;
import java.util.concurrent.*;
import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
public class CallableMainV1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<Integer> future = es.submit(new MyCallable()); //future를 반환
Integer result = future.get(); //Main이 바로 호출
log("result Value = " + result);
es.shutdown();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
log("Callable 시작");
sleep(2000);
int value = new Random().nextInt(10);
log("create value = " + value);
log("Callable 끝");
return value;
}
}
}
newFixedThreadPool(size) : 편리하게 ExecutorService를 생성 가능
submit : <T> Future<T> submit(Callable<T> task); //인터페이스 정의
submit을 통해 Callable 작업으로 전달 가능
MyCallable 인스턴스가 블로킹 큐에 절달, 스레드 풀의 스레드 중 하나가 이 작업을 실행
Future.get()은 InterruptException, ExecutionExeception 체크 예외를 던진다.
Executor 프레임워크의 강점
요청 스레드가 결과를 받아야 하는 상황, Callable 사용한 방식은 Runnable을 사용하는 방식보다는 훨씬 편함. 단순한 싱글스레드 방식으로 개발한다는 느낌을 받음
내가 스레들 따로 생성, Join으로 제어 하는 일이 없음
단 future.get을 호출하는 요청 스레드 main은 future.get을 호출 했을 때 2가지 상황으로 나뉨
MyCallable 작업을 처리하는 스레드 풀의 스레드가 작업을 완료했다.
MyCallable 작업을 처리하는 스레드 풀의 스레드가 아직 작업을 완료하지 못했다.
Future<Integer> future = es.submit(new MyCallable());
submit 호출로 MyCallable 인스턴스를 전달
submit은 MyCallable이 반환하는 무작위 숫자 대신 Future을 반환
MyCallable이 즉시 실행되어 즉시 결과를 반환하는 것은 불가능.
생각해보면 MyCallable 이 즉시 실행되어서 즉시 결과를 반환하는 것은 불가능. 스레드 풀의 스레드가 미래의 어떤 시점에 이 코드를 대신 실행.
MyCallable.call() 메서드는 호출 스레드가 실행하는 것도 아니고, 스레드 풀의 다른 스레드가 실행하기 때문에 언제 실행이 완료되어서 결과를 반환할 지 알 수 없다. 결과를 즉시 받는 것은 불가능. es.submit() 은 MyCallable 의 결과를 반환하는 대신에 MyCallable 의 결과를 나중에 받을 수 있는 Futurue 라는 객체를 대신 제공.
package chap49.executor.future;
import java.util.Random;
import java.util.concurrent.*;
import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
public class CallableMainV2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
log("submit 호출");
Future<Integer> future = es.submit(new MyCallable()); //future를 반환
log("future 즉시 반환, future = " + future);
log("future.get() [블로킹] 메서드 호출 시작 -> main 스레드 Waiting");
Integer result = future.get(); //Main이 바로 호출 (결과가 value를 반환할때까지 대기)
log("future.get() [블로킹] 메서드 호출 완료 -> main 스레드 Runnable");
log("result Value = " + result);
log("future 완료, future = " + future);
es.shutdown();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
log("Callable 시작");
sleep(2000);
int value = new Random().nextInt(10);
log("create value = " + value);
log("Callable 끝");
return value;
}
}
}
실행 결과 분석

es.submit(new MyCallable())
MyCallable 인스턴스를 편의상 taskA 라고 하겠다.
편의상 스레드풀에 스레드가 1개 있다고 가정하겠다.
submit() 을 호출해서 ExecutorService 에 taskA 를 전달
Future의 생성

요청 스레드 es.submit(taskA)를 호출하는 중
ExecutorService는 전달한 taskA의 미래 결과를 알 수 있는 Future 객체를 생성
FutureTask
submit()을 호출한 경우 Future가 만들어지고, 전달한 작업인 taskA가 바로 블로킹 큐에 담기는 것이 아니라, 그림처럼 taskA를 감싸고 있는 Future가 대신 블로킹 큐에 담긴다.
Future<Integer> future = es.submit(new MyCallable());
Future 는 내부에 작업의 완료 여부와, 작업의 결과 값을 가진다. 작업이 완료되지 않았기 때문에 아직은 결과 값이 없다
Future 의 구현체는 `FutureTaskFuture 의 상태는 "Not completed"(미 완료)이고, 연관된 작업은 전달한 taskA(MyCallable 인스턴스) 이다.Future 는 즉시 반환생성한 Future 를 즉시 반환하기 때문에 요청 스레드는 대기하지 않고, 자유롭게 본인의 다음 코드를 호출

큐에 들어있는 Future[taskA] 를 꺼내서 스레드 풀의 스레드1이 작업을 시작
Future의 구현체인 FutureTask 는 Runnable 인터페이스도 함께 구현
스레드1은 FutureTask 의 run() 메서드를 수행
FutureTask.run() -> MyCallable.call()

스레드1 : taskA의 작업을 아직 처리중
요청 스레드
요청 스레드는 Future 인스턴스의 참조
Future.get()을 호출해서 taskA 작업의 미래 결과를 받을 수 있다.
요청 스레드는 작업의 결과가 필요해서 future.get() 을 호출
taskA 의 작업이 완료되면 Future 의 상태도 완료로 변경taskA 의 작업이 아직 완료되지 않았다. 따라서 Future 도 완료 상태가 아니다.RUNNABLE WAITING
요청 스레드
대기(WAITING ) 상태로 future.get() 을 호출하고 대기중이다.
스레드1
taskA 작업을 완료한다.Future 에 taskA 의 반환 결과를 담는다.Future 의 상태를 완료로 변경한다.WAITING RUNNABLE 상태로 변한다.
요청 스레드
요청 스레드는 RUNNABLE 상태가 되었다. 그리고 완료 상태의 Future 에서 결과를 반환 받는다. 참고로taskA 의 결과가 Future 에 담겨있다.
스레드1
작업을 마친 스레드1은 스레드 풀로 반환된다. RUNNABLE WAITING

Future 의 인스턴스인 FutureTask 를 보면 "Completed normally"로 정상 완료
Integer result = es.submit(new MyCallable()); // 여기서 블로킹
이렇게 하면 submit이 언제 결과 값을 반환하는지 알 수 없음
package chap49.executor.future;
import java.util.concurrent.Callable;
import static chap41.util.MyLogger.log;
public class SumTaskMainV1 {
public static void main(String[] args) throws InterruptedException {
SumTask task1 = new SumTask(1, 50);
SumTask task2 = new SumTask(51, 100);
Thread t1 = new Thread(task1, "t1");
Thread t2 = new Thread(task2, "t2");
t1.start();
t2.start();
log("join() - main 스레드가 t1, t2 종료까지 대기");
t1.join();
t2.join();
log("main 스레드 대기 완료");
log("task1.result =" + task1.result);
log("task2.result =" + task2.result);
int sumAll = task1.result + task2.result;
log("task1 + task2 = " + sumAll);
log("End");
}
static class SumTask implements Runnable {
int startValue;
int endValue;
int result = 0;
public SumTask(int startValue, int endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
public void run() {
log("작업 시작");
//예외를 잡았어야 한다.
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int sum = 0;
for (int i = startValue; i <= endValue ; i++) {
sum += i;
}
result = sum;
log("작업 완료 result = " + sum);
}
}
}
package chap49.executor.future;
import java.util.concurrent.*;
import static chap41.util.MyLogger.log;
public class SumTaskMainV2 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
SumTask task1 = new SumTask(1, 50);
SumTask task2 = new SumTask(51, 100);
ExecutorService es = Executors.newFixedThreadPool(2);
//main 스레드가 던자고 바로 또 다른 요청을 요구 -> 알아서 여러곳에서 실행
//submit으로 원하는 작업 마음껏 던지고, 결과 받는 건 마지막
Future<Integer> f1 = es.submit(task1);
Future<Integer> f2 = es.submit(task2);
//기다리고 return 여기서 블로킹
Integer sum1 = f1.get();
Integer sum2 = f2.get();
log("task1.result =" + sum1);
log("task2.result =" + sum2);
int sumAll = sum1 + sum2;
log("task1 + task2 = " + sumAll);
log("end");
es.shutdown();
}
static class SumTask implements Callable<Integer> {
int startValue;
int endValue;
int result = 0;
public SumTask(int startValue, int endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
//Exception의 자식들을 모두 잡아서 던질 수 있다.
public Integer call() throws Exception {
log("작업 시작");
Thread.sleep(2000);
int sum = 0;
for (int i = startValue; i <= endValue; i++) {
sum += i;
}
result = sum;
log("작업 완료 result = " + sum);
return sum;
}
}
}
작업의 결과를 반환, 요청 스레드에서 결과를 바로 받아서 처리 하는 부분이 매우 직관적, 코드만 보면 마치 멀티스레드를 사용하지 않고, 단일 스레드 상황에서 일반적 메서드 호출, 결과.
Future<Integer> future1 = es.submit(task1); // 여기는 블로킹 아님
Future<Integer> future2 = es.submit(task2); // 여기는 블로킹 아님
Integer sum1 = future1.get(); // 여기서 블로킹
Integer sum2 = future2.get(); // 여기서 블로킹
Future 없이 결과를 직접 반환

Future 를 사용하지 않는 경우 결과적으로 task1 의 결과를 기다린 다음에 task2 를 요청한다. 따라서 총 4초의 시간이 걸렸다. 이것은 마치 단일 스레드가 작업을 한 것과 비슷한 결과
Future 반환


f1.get을 호출해 대기. -> f2.get 호출해 즉시 결과를 받음
Future를 잘못 사용하는 예
1. 적절히 사용
Future<Integer> future1 = es.submit(task1); // non-blocking
Future<Integer> future2 = es.submit(task2); // non-blocking
Integer sum1 = future1.get(); // blocking, 2초 대기
Integer sum2 = future2.get(); // blocking, 즉시 반환
Future<Integer> future1 = es.submit(task1); // non-blocking
Integer sum1 = future1.get(); // blocking, 2초 대기
Future<Integer> future2 = es.submit(task2); // non-blocking
Integer sum2 = future2.get(); // blocking, 2초 대기
package chap49.executor.future;
import java.util.concurrent.*;
import static chap41.util.MyLogger.log;
public class SumTaskBadMainV2 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
SumTask task1 = new SumTask(1, 50);
SumTask task2 = new SumTask(51, 100);
ExecutorService es = Executors.newFixedThreadPool(2);
//하나 던지고 결과 받고 하나 던지고 결과 받고
Future<Integer> f1 = es.submit(task1);
Integer sum1 = f1.get(); //2초 대기
Future<Integer> f2 = es.submit(task2); //2초 대기
Integer sum2 = f2.get();
log("task1.result =" + sum1);
log("task2.result =" + sum2);
int sumAll = sum1 + sum2;
log("task1 + task2 = " + sumAll);
log("end");
es.shutdown();
}
static class SumTask implements Callable<Integer> {
int startValue;
int endValue;
int result = 0;
public SumTask(int startValue, int endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
//Exception의 자식들을 모두 잡아서 던질 수 있다.
public Integer call() throws Exception {
log("작업 시작");
Thread.sleep(2000);
int sum = 0;
for (int i = startValue; i <= endValue; i++) {
sum += i;
}
result = sum;
log("작업 완료 result = " + sum);
return sum;
}
}
}
cancel(true) : Future 를 취소 상태로 변경한다. 이때 작업이 실행중이라면Thread.interrupt() 를 호출해서 작업을 중단한다.cancel(false) : Future 를 취소 상태로 변경, 작업을 중단하지 않는다.true , 이미 완료되었거나 취소할 수 없는 경우 falseboolean isCancelled()
기능 : 작업이 취소되었는지 여부
반환값 : 작업이 취소된 경우 true , 그렇지 않은 경우 false
설명 : 이 메서드는 작업이 cancel() 메서드에 의해 취소된 경우에 true 를 반환
boolean isDone()
기능 : 작업이 완료되었는지 여부를 확인한다.
반환값: 작업이 완료된 경우 true , 그렇지 않은 경우 false
설명: 작업이 정상적으로 완료되었거나, 취소되었거나, 예외가 발생하여 종료된 경우에 true 를 반환한다.
State state()
기능 : Future 상태를 반환
RUNNING : 작업 실행 중
SUCCESS : 성공 완료
FAILED : 실패 완료
CANCELLED : 취소 완료
V get()
기능 : get과 같은데 시간 초과시 예외
매개변수
timeout : 대기할 최대 시간unit: timeout 매개변수의 시간 단위 지정InterruptedException : 대기 중에 현재 스레드가 인터럽트된 경우 발생ExecutionException : 계산 중에 예외가 발생한 경우 발생TimeoutException : 주어진 시간 내에 작업이 완료되지 않은 경우 발생package chap49.executor.future;
import java.util.concurrent.*;
import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
public class FutureCancelMain {
private static boolean mayInterruptIfRunning = true;
//private static boolean mayInterruptIfRunning = false;
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<String> future = es.submit(new MyTask());
//log("future state: " + future.state());
sleep(3000);
//cancel 호출
log("future.cancel(" + mayInterruptIfRunning + ") 호출");
boolean cancelResult1 = future.cancel(mayInterruptIfRunning);
log("future.cancel(" + mayInterruptIfRunning + ") result: " + cancelResult1);
//결과 확인
try {
log("Future result : " + future.get());
} catch (CancellationException e) { //Runtime 예외
log("Future는 이미 취소 되었습니다.");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
es.shutdown();
}
static class MyTask implements Callable<String> {
@Override
public String call() throws Exception {
try {
for (int i = 0; i < 10; i++) {
log("작업 중 : " + i);
}
Thread.sleep(1000);
} catch (InterruptedException e) {
log("인터럽트 발생");
return "Interrupted";
}
return "completed";
}
}
}
package chap49.executor.future;
import java.util.concurrent.*;
import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
public class FutureExceptionMain {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1);
log("작업 전달");
Future<Integer> future = es.submit(new ExCallable());
sleep(1000);
try {
log("future.get() 호출 시도, future.state()" + future.isDone()); //FAILED
Integer result = future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
//실행 중에 예외 발생 call 안에 코드를 발생하는 예외
} catch (ExecutionException e) {
log("e = " + e);
Throwable cause = e.getCause(); //ExecutionException이 왜 발생했는지
log("cause = " + cause);
}
es.shutdown();
}
static class ExCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
log("Callable 실행, 예외 발생");
throw new IllegalArgumentException("ex!"); //callable이 호출해서 예외 발생
}
}
}
package chap49.executor;
import java.util.concurrent.Callable;
import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
public class CallableTask implements Callable<Integer> {
private String name;
private int sleepMs = 1000;
public CallableTask(String name) {
this.name = name;
}
public CallableTask(String name, int sleepMs) {
this.name = name;
this.sleepMs = sleepMs;
}
@Override
public Integer call() throws Exception {
log(name + "실행");
sleep(sleepMs);
log(name + " 완료, return = " + sleepMs);
return sleepMs;
}
}
invokeAll()
Callable 작업을 제출하고 모든 작업이 완료될 때까지 대기
지정된 시간 내에 모든 Callable 작업을 제출, 완료시 대기
package chap49.executor.future;
import chap49.executor.CallableTask;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static chap41.util.MyLogger.log;
public class InvokeAllMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask t1 = new CallableTask("t1", 1000);
CallableTask t2 = new CallableTask("t2", 2000);
CallableTask t3 = new CallableTask("t3", 3000);
List<CallableTask> tasks = List.of(t1, t2, t3);
List<Future<Integer>> futures = es.invokeAll(tasks);
for (Future<Integer> future : futures) {
Integer value = future.get();
log("value = " + value);
}
es.shutdown();
}
}
invokeAny()
하나의 Callable 작업이 완료될 때 까지 대기, 가장 먼저 완료된 작업의 결과를 반환
완료되자 않은 나머진 취소
지정된 시간 내에 하나의 Callable 작업이 완료될 때까지 기다리고, 가장 먼저 완료된 작업의 결과를 반
환한다.
완료되지 않은 나머지 작업은 취소
package chap49.executor.future;
import chap49.executor.CallableTask;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static chap41.util.MyLogger.log;
public class InvokeAnyMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask t1 = new CallableTask("t1", 1000);
CallableTask t2 = new CallableTask("t2", 2000);
CallableTask t3 = new CallableTask("t3", 3000);
List<CallableTask> tasks = List.of(t1, t2, t3);
Integer value = es.invokeAny(tasks);
log("value = " + value);
es.shutdown();
}
}