Multi Thread(멀티스레드) - 3 (스레드 풀)

dykwon·2024년 1월 16일
  1. 스레드풀
  • 스레드풀은 작업 처리에 사용되는 스레드를 제한된 개수만큼 정해놓고 작업 큐에 들어오는 작업들을 하나씩 스레드가 맡아 처리하는 것
  • 작업 처리 요청이 폭증되어도 스레드의 전체 개수가 늘어나지 않으며, 작업 처리가 끝난 스레드는 다시 작업 큐에서 새로운 작업을 가져와 처리한다
  • 자바에서는 java.util.concurrent 패키지의 ExcutorService가 그 역할을 한다
  1. 스레드풀 생성
  • newCahcedThreadPool() : 초기 스레드 수 0, 코어 스레드 수 0, 최대 스레드 수, Integer.MAX_VALUE
    운영체제의 성능과 상황에 따라 스레드가 동적으로 추가된다. 60초동안 추가된 스레드가 작업이 없을때는 스레드를 종료하고 풀에서 삭제한다
  • newFixedThreadPool(int num) : 초기 스레드 수 0, 코어 스레드 수 num, 최대 스레드 수 num
    스레드가 작업을 처리하고 나서 놀고 있더라도 스레드 개수는 고정이 되어 줄지 않는다
    ExcutorService es = Excutors.newCachedThreadPool();
    ExcutorService excutor = Excutors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    // ThreadPoolExecutor를 사용항여 직접 스레드 풀을 생성한 경우
    ExcutorService es = new ThreadPoolExecutor(3, 100, 120L, TimeUnit.SECONDS, new SynchronouseQueue<Runnable>());
  1. 스레드 풀 종료
  • excutorService.shutdown() : 스레드풀 일반 종료, 모든 작업 처리 후 종료
  • excutorService.shutdownNow() : 스레드풀 강제 종료, 현재 작업 처리 중인 스레드를 모두 interrupt하고 스레드풀을 종료
  1. 작업 생성과 작업 처리 요청
  • 하나의 작업은 Runnable 또는 Callable 구현 클래스로 표현
  • Runnalbe은 작업 처리 완료 who 리턴 값이 없으며, Callable은 리턴타입을 갖고 있음
  • 작업 처리 요청은 ExcutorService의 작업 큐에 Runnable 또는 Callable 객체를 넣는 행위를 말함
  • execute(Runnable command) 는 Runnable을 작업 큐에 저장하고, 작업 처리 결과는 반환 받지 않는 경우 사용
  • submit(Runnable command) 또는 submit(Callable task)는 Runnable 또는 Callable을 작업 큐에 저장하고 처리결과를 반환 받는 경우 사용
  • excecute는 오류 발생 시, 해당 스레드를 제거하고 재생성함. 그러나 submit은 재생성 하지 않고 작업 결과를 반환함
    • 스레드 생성에 대한 오버헤드를 줄이기 위해서는 submit을 사용하는 것이 바람직함
  1. 블로킹(Blocking) 방식의 작업 완료 통보
  • submit() 메소드를 호출하면, Runnable 또는 Callable 작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴한다
  • Future 객체는 작업 결과가 아니라 작업이 완료될때까지 블로킹 되었다가, 최종 결과를 얻는데 사용한다
  • get() : 작업이 완료될떄까지 기다렸다가, 처리결과 V를 리턴 (스레드가 작업이 완료되기전까지는 get() 메소드가 블로킹 되어 다른 코드 실행이 불가)
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> callable = new Callable<String>() {
	@Override
	public String call() throws InterruptedException {
		Thread.sleep(3000);
		return "Thread: " + Thread.currentThread().getName();
	}
};

Future<String> future = executorService.submit(callable);
String rs = future.get();
executorService.shutdown();
  1. 작업 처리 결과를 외부에 저장
  • 작업 처리 결과를 외부에 저장하고 이를 하나로 취합하는 방식
