[JAVA] 쓰레드 - 6.Thread Pool - ExecutorService

유알·2023년 2월 24일
0

Thread Pool

병렬 작업 처리가 많아지게 되면 쓰레드가 계속해서 생성되게 되고, 이는 메모리 사용량이 계속해서 늘어나게 됨을 의미한다.

Thread는 switching 에 비용이 발생하고 시간이 걸리므로, 순차적으로 실행하는 것 보다 오래걸리는데, 이
쓰레드가 계속해서 생성된다면, 메모리는 둘째치더라도 성능저하는 물보듯 뻔한 이야기다.

이를 해결하고자 Thread Pool 이 등장했는데, 쉽게 말하면, 쓰레드를 미리 만들어 놓고 가지고 있다가, 필요한 작업이 있을 때 하나씩 할당해 주는 것이다.

우리가 흔히 쓰는 Spring Boot는 내장서버로 Tomcat 을 가지고 있는데, 이 Tomcat은 쓰레드풀을 만들고 요청당 하나씩 할당해준다.

바로 이 java.util.concurrent.ScheduledThreadPoolExecutor를 사용해서 Tomcat이 쓰레드풀을 만들게 된다.
여기서 보이는 상위 클래스들이 오늘 다뤄볼 것들이다.


ExecutorService

public interface Executor {
	//Runnable을 실행 하는 메서드
    void execute(Runnable command);
}

An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.

그러니까 ExecutorSevice는 종료와 비동기 작업을 추적하는 Future을 만드는 메서드를 관리하는 Executor이라는 것이다.
이해가 안되면 그냥 쓰레드 풀 인터페이스 라고 생각

생성

직접 ExecutorService의 구현체를 생성해도 되지만, java.util.concurrent.Excutors가 팩토리 메서드를 제공해준다.

ExecutorService executorService = Executors.newCachedThreadPool();

ExecutorService executorService = Executors.newFixedThreadPool(int nThreads);

ExecutorService executorService = Executors.newSingleThreadExecutor();
//이 외에도 몇개 더 있음

CachedThreadPool
이전에 만들어 놓은 쓰레드 중 사용가능한게 있으면 재활용을 하고, 아닐경우 필요에 따라 쓰레드를 생성하는 쓰레드풀을 만듭니다.
이러한 풀은 일반적으로 수명이 짧은 많은 비동기 작업을 실행하는 프로그램의 성능을 향상시킵니다.
60초 동안 사용되지 않은 스레드는 종료되고 캐시에서 제거됩니다.
따라서 오랫동안 쉬는 상태로 있는 쓰레드 풀은 리소스를 소모하지 않습니다.
속성은 비슷하지만 세부 정보(예: 시간 초과 매개 변수)가 다른 풀은 ThreadPoolExecutor 생성자를 사용하여 생성할 수 있습니다.

FixedThreadPool
공유되는 무제한 대기열에서 작동하는, 고정된 수의 스레드를 재사용하는 스레드 풀을 만듭니다.
어느 시점에서든 대부분의 nThreads 스레드는 활성 처리 작업이 될 것입니다.
모든 스레드가 활성 상태일 때 추가 태스크가 제출되면 스레드가 사용 가능할 때까지 대기열에서 대기합니다.
종료 전에 만약 실행 중 오류로 인해 스레드가 종료되는 경우 후속 작업을 실행하기 위해 필요한 경우 새 스레드를 생성해 그 자리를 대신합니다.
풀의 스레드는 명시적으로 종료될 때까지 존재합니다.

SingleThreadExecutor
무제한 대기열에서 작동하는, 단일 스레드를 사용하는 Excutor를 만듭니다.
필요할 때 제공된 스레드 팩토리를 사용하여 새 스레드를 만듭니다.
FixedThreadPool과 달리 추가 스레드를 사용하지 않음을 보장합니다.

이외에도 다양한 메서드가 Executors에 있다.
또한 이러한 팩토리 메서드를 사용하지 않고 직접 구현체를 생성자를 통해 생성해도 된다.

ExecutorService threadPool = new ThreadPoolExecutor(
    3,                                     // 코어 스레드 개수
    100,                                // 최대 스레드 개수
    120L,                                // 놀고 있는 시간
    TimeUnit.SECONDS,                     // 놀고 있는 시간 단위
    new SynchronousQueue<Runnable>()     // 작업큐
);

