[Java] Callable, Executors, ExecutorService 의 이해 및 사용법

이수진·2023년 7월 4일
1
post-thumbnail

최근에 진행하고있는 프로젝트에서는 id 값을 auto-increment로 할당해주는 것이 아니라
직접 만든 id-generator로 id값을 할당을 해줍니다.

이때 주의해야 할 점은 동시에 여러 요청이 들어왔을때 이를 어떻게 처리해야 할 지에 대한,
즉 동시성 요청에 대한 문제를 해결해주어야 합니다.
id값은 고유 식별값이므로, 이에 대한 동시성 문제는 매우 중요합니다.

해당 기능에 대한 개발을 완료하고 이에 대한 동시성 테스트가 필요한데,
여러 멀티 쓰레드로 동시 요청이 들어오는 상황을 만들어 이를 테스트하고자 하였습니다.

이를 위해 필요한 Callable, Executors, ExecutorService에 대해 알아보도록 하겠습니다.

1. Callable 인터페이스란? (vs. Runnable)

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

기존의 Runnable 인터페이스는 결과를 반환할 수 없다는 한계점이 있었습니다. 반환값을 얻으려면 공용 메모리나 파이프 등을 사용해야 했는데, 이러한 작업은 상당히 번거롭습니다.

그래서 Runnable의 발전된 형태로써, Java5에 함께 추가된 제네릭을 사용해 “결과를 받을 수 있는” Callable이 추가되었습니다.

Runnable 의 인터페이스도 살펴보면 다음과 같습니다.

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

즉, Callable과 Runnable의 차이는 작업의 결과를 받을 수 있느냐 없느냐입니다.

2. Future 인터페이스란?

Callable 인터페이스의 구현체인 작업(Task)은 가용 가능한 쓰레드가 없어서 실행이 미뤄질 수 있고, 작업 시간이 오래 걸릴 수도 있습니다.

그래서 실행 결과를 바로 받지 못하고 미래의 어느 시점에 얻을 수 있는데, 미래에 완료된 Callable의 반환값을 구하기 위해 사용되는 것Future입니다.

즉, Future는 비동기 작업을 갖고 있어 미래에 실행 결과를 얻도록 도와줍니다.
이를 위해 비동기 작업의 현재 상태를 확인하고, 기다리며, 결과를 얻는 방법 등을 제공합니다

Future의 인터페이스를 살펴보며 다음과 같습니다.

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

여기서 get() 은 blocking 방식으로 결과를 가져오며, 타임아웃 설정이 가능합니다

3. Executor 인터페이스

동시에 여러 요청을 처리해야 하는 경우 매번 새로운 쓰레드를 만드는 것은 비효율적입니다.
그래서 쓰레드를 미리 만들어두고 재사용하기 위한 쓰레드 풀(Thread Pool)이 등장하게 되었는데,
Executor 인터페이스는 쓰레드 풀의 구현을 위한 인터페이스입니다.

이러한 Executor 인터페이스를 간단히 정리하면 다음과 같습니다.

  • 등록된 작업(Runnable)을 실행하기 위한 인터페이스
  • 작업 등록과 작업 실행 중에서 작업 실행만을 책임짐

쓰레드는 크게 작업의 등록실행으로 나누어집니다. 그 중에서도 Executor 인터페이스는 인터페이스 분리 원칙에 맞게 등록된 작업을 실행하는 책임만 갖습니다. 그래서 전달받은 작업(Runnable)을 실행하는 메소드만 가지고 있습니다.

Executor 인터페이스의 코드를 살펴보면 다음과 같습니다.

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

The Executor implementations provided in this package implement ExecutorService, which is a more extensive interface. The ThreadPoolExecutor class provides an extensible thread pool implementation. The Executors class provides convenient factory methods for these Executors.
Memory consistency effects: Actions in a thread prior to submitting a Runnable object to an Executor happen-before its execution begins, perhaps in another thread.

즉, ExecutorService 구현체로 더 광범위하게 쓰이며, Executors 클래스는 더 편리한 팩토리 메서드들을 제공합니다.
또한 메모리 일관성 효과(?)란 한 스레드에서 작업이 끝나기 전에 다른 스레드에서 작업이 진행됩니다.

이러한 Executor 인터페이스는 해당 작업의 실행과 스레드의 사용 및 스케줄링 등으로부터 벗어날 수 있도록 도와줍니다.

