스레드 풀

오늘·2021년 4월 4일
0

Java

목록 보기
35/42

스레드 (원시 코드)를 많이 사용하면 스레드 폭증이 일어날 수 있는데, 스레드 폭증으로 일어나는 현상은 아래와 같다.
: 병렬 작업 처리가 많아지면 스레드 개수가 증가하고
: 스레드 생성과 스케줄링으로 인해 cpu가 바빠지고
: 메모리 사용량이 늘어나면서
: 애플리케이션의 성능이 급격히 저하된다

스레드 풀

: 작업 처리에 사용되는 스레드를 제한된 개수만큼 미리 생성해놓고
: 작업 큐에 들어오는 작업들을 하나씩 스레드가 맡아 처리하는 것이다
: 작업 처리가 끝난 스레드는 다시 새로운 작업을 큐에서 가져와 처리
-> 작업 처리 요청이 폭증되어도 스레드의 정체 개수가 늘어나지 x
-> 애플리케이션의 성능이 급격히 저하되지 않는다

스레드 풀의 스레드는 기본적으로 데몬 스레드가 아니다
: 메인 스레드가 종료되더라도 스레드 풀 스레드는 작업 처리를 위해
계속 실행한다 (=애플리케이션은 종료되지 않는다)
-> 스레드 풀을 종료해 스레드들이 종료 상태가 되도록 처리해주어야 한다


스레드풀 생성

ExecutorService 구현 객체는 Executors 클래스의 다음 두 가지 메소드 중 하나를 이용해서 생성할 수 있다.

newFizedThreadPool(int nThreads) 메소드로 생성된 스레드풀의 초기 스레드 개수는 0개이고, 코어 스레드 수는 nThreads 이다. 이 스레드풀은 스레드가 작업을 처리하지 않고 놀고 있더라도 스레드 개수가 줄지 않는다.

만약 CPU의 코어 수만큼 최대 스레드를 사용하는 스레드 풀을 생성하려면 아래와 같이 선언하면 된다

ExecutorService executorService = Executors.newFixedThreadPool(
		Runtime.getRuntime().availableProcessors());

new-()메소드를 사용하지 않고 코어 스레드 개수와 최대 스레드 개수를 설정하고 싶다면, 직접 ThreadPoolExecutor 객체를 생성하면 된다.

ExecutorService executorService = Executors.newFixedThreadPool(
		3, 	// 초기 스레드 개수
        	100,	 	// 최대 스레드 개수
        	120L, 		// 놀고 있는 시간
        	TimeUnnit.SECONDS, 	// 놀고있는 시간 단위
        	new SynchronousQueue<Runnable>()	// 작업 큐
        );

스레드 종료

스레드풀의 스레드는 기본적으로 데몬 스레드가 아니기 때문에 main 스레드가 종료되더라도 작업을 처리하기 위해 계속 실행 상태로 남아있다. 따라서 애플리케이션을 종료하려면 스레드풀을 종료시켜 스레드들이 종료 상태가 되도록 처리해주어야 한다

ExecutorService는 종료와 관련하여 다음 세개의 메소드를 제공하고 있다.

: 남아있는 작업을 마무리하고 스레드풀을 종료할 때에는 shutdown()
: 남아있는 작업과는 상관없이 강제로 종료할 때에는 shutdownNow()
를 호출한다.

executorService.shutdown();
executorService.shutdownNow();

작업 생성과 처리 요청

하나의 작업은 Runnable 또는 Callable 구현 클래스로 표현하는데, 둘의 차이점은 작업 처리 완료 후 리턴값이 있느냐 없느냐이다.

Runnable 의 run() 메소드는 리턴값이 없고
Callable의 call() 메소드는 리턴값이 있다

T로 지정해준 타입으로 리턴된다.

작업 처리 요청


execute()와 submit() 의 차이
1. excute()
: 작업 처리 결과를 받지 못한다
: 작업 처리 도중에 예외가 발생하면 스레드가 종료되고, 해당 스레드는 스레드풀에서 제거된다.
-> 스레드풀은 다른 작업 처리를 위해 새로운 스레드를 생성한다

  1. submit()
    : 작업 처리 결과를 받을 수 있도록 Future를 리턴한다
    : 작업 처리 도중에 예외가 발생하더라도 스레드는 종료되지 않고 다음 작업을 위해 재사용 된다

    따라서 가급적이면 스레드의 생성 오버헤드를 줄이기 위해서 submit()를 사용하는 것이 좋다.

execute() 메소드로 작업 처리를 요청한 경우

