이전 글에서는 Java 비동기 프로그래밍에 사용되는 Future와 Callback 패턴을 알아 보았는데요.
오늘은 Java에서 스레드 풀을 관리하는 방법에 대해 알아보겠습니다. Java는 병렬 처리 및 멀티스레드 환경을 효율적으로 관리하기 위해 ExecutorService라는 인터페이스를 제공합니다. 이를 통해 우리는 스레드 풀을 쉽게 생성하고 제어할 수 있는데요.
이번 글에서 ExecutorService와 ScheduledExecutorService를 자세하게 뜯어본 뒤, 다음 글에서 ExecutorService의 Factory 역할을 하는 Executors에 대해 알아봅시다.
ExecutorService는 Java에서 병렬 처리와 비동기 작업을 효율적으로 관리하기 위한 중요한 인터페이스입니다. 이 인터페이스는 Executor를 확장하여, 단순히 작업을 실행하는 것 외에도 스레드 풀을 통해 작업을 관리하고 종료할 수 있는 더 다양한 기능을 제공합니다.
물론 ExecutorService 없이도 Thread를 그냥 여러 개 생성해서 병렬로 처리해도 되겠지만 Thread는 스위칭 작업을 할 때 비용 및 시간이 발생하기 때문에 순차적으로 실행하는 것보다 더 오래걸릴 수 있습니다.
따라서 Thread를 잘못 사용하면 오히려 순차적으로 작업할 때 보다 성능이 더 안 좋을 수 있기 때문에 미리 사용할 Thread를 정의하고 필요할 때 하나씩 할당하도록 ExecutorService을 사용하는 것이 좋습니다.

1. void execute(Runnable r)
주어진 Runnable 작업을 스레드 풀에 제출해 비동기적으로 실행합니다. 작업이 완료되면 결과를 반환하지 않고 바로 종료되기 때문에 주로 결과가 필요하지 않은 작업을 실행할 때 사용합니다.

ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() -> {
System.out.println("Runnable 작업 실행 중...");
});
executor.shutdown();
2. Future submit(..)

Callable(또는 Runnable) 작업을 제출하고 그 작업의 결과를 담고 있는 Future 객체를 즉시 반환합니다. 작업이 완료되면 Future 객체에 결과가 반영되고, get()을 통해 결과를 얻을 수 있습니다.

ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Integer> future = executor.submit(() -> {
// 시간이 오래 걸리는 작업
Thread.sleep(1000);
return 42;
});
// 비동기 작업이 완료되면 Future.get()으로 결과 얻기 가능
System.out.println("작업 결과: " + future.get());
executor.shutdown();
3. 작업 중단 및 종료 API
ExecutorSevice는 스레드 풀을 종료하기 위해 2가지 메서드를 제공합니다.

shutdown을 호출한 스레드는 실행 중인 작업이 종료될 때까지 기다리지 않고 바로 다음 라인을 실행합니다. 만약 스레드가 작업이 종료될 때까지 기다리길 원한다면 awaitTermination을 사용해야 합니다.
shutdown 후 작업을 제출하려고 시도하면 RejectedExecutionException 예외가 발생합니다.


4. 다중 작업 처리 API
invokeAll()은 여러 개의 Callable 작업을 동시에 비동기적으로 실행하고 모든 작업이 완료될 때까지 블록되며 각 작업의 결과를 나타내는 Future 객체의 리스트를 반환합니다. 따라서 invokeAll()이 완료되는 시점은 가장 오래 걸리는 작업이 완료되는 시점입니다.
Future의 결과의 순서는 완료 순서에 상관없이 Collection으로 제출한 Callable 순서대로 정렬되어서 반환됩니다. 여기서 완료는 정상 종료 뿐만 아니라 예외가 발생한 경우도 완료라고 생각합니다.


