Executor <2>

sungs·2025년 7월 20일

자바

목록 보기
51/95

invokeAll(), invokeAny()

submit으로 작업 하나씩 집어넣는 대신, 컬렉션을 만들어 한꺼번에 넣을 수 있다.

invokeAll()은 작업이 모두 완료되면 반환받는 것이고, invokeAny()는 먼저 완료된 작업을 반환받는 것이다. Timeout과 TimeUnit을 지정해 일정한 시간 동안 기다리고 완료되지 않으면 나머지 작업은 취소한다.

get()과 발생하는 예외는 같다.

예제

invokeAll()

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllSimpleExample {

    public static void main(String[] args) {
        System.out.println("--- invokeAll() 예제 시작 ---\n");

        // 1. ExecutorService 생성 (2개의 스레드 풀)
        ExecutorService executor = Executors.newFixedThreadPool(2);
        System.out.println("ExecutorService (2개 스레드) 생성됨.");

        // 2. Callable 작업 리스트 생성
        List<Callable<String>> tasks = new ArrayList<>();

        // 3개의 간단한 Callable 작업 추가
        tasks.add(() -> {
            TimeUnit.SECONDS.sleep(2); // 2초 작업
            return "Task 1 완료 (2초)";
        });
        tasks.add(() -> {
            TimeUnit.SECONDS.sleep(1); // 1초 작업
            return "Task 2 완료 (1초)";
        });
        tasks.add(() -> {
            TimeUnit.SECONDS.sleep(3); // 3초 작업
            return "Task 3 완료 (3초)";
        });

        System.out.println("총 " + tasks.size() + "개의 Callable 작업을 제출합니다.");

        List<Future<String>> futures = null;
        try {
            System.out.println("invokeAll() 호출: 모든 작업이 완료될 때까지 기다립니다...");
            long startTime = System.currentTimeMillis();

            // 3. invokeAll() 호출: 모든 작업이 완료될 때까지 블로킹
            futures = executor.invokeAll(tasks);

            long endTime = System.currentTimeMillis();
            System.out.println("invokeAll() 완료! 총 소요 시간: " + (endTime - startTime) + "ms");

            // 4. 모든 Future에서 결과 가져오기 (이미 완료되었으므로 블로킹되지 않음)
            for (int i = 0; i < futures.size(); i++) {
                Future<String> future = futures.get(i);
                try {
                    String result = future.get(); // 이미 완료된 작업의 결과를 즉시 가져옴
                    System.out.println("결과 " + (i + 1) + ": " + result);
                } catch (Exception e) {
                    System.err.println("작업 " + (i + 1) + " 결과 가져오기 실패: " + e.getMessage());
                }
            }

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("invokeAll() 호출 중 인터럽트 발생: " + e.getMessage());
        } finally {
            // 5. ExecutorService 종료
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                executor.shutdownNow();
            }
        }

        System.out.println("\n--- invokeAll() 예제 종료 ---");
    }
}

invokeAny()

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;

public class InvokeAnySimpleExample {