public class A_ExecuteEx_630 {
	public static void main(String[] args) throws Exception {
		// 최대 스레드가 2개인 스레드풀 생성
		ExecutorService executorService = Executors.newFixedThreadPool(2);
		
		// 작업정의
		for(int i=0; i<10; i++) {
			// 익명 구현 객체로 익명 스레드 생성한 것
			// 작업 스레드
			Runnable runable = new Runnable() {
				@Override
				public void run() {
					// 스레드 총 개수 및 작업 스레드 이름을 출력한다
					ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
					int poolSize = threadPoolExecutor.getPoolSize();
					String threadName = Thread.currentThread().getName();
					System.out.println("[총 스레드 갯수 : " + poolSize +
							"] 작업 스레드 이름 : " + threadName);
					
					// 예외 발생 시점
					int value = Integer.parseInt("삼");
				}
			};
			
			// 작업 처리 요청
			// start() 아니고 excute()
			// 실행시 스레드 풀 박스에 들어가서 작업을 실행
			executorService.execute(runable);
			// executorService.submit(runable);
			
			// 콘솔에 출력시간을 주기 위해서 0.01초 일시정지 시킨다
			Thread.sleep(10);
		}
		// 스레드풀 종료
		executorService.shutdown();
	}
}

실행화면

작업 도중 예외가 발생하여 해당 스레드가 제거되고 새 스레드가 계송 생성되고 있음을 알 수 있다.


블로킹 방식의 작업 완료 통보

: Future 객체는 작업 결과가 아니라 작업이 완료될 때까지 기다렸다가 (지연=블로킹) 최종 결과를 얻는데 사용된다.
: Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹 되었다가 작업을 완료하면 처리 결과를 리턴한다.
: Future를 이용한 블로킹 방식의 작업 완료 통보에서 주의할 점은 작업을 처리하는 스레드가 작업을 완료하기 전까지는 다른 코드를 실행할 수 없다.

리턴 값이 없는 작업 완료 통보

리턴값이 없기 때문에 Runnable 객체로 생성하고, submit() 메소드를 이용하면 된다.

리턴값이 없는 작업 완료 통보

public class B_NoResultEx_735 {
	public static void main(String[] args) {
		// CPU의 코어 수만큼 최대 스레드를 사용하는 스레드풀 생성
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors());
		
		System.out.println("[작업 처리 요청]");
		
		// 리턴값이 없는 작업일 경우 Runnable 객체로 생성하면된다.
		Runnable runnable = new Runnable() {
			@Override
			public void run() {
				int sum=0; for(int i=1; i<=10; i++) {
					sum += i;
				}
				System.out.println("[처리결과 : " + sum + "]");
			}
		};
		// 결과값이 없는 작업 처리 요청은 submit(Runnable task)메소드를 이용
		// 결과값이 없음에도 불고하고 Future 객체를 리턴하는데
		// 이것은 스레드가 작업 처리를 정상적으로 완료했는지, 아님 처리 도중 예외가 발생했는지 확인하기 위함
		Future future = executorService.submit(runnable);
		
		// 작업처리가 정상적으로 완료되었다면
		// Future의 get() 메소드는 null을 리턴하지만
		// 스레드가 작업토중 예외를 발생시키고
		// 다음과 같은 예외 처리 코드가 필요하다
		try {
			// 정상적으로 처리되었기 때문에
			// [작업 처리 완료] 가 출력된다.
			future.get();
			System.out.println("[작업 처리 완료]");
		}catch (Exception e) {
			// 작업 처리 도중 스레드가 interrupt되거나
			// 작업 처리 도중 예외가 발생된 경우 실행되는 곳
			System.out.println("[실행 예외 발생] " + e.getMessage());
		}
		// 스레드 풀 모두 종료
		executorService.shutdown();
	}
}

실행결과

[작업 처리 요청]
[처리결과 : 55]
[작업 처리 완료]

리턴값이 있는 작업 완료 통보

스레드풀의 스레드가 작업을 완료한 후에 애플리케이션이 처리 결과를 얻어야 한다면 작업 객체를 Callable로 생성하면 된다.

리턴값이 있는 작업 완료 통보

public class A_ResultByCallable {
	public static void main(String[] args) {
		// 스레드 풀 만들기
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors() ); 
		
		System.out.println("[작업 처리 요청]");
		// 작업 스레드 구현하기
		// V에는 결과로 나올 형타입을 써주면 된다
		Callable<Integer> task = new Callable<Integer>() {			
			@Override
			public Integer call() throws Exception {
				// 작업할 스레드
				int sum = 0;
				for(int i=0; i<=10; i++) {
					sum += i;
				}
				// 정수형을 리턴할거니까 V에 Integer을 넣어준 것이다
				return sum;
			}
		};
		// 스레드풀에 작업 처리 요쳥
		// callable의 변수인 task가 매개값으로 들어간다
		Future<Integer> future = executorService.submit(task);