@Test
void invokeAll_테스트() {
// given
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Callable<Integer>> tasks = new ArrayList<>();
tasks.add(() -> {
throw new RuntimeException("invokeAll");
});
tasks.add(() -> {
Thread.sleep(1000);
return 1;
});
tasks.add(() -> {
Thread.sleep(2000);
return 1;
});
// when
List<Future<Integer>> results = null;
try {
results = executorService.invokeAll(tasks);
// then
assertEquals(3, results.size(), "모든 작업이 제출되었는지 확인");
int successfulTaskCount = 0;
for (Future<Integer> future : results) {
try {
int value = future.get();
// 성공한 작업의 결과 확인
assertTrue(value == 1, "성공한 작업의 결과 확인");
successfulTaskCount++;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
// 예외가 발생한 작업
if (cause instanceof RuntimeException) {
assertEquals("invokeAll", cause.getMessage(), "예외 메시지 확인");
} else {
fail("Unexpected exception type");
}
}
}
assertEquals(2, successfulTaskCount, "성공적으로 완료된 작업 개수 확인");
} catch (InterruptedException e) {
fail("테스트가 InterruptedException으로 중단되었습니다.");
} finally {
executorService.shutdown();
}
}
invokeAny()는 여러 개의 Callable 작업을 비동기적으로 동시에 실행한 뒤 그 중 가장 빠르게 성공적으로 완료한 작업의 결과를 반환합니다. 이때는 예외가 발생한 경우를 성공적인 완료라고 보지 않습니다.
여러 작업 중 어떤 것 하나라도 성공적으로 완료하면 블로킹이 해제되고 해당 작업의 결과를 반환하고, 실행중이던 완료되지 않은 작업들은 모두 취소시킵니다.


@Test
void invokeAny_테스트() {
// given
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Callable<Integer>> tasks = new ArrayList<>();
tasks.add(() -> {
Thread.sleep(3000);
return 1;
});
tasks.add(() -> {
Thread.sleep(1000);
return 2;
});
tasks.add(() -> {
throw new RuntimeException("invokeAny 예외");
});
// when
Integer result = null;
try {
result = executorService.invokeAny(tasks);
// 가장 빨리 완료된 작업이 반환되는지 확인
assertEquals(2, result, "가장 먼저 완료된 작업의 결과가 2인지 확인");
} catch (ExecutionException e) {
fail("예상치 못한 ExecutionException 발생");
} catch (InterruptedException e) {
fail("테스트가 InterruptedException으로 중단되었습니다.");
} finally {
executorService.shutdown();
}
}
ScheduledExecutorService는 주어진 지연 시간 후에 작업을 실행하거나 특정한 주기를 가지고 작업을 반복적으로 실행할 수 있는 인터페이스입니다.
ExecutorService를 구현했기 때문에 마찬가지로 팩토리 역할의 Executors가 존재합니다.

schedule()은 주어진 지연 시간 이후에 Runnable이나 Callable로 전달된 작업을 예약하고, ScheduledFuture를 즉시 반환합니다. 지연 시간이 경과하면 주어진 작업이 딱 한 번 실행됩니다.

ScheduledFuture는 주로 지연이나 주기적인 작업 실행을 위한 것으로 getDelay(TimeUnit unit)으로 작업이 실행 되기까지 남은 지연 시간을 반환받을 수 있습니다.



서버 상태는 scheduleAtFixedRate()로 5초마다 주기적으로 체크하고, 로그는 scheduleWithFixedDelay()를 사용해 작업이 끝난 후 3초마다 로그 데이터를 처리합시다.
public class MonitoringAndLogProcessingSystem {
public static void main(String[] args) {
// 2개짜리 스레드 풀을 만들어줍니다~ Executors로 스레드풀 만드는건 다음 글에서!
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 1. 서버 상태 모니터링 작업 (5초마다)
ScheduledFuture<?> monitoringTask = scheduler.scheduleAtFixedRate(() -> {
// 서버 상태 체크 로직 (ex: HTTP 요청으로 상태 확인)
System.out.println("서버 상태 확인 중..");
}, 0, 5, TimeUnit.SECONDS);
// 2. 백그라운드 로그 처리 (이전 로그 처리 후(2초 소요) 3초 주기로)
ScheduledFuture<?> logProcessingTask = scheduler.scheduleWithFixedDelay(() -> {
// 로그 처리 로직(ex: 로그 읽어서 DB에 저장)
System.out.println("로그 처리 중..");
try {
Thread.sleep(2000); // 2초 소요
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 0, 3, TimeUnit.SECONDS);
// 15초 후 일회성 스케줄링으로 서버 모니터링 작업 중단
scheduler.schedule(() -> {
System.out.println("서버 모니터링 중단");
monitoringTask.cancel(true);
}, 15, TimeUnit.SECONDS);
// 20초 후 일회성 스케줄링으로 로그 처리 작업 중단
scheduler.schedule(() -> {
System.out.println("로그 처리 중단");
logProcessingTask.cancel(true);
}, 20, TimeUnit.SECONDS);
}
}

하나하나 뜯어보면서 하니까 기억에 더 오래 남는 것 같네요! 다음 포스트에서는 ExecutorService의 팩토리 역할을 하는 Executors를 자세히 뜯어보겠습니다.
감사합니다 🤭