4. ExecutorService 인터페이스

ExecutorService 인터페이스작업(Runnable, Callable) 등록을 위한 인터페이스입니다.
ExecutorService는 Executor를 상속받아서 작업 등록 뿐만 아니라 실행을 위한 책임도 갖습니다.
그래서 쓰레드 풀은 기본적으로 ExecutorService 인터페이스를 구현합니다. 대표적으로 ThreadPoolExecutor가 ExecutorService의 구현체인데, ThreadPoolExecutor 내부에 있는 블로킹 큐에 작업들을 등록해둡니다. 그리고 각 작업들을 Thread Pool의 사용 가능한 쓰레드들에 할당해서 작업을 수행하게 됩니다. 만약에 사용가능한 쓰레드가 없다면 작업은 큐에 대기하게 되고, 쓰레드가 작업이 끝나면 큐에 있는 다음 작업을 할당받게 됩니다.

멀티쓰레드의 구현을 위해서는 대부분 다음과 같이 ExecutorService 구현체를 생성해서 여러 작업들을 동시에 실행시키게 됩니다.

ExecutorService 인터페이스의 코드를 살펴보면 다음과 같습니다.

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService가 제공하는 퍼블릭 메소드들은 다음과 같이 분류가 가능합니다.

  • 라이프사이클 관리를 위한 기능들 - ex) shutdown()
  • 비동기 작업을 위한 기능들 - ex) invokeAll()

가장 대표적인 메서드이자, 제가 사용했던 메서드들만 간단히 소개하고자 합니다.

4.1 라이프사이클을 관리하기 위한 메서드 - shutdown()

ExecutorService를 만들어 작업을 실행하면, shutdown() 이 호출되기 전까지 계속해서 다음 작업을 대기하게 됩니다.
그러므로 작업이 완료되었다면 반드시 shutdown()을 명시적으로 호출해주어야 합니다.

4.2 비동기 작업을 위한 메서드 - invokeAll()

ExecutorService는 Runnable과 Callable을 작업으로 사용하기 위한 메서드를 제공합니다.

여러 작업들을 동시에 실행시키는 메소드도 제공하고 있는데( -invokeAll() 메소드), 비동기 작업의 진행을 추적할 수 있도록 Future를 반환합니다. (반환된 Future들은 모두 실행된 것이므로 반환된 isDone은 true입니다. 하지만, 작업들은 정상적으로 종료되었을 수도 있고, 예외에 의해 종료되었을 수도 있으므로 항상 성공한 것은 아닐 수도 있습니다.)

invokeAll() 메소드는 다음과 같습니다.

  • invokeAll()
    • 주어진 작업들을 동시에 모두 실행하고, 전부 끝나면 각각의 상태와 결과를 갖는 List을 반환함
    • 모든 결과가 나올 때까지 대기하는 Blocking 방식의 요청
    • 또한, invokeAll은 최대 쓰레드 풀의 크기만큼 작업을 동시에 실행시킵니다. 그러므로 쓰레드가 충분하다면 동시에 실행되는 작업 중에서 가장 오래 걸리는 작업만큼 시간이 소요됩니다. 하지만, 만약 쓰레드가 부족하다면 대기되는 작업들이 발생하므로 가장 오래 걸리는 작업의 시간에 더해 추가 시간이 필요합니다.

5. Executors 인터페이스

Executor, ExecutorService는 모두 쓰레드 풀을 위한 인터페이스입니다.

직접 쓰레드를 다루는 것은 번거로우므로, 이를 도와주는 팩토리 클래스인 Executors가 등장하게 되었습니다.
Executors는 High-Level의 동시성 프로그래밍 모델로써, Executor, ExecutorService를 구현한 쓰레드 풀을 손쉽게 생성해줍니다.

대표적인 정적 팩토리 메서드로는 newFixedThreadPool() 을 예시로 보면,

  • newFixedThreadPool()
    • 고정된 쓰레드 개수를 갖는 쓰레드 풀을 생성함
    • ExecutorService 인터페이스를 구현한 ThreadPoolExecutor 객체가 생성됨