    public static void main(String[] args) {
        System.out.println("--- invokeAny() 예제 시작 ---\n");

        // 1. ExecutorService 생성 (3개의 스레드 풀)
        ExecutorService executor = Executors.newFixedThreadPool(3);
        System.out.println("ExecutorService (3개 스레드) 생성됨.");

        // 2. Callable 작업 리스트 생성
        List<Callable<String>> tasks = new ArrayList<>();

        // 3개의 Callable 작업 추가. 각각 다른 시간 동안 대기하도록 설정
        tasks.add(() -> {
            // 작업 1: 2초 소요
            System.out.println(Thread.currentThread().getName() + ": Task 1 시작 (2초)");
            TimeUnit.SECONDS.sleep(2);
            System.out.println(Thread.currentThread().getName() + ": Task 1 완료.");
            return "Task 1 결과: 2초 걸렸습니다!";
        });
        tasks.add(() -> {
            // 작업 2: 1초 소요 (가장 빠를 것으로 예상)
            System.out.println(Thread.currentThread().getName() + ": Task 2 시작 (1초)");
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + ": Task 2 완료.");
            return "Task 2 결과: 1초 만에 끝났습니다!";
        });
        tasks.add(() -> {
            // 작업 3: 3초 소요
            System.out.println(Thread.currentThread().getName() + ": Task 3 시작 (3초)");
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread().getName() + ": Task 3 완료.");
            return "Task 3 결과: 3초 걸렸습니다!";
        });

        System.out.println("총 " + tasks.size() + "개의 Callable 작업을 제출합니다.");

        try {
            System.out.println("invokeAny() 호출: 가장 먼저 완료되는 작업의 결과를 기다립니다...");
            long startTime = System.currentTimeMillis();

            // 3. invokeAny() 호출: 첫 번째 완료되는 작업의 결과만 반환
            String result = executor.invokeAny(tasks);

            long endTime = System.currentTimeMillis();
            System.out.println("\ninvokeAny() 완료! 총 소요 시간: " + (endTime - startTime) + "ms");
            System.out.println("가장 먼저 얻은 결과: \"" + result + "\"");

        } catch (Exception e) { // InterruptedException 또는 ExecutionException을 한 번에 잡습니다.
            System.err.println("invokeAny() 호출 중 오류 발생: " + e.getMessage());
            e.printStackTrace(); // 어떤 예외인지 자세히 보기 위해
        } finally {
            // 4. ExecutorService 종료
            executor.shutdownNow(); // invokeAny는 완료되지 않은 작업들을 자동으로 취소하려 하지만,
                                  // 확실하게 스레드 풀을 종료합니다.
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    System.out.println("스레드 풀이 시간 내에 완전히 종료되지 않았습니다.");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        System.out.println("\n--- invokeAny() 예제 종료 ---");
    }
}

가장 먼저 완료된 작업만 반환한 뒤 완료되지 않은 직업은 인터럽트를 통해 취소한다.

작업 제출 메서드

  • void execute(Runnable command): Runnable 작업을 제출한다. 반환값 없다.
  • <T> Future<T> submit(Callable<T> task): Callable 작업을 제출하고 결과를 반환받는다.
  • Future<?> submit(Runnable task): Runnable 작업을 제출하고 결과를 반환받는다. 대신 Runnable이라 null을 반환받는다.

종료 메서드

void shutdown()

새로운 작업을 받지 않고 이미 제출된 작업을 모두 완료한 후에 종료한다. 논블로킹 메서드다.

List<Runnable> shutdownNow()

새로운 작업을 받지 않고 이미 제출된 작업도 인터럽트 발생시켜 즉시 종료한다. 큐에 대기중인 작업은 실행하지 않고 Runnable 컬렌션에 저장한다. 논 블로킹 메서드다.

List<Runnable> runnables = es.shutdownNow()

boolean isShutdown()

종료되었는지 확인한다.

boolean isTerminated()

shutdown, shutdownNow을 실행한 후 모든 작업이 완료되었는지 확인한다.

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException

일정 시간 동안만 작업이 완료되기를 기다린다. 블로킹 메서드다.

close()

shutdown을 호출한 다음 작업이 완료되거나 인터럽트가 발생할 때 까지 무한정 반복 대기 한다. 호출한 스레드에 인터럽트가 발생해도 shutdownNow() 를 호출한다.

사용 방법

close을 실행한 다음 종료되지 않으면 shutdownNow로 강제로 종료하는 방식이 공식 api 문서에서 제안하는 방식이다.

 static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // 새 작업을 받지 않음
        try {
            // 모든 기존 작업이 60초 안에 종료되도록 대기
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // 시간 초과 시, 강제로 실행 중인 작업 종료
                // 모든 작업이 종료될 때까지 다시 60초 대기
                if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("풀이 종료되지 않았습니다.");
            }
        } catch (InterruptedException ie) {
            // 현재 스레드가 대기 중에 인터럽트되면
            pool.shutdownNow(); // 풀을 즉시 종료
            Thread.currentThread().interrupt(); // 인터럽트 상태 유지
        }
    }

정리

바로 종료시키기보다는 자원 정리할 시간 주고 로그 찍을 시간 주고 어느 정도 시간을 주고 종료시키는 게 좋다. 시간이 지날 경우 강제로 종료시켜야 한다.

profile
앱 개발 공부 중

0개의 댓글