이것이 자바다 정리 #12 멀티 스레드 3 (작업 완료 통보 방법들)
이것이 자바다 책을 참고하였습니다.
ExecutorService의 .submit() 메소드는 스레드 작업으로 준 Runnable 혹은 Callable 작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future 객체를 반환한다.
Future<?> submit(Runnable task)Future<V> submit(Runnable task, V result)Future<V> submit(Callable<V> task)Future 객체는 단순히 작업 결과를 받는 것이 아니라 작업이 완료될 때까지 기다렸다가 최종 결과를 얻는데 사용된다. Future를 지연 완료(pending completion) 객체라고도 한다.
Future의 .get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴한다. 이것이 블로킹을 사용하는 작업 완료 통보 방식이다.
V get(): 작업이 완료될 때까지 블로킹되었다가 처리 결과 V를 반환한다.V get(long timeout, TimeUnit unit): timeout 시간 전에 작업이 완료되면 결과 V를 리턴하지만, 작업이 완료되지 않으면 TimeoutException을 발생시킨다.submit(Runnable task)future.get() -> nullfuture.get() -> throw 예외객체submit(Runnable task, Integer result)future.get() -> intfuture.get() -> throw 예외객체submit(Callable<String> task)future.get() -> Stringfuture.get() -> throw 예외객체주의사항: UI 처리 등을 하는 스레드가
Future의.get()메소드를 호출하면, 반환 값이 있기 전까지 UI 작업 등 모든 작업을 멈추기 때문에 새로운 스레드를 생성해서Future.get()을 호출하거나, 스레드 풀의 다른 스레드에게Future.get()을 위임해야 한다.
new Thread(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
executorService.submit(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
});
boolean cancel(boolean mayInterruptIfRunning): 작업 처리가 진행 중일 경우 취소true를 주면 인터럽트를 건다.boolean isCancelled(): 작업이 취소되었는지 여부boolean isDone(): 작업 처리가 완료되었는지 여부Runnable 객체로 생성한다.Future<?> submit(Runnable task) 메소드를 이용한다.Future객체를 반환하는데, 정상적으로 작업처리가 발생했는지, 예외가 발생했는지 확인하기 위해서다.null을 반환한다.InterruptedException을 발생시킨다.ExecutionException을 발생시킨다.public class NoResultExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
Runnable runnable = () -> {
int sum = 0;
for (int i = 1; i <= 10; i++) {
sum+=i;
}
System.out.println("[처리 결과] " + sum);
};
Future future = executorService.submit(runnable);
try {
future.get();
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
System.out.println("[실행 예외 발생] " + e.getMessage());
e.printStackTrace();
}
executorService.shutdown(); // 스레드 풀 종료
}
}
Callable로 작업 객체를 생성하면 된다. T는 call() 메소드가 리턴하는 타입이 되도록하면 된다.Callable<T> task = new Callable<T>() {
@Override
public T call() throws Exception {
// 스레드가 처리할 작업 내용
return T;
}
}
Future<T> future = executorService.submit(task);
public class YesResultExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
Callable<Integer> callable = () -> {
int sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
System.out.println("[처리 결과] " + sum);
return sum;
};
Future<Integer> future = executorService.submit(callable);
try {
Integer resultInteger = future.get();
System.out.println("[future.get()] " + resultInteger);
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
System.out.println("[실행 예외 발생] " + e.getMessage());
e.printStackTrace();
}
executorService.shutdown();
}
}
이전에 리턴 값이 없던 경우와 거의 같은데,
Callable 인터페이스를 구현하고,.run() 메소드에서 값 반환을 하고,Future.get()으로 받아주는 것만 다르다.public class SendRunnableResultToAnotherObject {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
class Result {
int accumValue;
synchronized void addValue(int value) {
accumValue += value;
}
}
class Task implements Runnable {
Result result;
public Task(Result result) {
this.result = result;
}
@Override
public void run() {
int sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
result.addValue(sum);
}
}
Result result = new Result();
Runnable task1 = new Task(result);
Runnable task2 = new Task(result);
Future<Result> future1 = executorService.submit(task1, result);
Future<Result> future2 = executorService.submit(task2, result);
try {
result = future1.get();
System.out.println("result.accumValue = " + result.accumValue);
result = future2.get();
System.out.println("[처리 결과] " + result.accumValue);
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
System.out.println("[실행 예외 발생] " + e.getMessage());
e.printStackTrace();
}
executorService.shutdown();
}
}
.submit() 메소드에 작업과 공유 객체를 인자로 넘기면 된다.CompletionService 객체는 처리 완료된 작업을 가져오는 .poll() 메소드와 .take() 메소드를 제공한다.Future<V> poll(): 완료된 작업의 Future를 가져온다. 완료된 작업이 없다면 즉시 null을 반환한다.Future<V> poll(long timeout, TimeUnit unit): 완료된 작업이 없다면, timeout까지 블로킹된다.Future<V> take(): 완료된 작업이 없다면 있을 때까지 블로킹된다.Future<V> submit(Callable<V> task): 스레드 풀에 Callable 작업 요청Future<V> submit(Runnable task, V result): 스레드 풀이 Runnable 작업 요청public class CompletionServiceExample extends Thread{
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
// CompletionService 생성
CompletionService<Integer> completionService =
new ExecutorCompletionService<Integer>(executorService);
System.out.println("[3개의 Callable 객체 작업 처리 요청]");
for (int i = 0; i < 3; i++) {
int finalI = i;
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int j = 1; j <= 10; j++) {
sum += j;
}
return sum + finalI;
};
});
}
System.out.println("[처리 완료된 작업 확인]");
executorService.submit(new Runnable() {
@Override
public void run() {
while (true) {
try {
Future<Integer> future = completionService.take();
int value = future.get();
System.out.println("[처리 결과] " + value);
} catch (InterruptedException e) {
System.out.println("[INTERRUPT OCCURRED]");
break;
} catch (ExecutionException e) {
e.printStackTrace();
break;
}
}
}
});
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("마지막 도달");
executorService.shutdownNow();
}
}
위의 예제는 CompletionService를 이용하여 Callable 작업을 처리하는 예제이다.
finalI를 더해주었다.shutdownNow() 코드를 넣어서, 3초 뒤에는 스레드 풀 내의 모든 스레드에 Interrupt를 건다..take() 메소드의 결과 값으로 Interrupt를 받게 되고 즉시 끝난다.
2번째 1번째 3번째 순으로 연산이 끝났다.
java.nio.channels.CompletionHandler를 이용해도 된다..completed() 메소드가 존재한다..failed() 메소드가 존재한다.CompletionHandler<V, A> callback = new CompletionHandler<V, A>() {
@Override
public void completed(V result, A attachment) {
}
@Override
public void failed(Throwable exc, A attachment) {
}
}
A attachment는 콜백 메소드 결과값 외에 추가적으로 전달할 객체가 있으면 설정해주면 된다.null을 보내도 된다.public class CallbackExample {
private ExecutorService executorService;
public CallbackExample() {
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
}
private CompletionHandler<Integer, Void> callback = new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.println("completed() 실행: " + result);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("failed() 실행: " + exc.toString());
}
};
public void doWork(final String x, final String y) {
Runnable task = new Runnable() {
@Override
public void run() {
try {
int intX = Integer.parseInt(x);
int intY = Integer.parseInt(y);
int result = intX + intY;
callback.completed(result, null);
} catch (NumberFormatException e) {
callback.failed(e, null);
}
}
};
executorService.submit(task);
}
public void finish() {
executorService.shutdown();
}
public static void main(String[] args) {
CallbackExample callbackExample = new CallbackExample();
callbackExample.doWork("3", "3");
callbackExample.doWork("3", "삼");
callbackExample.finish();
}
}