이것이 자바다 정리 #12 멀티 스레드 3 (작업 완료 통보 방법들)
이것이 자바다 책을 참고하였습니다.
ExecutorService
의 .submit()
메소드는 스레드 작업으로 준 Runnable
혹은 Callable
작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future
객체를 반환한다.
Future<?> submit(Runnable task)
Future<V> submit(Runnable task, V result)
Future<V> submit(Callable<V> task)
Future
객체는 단순히 작업 결과를 받는 것이 아니라 작업이 완료될 때까지 기다렸다가 최종 결과를 얻는데 사용된다. Future
를 지연 완료(pending completion) 객체라고도 한다.
Future
의 .get()
메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴한다. 이것이 블로킹을 사용하는 작업 완료 통보 방식이다.
V get()
: 작업이 완료될 때까지 블로킹되었다가 처리 결과 V
를 반환한다.V get(long timeout, TimeUnit unit)
: timeout 시간 전에 작업이 완료되면 결과 V를 리턴하지만, 작업이 완료되지 않으면 TimeoutException
을 발생시킨다.submit(Runnable task)
future.get() -> null
future.get() -> throw 예외객체
submit(Runnable task, Integer result)
future.get() -> int
future.get() -> throw 예외객체
submit(Callable<String> task)
future.get() -> String
future.get() -> throw 예외객체
주의사항: UI 처리 등을 하는 스레드가
Future
의.get()
메소드를 호출하면, 반환 값이 있기 전까지 UI 작업 등 모든 작업을 멈추기 때문에 새로운 스레드를 생성해서Future.get()
을 호출하거나, 스레드 풀의 다른 스레드에게Future.get()
을 위임해야 한다.
new Thread(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
executorService.submit(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
});
boolean cancel(boolean mayInterruptIfRunning)
: 작업 처리가 진행 중일 경우 취소true
를 주면 인터럽트를 건다.boolean isCancelled()
: 작업이 취소되었는지 여부boolean isDone()
: 작업 처리가 완료되었는지 여부Runnable
객체로 생성한다.Future<?> submit(Runnable task)
메소드를 이용한다.Future
객체를 반환하는데, 정상적으로 작업처리가 발생했는지, 예외가 발생했는지 확인하기 위해서다.null
을 반환한다.InterruptedException
을 발생시킨다.ExecutionException
을 발생시킨다.public class NoResultExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
Runnable runnable = () -> {
int sum = 0;
for (int i = 1; i <= 10; i++) {
sum+=i;
}
System.out.println("[처리 결과] " + sum);
};
Future future = executorService.submit(runnable);
try {
future.get();
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
System.out.println("[실행 예외 발생] " + e.getMessage());
e.printStackTrace();
}
executorService.shutdown(); // 스레드 풀 종료
}
}
Callable
로 작업 객체를 생성하면 된다. T
는 call()
메소드가 리턴하는 타입이 되도록하면 된다.Callable<T> task = new Callable<T>() {
@Override
public T call() throws Exception {
// 스레드가 처리할 작업 내용
return T;
}
}
Future<T> future = executorService.submit(task);
public class YesResultExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
Callable<Integer> callable = () -> {
int sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
System.out.println("[처리 결과] " + sum);
return sum;
};
Future<Integer> future = executorService.submit(callable);
try {
Integer resultInteger = future.get();
System.out.println("[future.get()] " + resultInteger);
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
System.out.println("[실행 예외 발생] " + e.getMessage());
e.printStackTrace();
}
executorService.shutdown();
}
}
이전에 리턴 값이 없던 경우와 거의 같은데,
Callable
인터페이스를 구현하고,.run()
메소드에서 값 반환을 하고,Future.get()
으로 받아주는 것만 다르다.public class SendRunnableResultToAnotherObject {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
class Result {
int accumValue;
synchronized void addValue(int value) {
accumValue += value;
}
}
class Task implements Runnable {
Result result;
public Task(Result result) {
this.result = result;
}
@Override
public void run() {
int sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
result.addValue(sum);
}
}
Result result = new Result();
Runnable task1 = new Task(result);
Runnable task2 = new Task(result);
Future<Result> future1 = executorService.submit(task1, result);
Future<Result> future2 = executorService.submit(task2, result);
try {
result = future1.get();
System.out.println("result.accumValue = " + result.accumValue);
result = future2.get();
System.out.println("[처리 결과] " + result.accumValue);
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
System.out.println("[실행 예외 발생] " + e.getMessage());
e.printStackTrace();
}
executorService.shutdown();
}
}
.submit()
메소드에 작업과 공유 객체를 인자로 넘기면 된다.CompletionService
객체는 처리 완료된 작업을 가져오는 .poll()
메소드와 .take()
메소드를 제공한다.Future<V> poll()
: 완료된 작업의 Future
를 가져온다. 완료된 작업이 없다면 즉시 null
을 반환한다.Future<V> poll(long timeout, TimeUnit unit)
: 완료된 작업이 없다면, timeout
까지 블로킹된다.Future<V> take()
: 완료된 작업이 없다면 있을 때까지 블로킹된다.Future<V> submit(Callable<V> task)
: 스레드 풀에 Callable
작업 요청Future<V> submit(Runnable task, V result)
: 스레드 풀이 Runnable
작업 요청public class CompletionServiceExample extends Thread{
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
// CompletionService 생성
CompletionService<Integer> completionService =
new ExecutorCompletionService<Integer>(executorService);
System.out.println("[3개의 Callable 객체 작업 처리 요청]");
for (int i = 0; i < 3; i++) {
int finalI = i;
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int j = 1; j <= 10; j++) {
sum += j;
}
return sum + finalI;
};
});
}
System.out.println("[처리 완료된 작업 확인]");
executorService.submit(new Runnable() {
@Override
public void run() {
while (true) {
try {
Future<Integer> future = completionService.take();
int value = future.get();
System.out.println("[처리 결과] " + value);
} catch (InterruptedException e) {
System.out.println("[INTERRUPT OCCURRED]");
break;
} catch (ExecutionException e) {
e.printStackTrace();
break;
}
}
}
});
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("마지막 도달");
executorService.shutdownNow();
}
}
위의 예제는 CompletionService
를 이용하여 Callable
작업을 처리하는 예제이다.
finalI
를 더해주었다.shutdownNow()
코드를 넣어서, 3초 뒤에는 스레드 풀 내의 모든 스레드에 Interrupt를 건다..take()
메소드의 결과 값으로 Interrupt를 받게 되고 즉시 끝난다.2번째 1번째 3번째 순으로 연산이 끝났다.
java.nio.channels.CompletionHandler
를 이용해도 된다..completed()
메소드가 존재한다..failed()
메소드가 존재한다.CompletionHandler<V, A> callback = new CompletionHandler<V, A>() {
@Override
public void completed(V result, A attachment) {
}
@Override
public void failed(Throwable exc, A attachment) {
}
}
A attachment
는 콜백 메소드 결과값 외에 추가적으로 전달할 객체가 있으면 설정해주면 된다.null
을 보내도 된다.public class CallbackExample {
private ExecutorService executorService;
public CallbackExample() {
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
}
private CompletionHandler<Integer, Void> callback = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.println("completed() 실행: " + result);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("failed() 실행: " + exc.toString());
}
};
public void doWork(final String x, final String y) {
Runnable task = new Runnable() {
@Override
public void run() {
try {
int intX = Integer.parseInt(x);
int intY = Integer.parseInt(y);
int result = intX + intY;
callback.completed(result, null);
} catch (NumberFormatException e) {
callback.failed(e, null);
}
}
};
executorService.submit(task);
}
public void finish() {
executorService.shutdown();
}
public static void main(String[] args) {
CallbackExample callbackExample = new CallbackExample();
callbackExample.doWork("3", "3");
callbackExample.doWork("3", "삼");
callbackExample.finish();
}
}