		// 결과를 얻어온다
		// 근데 .get()이 불안정한가보다 try-catch로 싸준다.
		try {
			int sum = future.get();
			System.out.println("[처리결과 : " + sum + " ]");
			System.out.println("처리 완료");
		} catch (InterruptedException|ExecutionException e) {
			System.out.println("실행 예외 발생 " + e.getMessage());
		}
		
		// 스레드풀 종료하기
		executorService.shutdown();
	}
}

실행 결과

[작업 처리 요청]
[처리결과 : 55 ]
처리 완료

runable 받는 것과 callable 받는 차이

executorService.execute(runable);
Future<Integer> future = executorService.submit(task);

작업 처리 결과를 외부 객체에 저장

public class B_ResultByRunnableEx {
	public static void main(String[] args) {
		// 스레드 만들기
		ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
		
		System.out.println("작업 처리 요청");
		// 내부 클래스 task 생성
		class Task implements Runnable {
			// Result를 받을 필드값 생성
			Result result;
			// 생성자
			Task(Result result) {
				this.result = result;
			}
			@Override
			public void run() {
				// 스레드에 처리할 내용 작업 구현
				int sum = 0;
				for(int i=0; i<=10; i++) {
					sum += i;
				}
				// 외부에 값 보내서 저장시키기
				result.addValue(sum);
			}
		} // end Task
		
		// 두가지 작업 처리를 요청한다
		Result result = new Result();
		Runnable task1 = new Task(result);	// 다형성
		Runnable task2 = new Task(result);	// 다형성
		 
		// 작업 요청
		// Runnable을 연결해 사용해 주었으니
		// executorService.submit(Runnable 객체 변수, 리턴타입객체)
		// Future<V> <= V가 바로 return 타입
		Future<Result> future1 = executorService.submit(task1, result);
		Future<Result> future2 = executorService.submit(task2, result);
		
		// 결과 전달하기
		try {
			result = future1.get();
			result = future2.get();
			System.out.println("처리결과 : " + result.accumValue);
			System.out.println("작업 처리 완료");
		} catch (InterruptedException | ExecutionException e) {
			System.out.println("실행 예외 발생 " + e.getMessage());
		}
		
		// 안전하게 완전히 종료하기
		executorService.shutdown();		
	}
}

// 처리 결과를 저장하는 Result 클래스
// 외부 객체
class Result {
	int accumValue;
	synchronized void addValue(int value) {
		// 받아서 결과 누적하기
		this.accumValue += value;
	}
}

실행 결과

작업 처리 요청
처리결과 : 110
작업 처리 완료

외부객체에 저장하는 것 연습

// 1~10까지 덧셈
// 11.0~10.0 까지 덧셈하기

// 출력할 것
// 1~10까지의 합 :
// 11.0 ~ 20.0까지의 합 :
// 1 ~20까지의 합 :

public class B_ResultByRunnableTest {
	public static void main(String[] args) {
		// 스레드 만들기
		ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
		
		System.out.println("작업 처리 요청");
		// 내부 클래스
		class RetunInt implements Runnable {
			Result1 result;
			public RetunInt(Result1 result) {
				this.result = result;
			}

			@Override
			public void run() {
				int sum = 0;
				for (int i = 1; i <= 10; i++) {
					sum += i;
				}
				result.addValue(sum);
			}
		} // end RetrunInt

		// 내부 클래스
		class RetunDouble implements Runnable {
			Result1 result;

			public RetunDouble(Result1 result) {
				this.result = result;
			}

			@Override
			public void run() {
				double sum = 0;
				for (int i = 11; i <= 20; i++) {
					sum += i;
				}
				result.accValue2(sum);
			}
		} // end RetrunDouble

		// 두가지 작업 처리를 요청한다
		Result1 result = new Result1();
		Runnable task1 = new RetunInt(result); // 다형성
		Runnable task2 = new RetunDouble(result); // 다형성
		
		Future<Result1> future1 = executorService.submit(task1, result);
		Future<Result1> future2 = executorService.submit(task2, result);
		
		try {
			result = future1.get();
			result = future2.get();
			System.out.println("int 처리 결과 : " + result.accValue);
			System.out.println("double 처리 결과 : " + result.accValue2);
			System.out.println("처리결과 : " + (result.accValue + result.accValue2));
			System.out.println("작업 처리 완료");
		} catch (InterruptedException | ExecutionException e) {
			System.out.println("실행 예외 발생 " + e.getMessage());
		}
	}
}

