Java Thread Pool 을 직접 구현해보자(1)

Jongmyung Choi·2023년 12월 28일

네티 개발자 이희승님의 라이브를 참고하여 실습하고 글을 작성하였습니다.
라이브 주소: https://www.youtube.com/watch?v=YVrFretPo8M

라이브를 따라가며 차근차근 개선시켜나가는 것을 기록한 글이기 때문에 구현->문제점 발견->해결->문제점 발견->해결->... 식으로 글을 작성하였습니다.(답답 주의)


여러개의 포스팅이 될 것 같은데 이번글에서는 가장 원시적인 형태의 쓰레드 풀을 만들어보고 발생하는 문제점에 대해 조금씩 리팩토링 하는식으로 진행할 것이다.

최종적인 목표는 자바에서 제공하는 쓰레드 풀 라이브러리 정도의 완성도를 가진 쓰레드 풀을 구현해보는 것이다!

1. 원시적인 쓰레드 풀

public class ThreadPool implements Executor {

	private final BlockingQueue<Runnable> queue= new LinkedTransferQueue<>();
	private final Thread[] threads;

	public ThreadPool(int numThreads) {
		threads = new Thread[numThreads];
		for (int i = 0; i < numThreads; i++) {
			threads[i]=new Thread(()->{
				try {
					for(;;) {
						final Runnable task = queue.take();
						task.run();
					}
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			});
			threads[i].start();
		}
	}
	@Override
	public void execute(Runnable command) {
		queue.add(command);
	}
}

간단하게 스레드 풀을 구현한 코드이다.
아주 러프하지만 코드에 대한 설명을 하자면 다음과 같다.
클래스선언
Executor 인터페이스를 구현한 ThreadPool 클래스를 선언하였다.

변수 선언
queue : 실행할 작업을 저장 하는데에 사용된다.
LinkedTransferQueue : 높은 처리량을 제공하는 동시성 큐 이다.
threads : 스레드 풀의 스레드를 보관하는 배열이다.

생성자
입력된 numThreads 수만큼 스레드를 생성하고 시작한다.
각 스레드는 무한 루프(for(;;)) 안에서 큐에서 작업을 가져와 실행한다.
InterruptedException이 발생하면 런타임 예외를 던진다. 이는 스레드가 중단될 때 발생한다.

execute 메소드
주어진 Runnable 작업을 큐에 추가한다.

전체 흐름

  1. ThreadPool 객체가 생성되고, 생성자 내에서 스레드들이 생성되어 시작된다.
  2. 각 스레드는 queue.take()를 호출하여 큐에서 작업을 기다린다. 이 시점에서 큐는 비어 있으므로, 스레드들은 블로킹 상태에 들어간다.
  3. 어느 시점에 execute 메소드가 호출되어 큐에 새로운 Runnable 작업이 추가된다.
  4. 큐에 작업이 들어오면, 대기 중이던 스레드 중 하나가 깨어나 작업을 가져가 실행한다.

문제점

1. 스레드 즉시 시작

ThreadPool의 생성자에서 스레드를 즉시 시작하고 있다. 이는 ThreadPool 객체가 생성되는 즉시 모든 스레드가 활성화되며, 실제 작업이 큐에 추가되기 전에도 시스템 리소스를 소비한다는 것을 의미한다.

2. 예외 처리

InterruptedException이 발생하면 RuntimeException을 던진다. 이는 스레드가 예외 상황에 적절히 대응하지 못하고, 예외가 발생할 경우 스레드 풀의 안정성에 영향을 미칠 수 있다.

1번을 해결하고자 다음과 같이 코드를 변경해보았다.

변경된 코드

public class ThreadPool implements Executor {

	private final BlockingQueue<Runnable> queue = new LinkedTransferQueue<>();
	private final Thread[] threads;
	private final AtomicBoolean started = new AtomicBoolean();

	public ThreadPool(int numThreads) {
		threads = new Thread[numThreads];
		for (int i = 0; i < numThreads; i++) {
			threads[i] = new Thread(() -> {
				try {
					for (; ; ) {
						final Runnable task = queue.take();
						task.run();
					}
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			});
		}
	}

	@Override
	public void execute(Runnable command) {
		if (started.compareAndSet(false, true)) {
			for (Thread thread : threads) {
				thread.start();
			}
		}
		queue.add(command);
	}
}

AtomicBoolean 변수 started를 사용하여 스레드의 시작을 execute메소드가 처음 호출될 때까지 지연시켰다. -> 리소스 사용 최적화
또한 AtomicBooleancompareAndSet 메소르를 사용하여 멀티스레드 환경에서도 동시성을 안전하게 관리하였다. -> 경쟁 상태 방지

테스트

public class ThreadPoolTest {

	@Test
	void submittedTasksAreExecuted() {
		final Executor executor = new ThreadPool(2);
		for (int i = 0; i < 10; i++) {
			int finalI = i;
			executor.execute(()->{
				System.err.println("Thread " + Thread.currentThread().getName() + " executes a task" + finalI);
			});
		}
	}
}

다음과 같이 테스트코드를 작성하고 실행해보자.

한 스레드 안에서는 순서가 맞지만 다른 스레드에서는 순서가 좀 다른걸 볼 수 있다.
Task 실행시간이 너무 짧으니까 하나의 스레드에서 대부분의 작업을 처리하고자 해서 3의 비율이 좀 높게 나온 것 같다.
횟수도 좀 늘리고 10ms 만큼의 시간을 줘보자.

public class ThreadPoolTest {

	@Test
	void submittedTasksAreExecuted() {
		final Executor executor = new ThreadPool(2);
		for (int i = 0; i < 100; i++) {
			int finalI = i;
			executor.execute(()->{
				try {
					Thread.sleep(10);
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
				System.err.println("Thread " + Thread.currentThread().getName() + " executes a task" + finalI);
			});
		}
	}
}

이렇게 하였는데 테스트 메소드가 작업이 완료될 때 까지 기다리지 않고 종료되버린다.

따라서 다음과 같이 수정하였다.

	@Test
	void submittedTasksAreExecuted() throws InterruptedException {
		final Executor executor = new ThreadPool(2);
		final int numTasks = 100;
		final CountDownLatch latch = new CountDownLatch(100);
		for (int i = 0; i < numTasks; i++) {
			int finalI = i;
			executor.execute(()->{
				System.err.println("Thread " + Thread.currentThread().getName() + " executes a task" + finalI);
				try {
					Thread.sleep(10);
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
				latch.countDown();
			});
		}
		latch.await();
	}

countDownLatch를 사용하여 100개의 작업이 모두 완료될 때 까지 기다리고 latch.await()를 통해 모든 카운트가 0이 될때까지 현재 스레드를 블로킹 하였다.

이렇게 가장 원초적인? 방식으로 쓰레드 풀을 만들어 보았는데
이 방식에는 여전히 문제점이 있다.

  1. 쓰레드를 만들기만 하고 제거하지(정지시키지)는 않는다.
  2. 실행시간이 짧을경우에는 하나의 스레드에서 대부분을 처리하기 때문에 필요한 만큼만 만들어야한다.
  3. 위에서 해결하지 않은 에러(InterruptedExecption) 발생시 쓰레드가 정지하게 된다.

다음 글에서는 이러한 문제점을 해결해보도록 하자.

profile
총명한 개발자

0개의 댓글