네티 개발자 이희승님의 라이브를 참고하여 실습하고 글을 작성하였습니다.
라이브 주소: https://www.youtube.com/watch?v=YVrFretPo8M
라이브를 따라가며 차근차근 개선시켜나가는 것을 기록한 글이기 때문에 구현->문제점 발견->해결->문제점 발견->해결->... 식으로 글을 작성하였습니다.(답답 주의)
여러개의 포스팅이 될 것 같은데 이번글에서는 가장 원시적인 형태의 쓰레드 풀을 만들어보고 발생하는 문제점에 대해 조금씩 리팩토링 하는식으로 진행할 것이다.
최종적인 목표는 자바에서 제공하는 쓰레드 풀 라이브러리 정도의 완성도를 가진 쓰레드 풀을 구현해보는 것이다!
public class ThreadPool implements Executor {
private final BlockingQueue<Runnable> queue= new LinkedTransferQueue<>();
private final Thread[] threads;
public ThreadPool(int numThreads) {
threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i]=new Thread(()->{
try {
for(;;) {
final Runnable task = queue.take();
task.run();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
threads[i].start();
}
}
@Override
public void execute(Runnable command) {
queue.add(command);
}
}
간단하게 스레드 풀을 구현한 코드이다.
아주 러프하지만 코드에 대한 설명을 하자면 다음과 같다.
클래스선언
Executor 인터페이스를 구현한 ThreadPool 클래스를 선언하였다.
변수 선언
queue : 실행할 작업을 저장 하는데에 사용된다.
LinkedTransferQueue : 높은 처리량을 제공하는 동시성 큐 이다.
threads : 스레드 풀의 스레드를 보관하는 배열이다.
생성자
입력된 numThreads 수만큼 스레드를 생성하고 시작한다.
각 스레드는 무한 루프(for(;;)) 안에서 큐에서 작업을 가져와 실행한다.
InterruptedException이 발생하면 런타임 예외를 던진다. 이는 스레드가 중단될 때 발생한다.
execute 메소드
주어진 Runnable 작업을 큐에 추가한다.
ThreadPool의 생성자에서 스레드를 즉시 시작하고 있다. 이는 ThreadPool 객체가 생성되는 즉시 모든 스레드가 활성화되며, 실제 작업이 큐에 추가되기 전에도 시스템 리소스를 소비한다는 것을 의미한다.
InterruptedException이 발생하면 RuntimeException을 던진다. 이는 스레드가 예외 상황에 적절히 대응하지 못하고, 예외가 발생할 경우 스레드 풀의 안정성에 영향을 미칠 수 있다.
1번을 해결하고자 다음과 같이 코드를 변경해보았다.
public class ThreadPool implements Executor {
private final BlockingQueue<Runnable> queue = new LinkedTransferQueue<>();
private final Thread[] threads;
private final AtomicBoolean started = new AtomicBoolean();
public ThreadPool(int numThreads) {
threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(() -> {
try {
for (; ; ) {
final Runnable task = queue.take();
task.run();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
@Override
public void execute(Runnable command) {
if (started.compareAndSet(false, true)) {
for (Thread thread : threads) {
thread.start();
}
}
queue.add(command);
}
}
AtomicBoolean 변수 started를 사용하여 스레드의 시작을 execute메소드가 처음 호출될 때까지 지연시켰다. -> 리소스 사용 최적화
또한 AtomicBoolean의 compareAndSet 메소르를 사용하여 멀티스레드 환경에서도 동시성을 안전하게 관리하였다. -> 경쟁 상태 방지
public class ThreadPoolTest {
@Test
void submittedTasksAreExecuted() {
final Executor executor = new ThreadPool(2);
for (int i = 0; i < 10; i++) {
int finalI = i;
executor.execute(()->{
System.err.println("Thread " + Thread.currentThread().getName() + " executes a task" + finalI);
});
}
}
}
다음과 같이 테스트코드를 작성하고 실행해보자.

한 스레드 안에서는 순서가 맞지만 다른 스레드에서는 순서가 좀 다른걸 볼 수 있다.
Task 실행시간이 너무 짧으니까 하나의 스레드에서 대부분의 작업을 처리하고자 해서 3의 비율이 좀 높게 나온 것 같다.
횟수도 좀 늘리고 10ms 만큼의 시간을 줘보자.
public class ThreadPoolTest {
@Test
void submittedTasksAreExecuted() {
final Executor executor = new ThreadPool(2);
for (int i = 0; i < 100; i++) {
int finalI = i;
executor.execute(()->{
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.err.println("Thread " + Thread.currentThread().getName() + " executes a task" + finalI);
});
}
}
}
이렇게 하였는데 테스트 메소드가 작업이 완료될 때 까지 기다리지 않고 종료되버린다.

따라서 다음과 같이 수정하였다.
@Test
void submittedTasksAreExecuted() throws InterruptedException {
final Executor executor = new ThreadPool(2);
final int numTasks = 100;
final CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < numTasks; i++) {
int finalI = i;
executor.execute(()->{
System.err.println("Thread " + Thread.currentThread().getName() + " executes a task" + finalI);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
latch.countDown();
});
}
latch.await();
}
countDownLatch를 사용하여 100개의 작업이 모두 완료될 때 까지 기다리고 latch.await()를 통해 모든 카운트가 0이 될때까지 현재 스레드를 블로킹 하였다.

이렇게 가장 원초적인? 방식으로 쓰레드 풀을 만들어 보았는데
이 방식에는 여전히 문제점이 있다.
InterruptedExecption) 발생시 쓰레드가 정지하게 된다.다음 글에서는 이러한 문제점을 해결해보도록 하자.