// 처리 결과를 저장하는 Result 클래스
class Result1 {
	int accValue;
	double accValue2;

	synchronized void addValue(int value) {
		accValue += value;
	}

	synchronized void accValue2(double sum) {
		accValue2 += sum;
	}
}

실행결과

작업 처리 요청
int 처리 결과 : 55
double 처리 결과 : 155.0
처리결과 : 210.0
작업 처리 완료

작업 완료 순으로 통보

작업 요청 순서대로 작업처리가 완료 되지는 않는다
작업의 양과 스케줄링 순서에 따라 완료되기 때문이다

처리결과를 순차적으로 이용할 필요가 없고
여러개의 작업들이 순차적으로 처리될 필요가 없다면
스레드 풀에서 작업처리가 완료된 것만 통보받는 방법이 있다.

public class C_ComletionServiceEx {
	public static void main(String[] args) {
		ExecutorService eS = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

		// CompletionService 생성
		// 결과를 통보받기 위해서
		CompletionService<Integer> cS = new ExecutorCompletionService<Integer>(eS);

		System.out.println("작업 처리 요청");

		for (int i = 0; i < 3; i++) {
			// 스레드풀에게 작업 처리 요청
			cS.submit(new Callable<Integer>() {
				@Override
				public Integer call() throws Exception {
					int sum = 0;
					for (int i = 11; i < 20; i++) {
						sum += i;
					}
					return sum;
				}
			});
		} // end for

		System.out.println("처리 완료된 작업 확인");
		// 스레드풀의 스레드에서 실행되도록 함
		eS.submit(new Runnable() {
			@Override
			public void run() {
				while (true) {
					try {
						// 완료된 작업을 가져온다
						// .take(); 부분이
						Future<Integer> future = cS.take();
						// 변수 value에 작업 결과를 저장해준다
						int value = future.get();
						System.out.println("처리 결과 : " + value);
					} catch (Exception e) {
						break;
					}
				}
			}
		});

		// 3초후 스레드풀을 종료하겠다
		try {
			Thread.sleep(3000);
		} catch (Exception e) {	}
		eS.shutdownNow();
	}
}

실행결과

작업 처리 요청
처리 완료된 작업 확인
처리 결과 : 135
처리 결과 : 135
처리 결과 : 135

콜백 방식의 작업 완료 통보

콜백이란 애플리케이션이 스레드에게 작업 처리를 요청한 후, 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법을 말한다.
이때, 자동 실행되는 메소드를 콜백 메소드라 한다.

블로킹 방식과 콜백 방식의 차이점

블로킹 방식
: 작업 처리를 요청한 후 작업이 완료될 때까지 블로킹
: 처리 될때까지 다른 기능 수행 불가

콜백 방식
: 작업 처리를 요청한 후 결과를 기다릴 필요가 없다
: 작업처리가 완료되면 자동적으로 콜백 메소드가 실행되어 결과를 알 수 있음

콜백 방식의 작업 완료 통보 받기
1. 스레드풀 만들기
2. Completionhandler 인터페이스의 익명구현
3. 작업 스레드 작성 - 익명 구현 객체
4. 작업 스레드 스레드풀에 넣기

public class D_CallbackEx {
	private ExecutorService excutorService;
	
	public D_CallbackEx() {
		excutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
	}

	// 콜백 메소드를 가진 (콜백 기능이 있는) CompletionHandler 객체 생성
	// CompletionHandler<결과 타입, 첨부 타입>()
	private CompletionHandler<Integer, Void> callback = new CompletionHandler<Integer, Void>() {
		@Override
		public void completed(Integer result, Void attachment) {
			System.out.println("completed() 실행 : " + result);
		}
		public void failed(Throwable exc, Void attachment) {
			System.out.println("failed() 실행 : " + exc.toString());
		}
	};
	
	// 실행코드가 들어가는 곳
	public void doWork(final String x, final String y) {
		// 작업 스레드(익명 구현 객체로) 작성하기
		Runnable task = new Runnable() {			
			@Override
			public void run() {
				try {
					int intX = Integer.parseInt(x);
					int intY = Integer.parseInt(y);
					int result = intX + intY;
					// 정상처리 했을 때의 호출
					callback.completed(result, null);
				} catch (NumberFormatException e) {
					callback.failed(e, null);
				}
			}
		};
		// 스레드풀에게 작업 처리 요청
		excutorService.submit(task);
	}
	
	// 스레드풀 종료
	public void finish() {
		excutorService.shutdown();
	}
	
	public static void main(String [] args) {
		D_CallbackEx example = new D_CallbackEx();
		example.doWork("3",	"3");
		example.doWork("3", "삼");
		example.finish();
	}
}

0개의 댓글