Executors를 통해 쓰레드의 개수 및 종류를 정할 수 있으며, 이를 통해 쓰레드 생성과 실행 및 관리가 매우 용이해집니다. 하지만, 쓰레드 풀을 생성할 때에는 주의해야 하는 점이, 2개의 쓰레드를 갖는 쓰레드 풀을 생성했는데, 3개의 작업을 동시에 실행시킨다면 나머지 1개의 작업은 실행되지 못하고 블로킹 큐에 대기하게 됩니다. 그러다 쓰레드가 작업이 끝내고 가용가능한 쓰레드가 될 때에 대기하고 있던 작업이 실행이 됩니다.

6. 코드 예시

해당 Callable, Executors, ExecutorService 코드를 통해 예제로 살펴봅시다.

10개의 가용 쓰레드를 쓰레드 풀에 생성하고, 10개의 작업을 동시에 모두 실행시켜보겠습니다

내부에 static class를 정의하고,
해당 클래스는 Callable 인터페이스를 구현합니다

static class CallableEx implements Callable<String> {
		private int idx;

		CallableEx(int idx) {
			this.idx = idx;
		}

		@Override
		public String call() throws Exception {
			LocalTime startTime = LocalTime.now();
			System.out.println("Thread: " + Thread.currentThread().getName() + ", call idx: " + idx + ", startTime: " + startTime);
			LocalTime endTime = LocalTime.now();
			return String.valueOf("idx: " + idx + ", duration: " + Duration.between(startTime, endTime).toMillis());
		}
	}

작업을 수행시에 현재 쓰레드가 무엇이고, for문의 idx, 그리고 작업 시작 시간 정보까지 함께 출력하도록 하였습니다.
그리고 반환값으로 idx 와 작업 수행동안 총 걸린 시간값 정보를 더해 문자열로 반환하도록 하였습니다.

	@Test
	void callable_void() throws InterruptedException, ExecutionException {
		int tasksNum = 10;
		ExecutorService executorService = Executors.newFixedThreadPool(10);

		List<CallableEx> callableExList = new ArrayList<>();
		for(int i=0; i<tasksNum; i++) {
			callableExList.add(new CallableEx(i+1));
		}

		System.out.println("-------작업 실행------ ");
		List<Future<String>> futures = executorService.invokeAll(callableExList);
		System.out.println("-------작업 종료------\n");

		System.out.println("-------결과 출력------ ");
		for (Future<String> future : futures) {
			System.out.println(future.get());
		}

		executorService.shutdown();
	}

10개의 가용가능한 쓰레드를 쓰레드 풀에 생성하고,
총 10개의 테스크를 동시에 모두 실행하도록 테스트를 정의했습니다.

해당 테스트를 수행한 결과는 다음과 같습니다.

총 가용가능한 10개의 쓰레드가 각각의 작업을 할당하여 수행함을 알 수 있습니다. 멀테쓰레드로 순서와는 상관없이 진행이 됩니다.
그리고 결과 Future에는 순서가 동기화되어 저장되어 순서대로 출력됨까지 확인할 수 있었습니다.

만약, 작업량보다 쓰레드 풀의 가용영역 쓰레드가 더 적으면 어떻게 될까요?
작업량은 똑같이 10개로 하되, 쓰레드 풀의 가용가능한 쓰레드를 2개로 줄여 돌려보도록 하겠습니다.

	@Test
	void callable_void() throws InterruptedException, ExecutionException {
		int tasksNum = 10;
		ExecutorService executorService = Executors.newFixedThreadPool(2); // 쓰레드 풀의 가용가능한 쓰레드를 2개로 제한

		List<CallableEx> callableExList = new ArrayList<>();
		for(int i=0; i<tasksNum; i++) {
			callableExList.add(new CallableEx(i+1));
		}

		System.out.println("-------작업 실행------ ");
		List<Future<String>> futures = executorService.invokeAll(callableExList);
		System.out.println("-------작업 종료------\n");

		System.out.println("-------결과 출력------ ");
		for (Future<String> future : futures) {
			System.out.println(future.get());
		}

		executorService.shutdown();
	}

쓰레드 하나는 한 작업만을 처리할 수 있으므로, 현재 작업 외의 쓰레드들은 모두 블로킹 큐에 대기하고 있다가 해당 쓰레드의 작업이 끝난 후에 할당되어 수행됨을 알 수 있습니다

[Reference]

profile
꾸준히, 열심히, 그리고 잘하자

0개의 댓글