종료

ExecutorService는 shut down 될 수 있다. shut down 은 새로운 작업을 거절하는 것을 말한다.

두개의 메서드가 주어진다.

  • shutdown() = 작업큐의 작업까지 모두 처리 후 종료
  • shutdownNow() = 지금 처리 중인 작업을 즉시 중단하고 셧다운하려고 시도

중요한 것은 사용하지 않는 ExecutorService는 셧다운을 통해 종료시켜줘야 리소스를 회수할 수 있다.

쓰레드 작업 생성

Thread는 Runnable을 구현한 객체만 받았던거와는 달리
Callable을 구현한 객체도 처리를 요청할 수 있다.

Runnable task = new Runnable() {
    @Override
    public void run() {
        // 스레드가 처리할 작업 내용
    }
}
Callalbe<T> task = new Callable<T>() {
    @Override
    public T call throws Exception {
        // 스레드가 처리할 작업 내용
        return T;
    }
}

차이점은 리턴값이 있고, throw Exception이 붙어 있다는 점이다.

Excutors에 Runnable을 Callable로 바꾸는 static 메서드가 있다.

쓰레드 처리 요청

우선 처리 요청에는 두가지가 있다.

  • execute()
  • submit()

execute()는 부모 인터페이스인 Executor 에 정의된 메서드이다.
Runnable만 인자로 넣을 수 있다.

public interface Executor {
	//Runnable을 실행 하는 메서드
    void execute(Runnable command);
}

submit()은 이 execute()를 확장한 메서드로 리턴값으로 Future를 반환하며, CallableRunnable두개를 모두 인자로 받을 수 있다.

public interface ExecutorService extends Executor {
	//...
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    //...

예시

package utils;

import java.time.LocalTime;
import java.util.concurrent.*;

public class CallableExample2 {

    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            String result = "Called at " + LocalTime.now();
            return result;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyCallable callable = new MyCallable();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(callable);

        // 결과가 리턴되기를 기다립니다.
        System.out.println("result : " + future.get());
    }
}

다른 예시

package utils;

import java.time.LocalTime;
import java.util.concurrent.*;

public class CallableExample2 {

    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            String result = "Called at " + LocalTime.now();
            return result;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyCallable callable = new MyCallable();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(callable);

        // 결과가 리턴되기를 기다립니다.
        System.out.println("result : " + future.get());
    }
}

Future

Future은 실행을 취소하거나, 실행을 기다릴 때 사용되는 객체이다.
Future의 메서드들은 실행이 완료되었는지 확인하고, 기다리고 결과값을 반환하는 일을 합니다.

V get()메서드를 통해 실행이 완료되었을 때 결과 값을 가져올 수 있고, 결과값이 준비될 때까지 blocking됩니다.

boolean cancel(boolean mayInterruptIfRunning) 메서드로 작업을 취소할 수 있습니다.
취소는 이미 취소되었거나, 실행이 완료되었거나, 혹은 어떤 이유에 의해 실패할 수 있으며 이때는 false를 반환합니다.
취소가 성공하면, 실행 전 대기하고 있는 업무는 실행하지 않게 됩니다. 만약 실행중이라면 전달된 파라미터에 따라 달라지는데

  • true : 강제종료
  • false : 현재 실행중인건 다 완료될 때까지 기다리기

번외

실행중 발생한 예외를 알고 싶다면?

ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<?>> futures = new ArrayList<>();

// Submitting threads to executor service
for (int i = 0; i < 5; i++) {
    futures.add(executorService.submit(() -> {
        // Code for each thread
        // For example, if an exception occurs:
        throw new RuntimeException("Exception in thread " + Thread.currentThread().getName());
    }));
}

// Checking for exceptions in threads
for (Future<?> future : futures) {
    try {
        future.get(); // This will throw an ExecutionException if an exception occurred in the thread
    } catch (InterruptedException e) {
        // Handle InterruptedException
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof RuntimeException) {
            System.out.println(cause.getMessage()); // Print the exception message
        }
        // Handle other types of exceptions
    }
}

// Shutting down the executor service
executorService.shutdown();
profile
더 좋은 구조를 고민하는 개발자 입니다

0개의 댓글