public class ResultByRunnableExample {
	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
		);
		
		System.out.println("[작업 처리 요청]");
		class Task implements Runnable{
			Result result;
			
			Task(Result result){
				this.result = result;
			}
			
			@Override
			public void run() {
				int sum = 0;
				for(int i =1; i<=10; i++) {
					sum+=i;
				}
				result.addValue(sum);
			}
		}
		
		Result result = new Result();
		Runnable task1 = new Task(result);
		Runnable task2 = new Task(result);
		
		Future<Result> future1 = executorService.submit(task1, result);
		Future<Result> future2 = executorService.submit(task2, result);
		
		try {
			result = future1.get();
			result = future2.get();
			System.out.println("[처리 결과] "+result.accumValue);
			System.out.println("[작업 처리 완료]");
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("[실행 예외 발생함]"+e.getMessage());
		}
		executorService.shutdown();
	}
}

class Result{
	int accumValue;
	synchronized void addValue(int value) {
		accumValue += value;
	}
}
  1. 작업 완료된 순으로 통보
  • 작업이 완료된 순으로 바로 Future를 리턴 받는 방법은 CompletionService를 활용하는 방법임
  • CompletionService는 poll()과 take() 메소드를 제공함
  • poll()은 완료된 작업의 Future를 즉시 가져오며, 완료된 작업이 없다면 null을 리턴함
  • take()는 완료된 작업의 Future를 가져오며, 완료된 작업이 없다면 있을때까지 블로킹 됨
   public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
			Runtime.getRuntime().availableProcessors()
		);
		
		CompletionService<Integer> completionService = 
				new ExecutorCompletionService<Integer>(executorService);
		
		System.out.println("[작업 처리 요청]");
		
		for(int i =0; i<3; i++) {
			completionService.submit(new Callable<Integer>() {

				@Override
				public Integer call() throws Exception {
					int sum =0;
					for(int i =1; i <= 10; i++) {
						sum += i;
					}
					return sum;
				}
			});
		}
		
		System.out.println("[처리 완료된 작업 확인]");
		executorService.submit(new Runnable() {

			@Override
			public void run() {
				while(true) {
					try {
						Future<Integer> future = completionService.take(); 
						int value = future.get();
						System.out.println("[처리 결과] "+value);
					} catch (Exception e) {
						e.printStackTrace();
						break;
					}
				}
			}
		});
		
		try { Thread.sleep(3000); } 
		catch (Exception e) {}
		executorService.shutdownNow();
		
  1. 콜백 방식으로 통보
  • 스레드가 완료되면 특정 콜백 메소드를 자동으로 호출하는 방법
  • 블로킹 방식은 작업 처리 후 작업이 완료될때까지 기다려야 하지만, 콜백 방식은 작업 처리를 요청한 후 기다릴 필요가 없이 다른 기능 수행이 가능함
  • ExecutorService에는 콜백 기능ㅇ ㅣ제공되지 않기 때문에, java.nio.CopletionHandler를 활용하거나 직접 콜백을 구현해야함
  • CompletionHandler에는 completed()와 failed가 있는데 이는 override 하여 사용이 가능하며, completed는 완료 시 failed는 예외 발생 시 사용
CompletionHandler<V, A> callback = new CompletionHandler<V, A>(){
	@Override
    public void completed(V result, A attachment){ } // 성공시 호출
    
    @Override
    public void failde(V result, A attachment){ } // 실패시 호출
};	
public class CallbackExample {
	private ExecutorService executorService;
	
	public CallbackExample() {
		executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
		);
	}
	
	private CompletionHandler<Integer,Void> callback = new CompletionHandler<Integer,Void>(){

		@Override
		public void completed(Integer result, Void attachment) {	
			System.out.println("completed() 실행 : "+result);
		}

		@Override
		public void failed(Throwable exc, Void attachment) {
			System.out.println("failed() 실행 : " + exc.toString());
		}
	};
	
	public void doWork(final String x, final String y) {
		Runnable task = new Runnable() {

			@Override
			public void run() {
				try {
					int intX = Integer.parseInt(x);
					int intY = Integer.parseInt(y);
					int result = intX + intY;
					callback.completed(result, null);
				} catch (Exception e) {
					callback.failed(e, null);
				}
			}
		};
		executorService.submit(task);
	}
	
	public void finish() {
		executorService.shutdown();
	}
	
	public static void main(String[] args) {
		CallbackExample example = new CallbackExample();
		example.doWork("3", "3");
		example.doWork("3", "삼");
		example.finish();
	}
	
}
profile
Programmer, who turns ideas into value

0개의 댓글