스레드 풀과 Executor 프레임워크1

황상익·2024년 10월 23일

Inflearn JAVA

목록 보기
54/61

스레드를 직접 사용할 때의 문제점

  1. 스레드 생성 비용으로 인한 문제
    스레드를 사용하려면, 먼저 스레드를 생성, 그런데 스레드는 무거움
  • 메모리 할당 : 각 스레드는 자신만의 호출 스택(call stack)을 가지고 있어야 한다. 이 호출 스택은 스레드가 실행되는 동안 사용하는 메모리 공간. 스레드를 생성할때 호출 스택을 위한 메모리를 할당
  • 운영체제 자원 사용 : 스레드를 생성하는 작업은 운영체제 커널 수준에서 이루어지며, 시스템 콜(system call)을 통해 처리
  • 운영체제 스케줄러 설정 : 새로운 스레드가 생성되면 운영체제의 스케줄러는 이 스레드를 관리하고 실행 순서를 조정. 운영체제의 스케줄링 알고리즘에 따라 추가적인 오버헤드가 발생
  1. 스레드 관리 문제
    서버의 CPU, 메모리 자원은 한정 -> 스레드르 무한정 생성 할 수는 없음

  2. Runnable의 불편함

  • 반환 값이 없다. : run 메서드는 반환 값을 갖지 않음. 실행 결과를 얻기 위해서는 별도의 메커니즘 사용.
    스레드의 실행 결과를 직접 받을 수는 없음.
  • 예외 처리 : run 메서드는 체크 예외를 던질 수 없음.

위 3가지 방법을 해결하기 위해선 Thread Pool 필요

  • 스레드 관리 하는 풀
  • 스레드는 스레드 풀에서 대기
  • 스레드 풀에서 이미 만들어진 스레드를 하나 조회
  • 조회한 스레드 1로 작업을 처리

  • 스레드 1은 작업을 완료
  • 작업을 완료한 스레드는 종료하는게 아니라 다시 스레드 풀에 반납

Executor 프레임워크 소개

멀티스래딩 및 병렬 처리를 쉽게 돕는 기능.

Executor interface

public interface Executor {
void execute(Runnable command);
}

주요 메서드
인터페이스를 확장해서 작업 제출과 제어 기능을 추가로 제공

로그 출력 유틸리티 만들기

package chap50;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static chap41.util.MyLogger.log;

public abstract class ExecutorUtils {
    public static void printState(ExecutorService executorService) {
        if (executorService instanceof ThreadPoolExecutor poolExecutor) { //구현체로 ThreadPoolExecutor 넘어옴 (casting)
            int pool = poolExecutor.getPoolSize(); //현재 생성된 스레드 개수
            int active = poolExecutor.getActiveCount(); //현재 일하고 있는 스래드

            int queueTasks = poolExecutor.getQueue().size(); //스레드가 작업을 던지면 queue의 작업을 당김(큐에 작업이 얼마나 들어있는지)
            long completedTasks = poolExecutor.getCompletedTaskCount();//완료된 작업은??

            log("[pool= " + pool + ", active=" + active + ", queueTasks=" + queueTasks + ", completedTasks=" + completedTasks + "]");
        } else {
            log(executorService);
        }
    }

    //추가
    public static void printState(ExecutorService executorService, String taskName) {
        if (executorService instanceof ThreadPoolExecutor poolExecutor) { //구현체로 ThreadPoolExecutor 넘어옴 (casting)
            int pool = poolExecutor.getPoolSize(); //현재 생성된 스레드 개수
            int active = poolExecutor.getActiveCount(); //현재 일하고 있는 스래드

            int queueTasks = poolExecutor.getQueue().size(); //스레드가 작업을 던지면 queue의 작업을 당김(큐에 작업이 얼마나 들어있는지)
            long completedTasks = poolExecutor.getCompletedTaskCount();//완료된 작업은??

            log("[pool= " + pool + ", active=" + active + ", queueTasks=" + queueTasks + ", completedTasks=" + completedTasks + "]");
        } else {
            log(executorService);
        }
    }
}
  • pool : 스레드 풀에서 관리되는 스레드의 숫자
  • active : 작업을 수행하는 스레드의 숫자
  • queuedTasks : 큐에 대기중인 작업의 숫자
  • completedTask : 완료된 작업의 숫자
    참고로 ExecutorService 인터페이스는 getPoolSize(), getActiveCount() 같은 자세한 기능은 제공 X

ExecutorService 코드로 시작하기

package chap49.executor;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;

public class RunnableTask implements Runnable {

    private final String name;
    private int sleepMs = 1000;

    public RunnableTask(String name) {
        this.name = name;
    }

    public RunnableTask(String name, int sleepMs) {
        this.name = name;
        this.sleepMs = sleepMs;
    }

    @Override
    public void run() {
        log(name + " 시작");
        sleep(sleepMs);
        log(name + " 완료");
    }
}
package chap49.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
import static chap49.executor.ExecutorUtils.printState;

public class ExecutorBasicMain {
    public static void main(String[] args) {
        ExecutorService es = new ThreadPoolExecutor
                (2,2,0, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>());
        //LinkedBlockingQueue -> 무제한으로 꺼낼 수 있음, 없으면 대기

        log("== 초기 상태 ==");
        printState(es);
        es.execute(new RunnableTask("taskA"));
        es.execute(new RunnableTask("taskB"));
        //재사용 구간
        es.execute(new RunnableTask("taskC"));
        es.execute(new RunnableTask("taskD"));

        log("== 작업 수행 중 ==");
        printState(es);

        sleep(3000);
        //두개의 스레드가 있는데 둘다 쉬는 중
        log("== 작업 수행 완료 ==");
        printState(es);

        es.shutdown();
        log("== shutdown 완료 ==");
        printState(es);
    }
}

ThreadPoolExecutor(ExecutorService)

  • 스레드 풀 : 스레드를 관리
  • BlockingQueue : 작업을 보관, 생산자 소비자 문제를 해결 -> 단순한 큐가 아니라 BlockingQueue를 사용

ThreadPoolExecutor 생성자
corePoolSize : 스레드 풀에서 관리되는 기본 스레드의 수
maximumPoolSize : 스레드 풀에서 관리되는 최대 스레드 수
keepAliveTime, TimeUnit unit : 기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기시간이다. 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.
BlockingQueue workQueue : 작업을 보관할 블로킹 큐

Runnable 사용

package chap49.executor;

import java.util.Random;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;

public class RunnableMain {
    public static void main(String[] args) throws InterruptedException {
        MyRunnable task = new MyRunnable();
        Thread thread = new Thread(task, "T1");
        thread.start();
        thread.join();
        int result = task.value;
        log("result Value = " + result);
    }

    static class MyRunnable implements Runnable {

        int value;

        @Override
        public void run() {
            log("Runnable 시작");
            sleep(2000);
            value = new Random().nextInt(10);
            log("create Value = " + value);
            log("Runnable 완료 ");
        }
    }
}

Future1 - 시작

Callable

public interface Callable<V> {
V call() throws Exception;
}

Callable과 Future 사용

package chap49.executor.future;

import java.util.Random;
import java.util.concurrent.*;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;

public class CallableMainV1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        Future<Integer> future = es.submit(new MyCallable()); //future를 반환
        Integer result = future.get(); //Main이 바로 호출
        log("result Value = " + result);
        es.shutdown();
    }

    static class MyCallable implements Callable<Integer> {


        @Override
        public Integer call() throws Exception {
            log("Callable 시작");
            sleep(2000);
            int value = new Random().nextInt(10);
            log("create value = " + value);
            log("Callable 끝");
            return value;
        }
    }
}

newFixedThreadPool(size) : 편리하게 ExecutorService를 생성 가능

submit : <T> Future<T> submit(Callable<T> task); //인터페이스 정의
submit을 통해 Callable 작업으로 전달 가능

MyCallable 인스턴스가 블로킹 큐에 절달, 스레드 풀의 스레드 중 하나가 이 작업을 실행
Future.get()은 InterruptException, ExecutionExeception 체크 예외를 던진다.

Executor 프레임워크의 강점
요청 스레드가 결과를 받아야 하는 상황, Callable 사용한 방식은 Runnable을 사용하는 방식보다는 훨씬 편함. 단순한 싱글스레드 방식으로 개발한다는 느낌을 받음
내가 스레들 따로 생성, Join으로 제어 하는 일이 없음

단 future.get을 호출하는 요청 스레드 main은 future.get을 호출 했을 때 2가지 상황으로 나뉨
MyCallable 작업을 처리하는 스레드 풀의 스레드가 작업을 완료했다.
MyCallable 작업을 처리하는 스레드 풀의 스레드가 아직 작업을 완료하지 못했다.

Future2 - 분석

Future<Integer> future = es.submit(new MyCallable());

submit 호출로 MyCallable 인스턴스를 전달
submit은 MyCallable이 반환하는 무작위 숫자 대신 Future을 반환
MyCallable이 즉시 실행되어 즉시 결과를 반환하는 것은 불가능.
생각해보면 MyCallable 이 즉시 실행되어서 즉시 결과를 반환하는 것은 불가능. 스레드 풀의 스레드가 미래의 어떤 시점에 이 코드를 대신 실행.
MyCallable.call() 메서드는 호출 스레드가 실행하는 것도 아니고, 스레드 풀의 다른 스레드가 실행하기 때문에 언제 실행이 완료되어서 결과를 반환할 지 알 수 없다. 결과를 즉시 받는 것은 불가능. es.submit()MyCallable 의 결과를 반환하는 대신에 MyCallable 의 결과를 나중에 받을 수 있는 Futurue 라는 객체를 대신 제공.

package chap49.executor.future;

import java.util.Random;
import java.util.concurrent.*;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;

public class CallableMainV2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        log("submit 호출");

        Future<Integer> future = es.submit(new MyCallable()); //future를 반환
        log("future 즉시 반환, future = " + future);

        log("future.get() [블로킹] 메서드 호출 시작 -> main 스레드 Waiting");
        Integer result = future.get(); //Main이 바로 호출 (결과가 value를 반환할때까지 대기)
        log("future.get() [블로킹] 메서드 호출 완료 -> main 스레드 Runnable");

        log("result Value = " + result);
        log("future 완료, future = " + future);
        es.shutdown();
    }

    static class MyCallable implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            log("Callable 시작");
            sleep(2000);
            int value = new Random().nextInt(10);
            log("create value = " + value);
            log("Callable 끝");
            return value;
        }
    }
}

실행 결과 분석

es.submit(new MyCallable())

MyCallable 인스턴스를 편의상 taskA 라고 하겠다.
편의상 스레드풀에 스레드가 1개 있다고 가정하겠다.
submit() 을 호출해서 ExecutorServicetaskA 를 전달

Future의 생성

요청 스레드 es.submit(taskA)를 호출하는 중
ExecutorService는 전달한 taskA의 미래 결과를 알 수 있는 Future 객체를 생성

  • Future는 인터페이스이다. 이때 생성되는 실제 구현체는 FutureTask


submit()을 호출한 경우 Future가 만들어지고, 전달한 작업인 taskA가 바로 블로킹 큐에 담기는 것이 아니라, 그림처럼 taskA를 감싸고 있는 Future가 대신 블로킹 큐에 담긴다.

Future<Integer> future = es.submit(new MyCallable());

Future 는 내부에 작업의 완료 여부와, 작업의 결과 값을 가진다. 작업이 완료되지 않았기 때문에 아직은 결과 값이 없다

  • 로그를 보면 Future 의 구현체는 `FutureTask
  • Future 의 상태는 "Not completed"(미 완료)이고, 연관된 작업은 전달한 taskA(MyCallable 인스턴스) 이다.
    여기서 중요한 핵심이 있는데, 작업을 전달할 때 생성된 Future 는 즉시 반환

생성한 Future 를 즉시 반환하기 때문에 요청 스레드는 대기하지 않고, 자유롭게 본인의 다음 코드를 호출

  • Thread.start를 호출 한 것과 비슷


큐에 들어있는 Future[taskA] 를 꺼내서 스레드 풀의 스레드1이 작업을 시작
Future의 구현체인 FutureTaskRunnable 인터페이스도 함께 구현
스레드1은 FutureTaskrun() 메서드를 수행
FutureTask.run() -> MyCallable.call()


스레드1 : taskA의 작업을 아직 처리중
요청 스레드
요청 스레드는 Future 인스턴스의 참조
Future.get()을 호출해서 taskA 작업의 미래 결과를 받을 수 있다.
요청 스레드는 작업의 결과가 필요해서 future.get() 을 호출

  • Future에는 완료 상태가 있다. taskA 의 작업이 완료되면 Future 의 상태도 완료로 변경
  • 그런데 여기서 taskA 의 작업이 아직 완료되지 않았다. 따라서 Future 도 완료 상태가 아니다.
    요청 스레드의 상태는 RUNNABLE WAITING


요청 스레드
대기(WAITING ) 상태로 future.get() 을 호출하고 대기중이다.
스레드1

  • taskA 작업을 완료한다.
  • FuturetaskA 의 반환 결과를 담는다.
  • Future 의 상태를 완료로 변경한다.
  • 요청 스레드를 깨운다. 요청 스레드는 WAITING RUNNABLE 상태로 변한다.


요청 스레드
요청 스레드는 RUNNABLE 상태가 되었다. 그리고 완료 상태의 Future 에서 결과를 반환 받는다. 참고로taskA 의 결과가 Future 에 담겨있다.
스레드1
작업을 마친 스레드1은 스레드 풀로 반환된다. RUNNABLE WAITING


Future 의 인스턴스인 FutureTask 를 보면 "Completed normally"로 정상 완료

Future가 필요한 이유?

Integer result = es.submit(new MyCallable()); // 여기서 블로킹
이렇게 하면 submit이 언제 결과 값을 반환하는지 알 수 없음

Future3 - 활용

package chap49.executor.future;

import java.util.concurrent.Callable;

import static chap41.util.MyLogger.log;

public class SumTaskMainV1 {
    public static void main(String[] args) throws InterruptedException {
        SumTask task1 = new SumTask(1, 50);
        SumTask task2 = new SumTask(51, 100);

        Thread t1 = new Thread(task1, "t1");
        Thread t2 = new Thread(task2, "t2");

        t1.start();
        t2.start();

        log("join() - main 스레드가 t1, t2 종료까지 대기");
        t1.join();
        t2.join();
        log("main 스레드 대기 완료");

        log("task1.result =" + task1.result);
        log("task2.result =" + task2.result);

        int sumAll = task1.result + task2.result;
        log("task1 + task2 = " + sumAll);
        log("End");
    }

    static class SumTask implements Runnable {
        int startValue;
        int endValue;
        int result = 0;

        public SumTask(int startValue, int endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }

        @Override
        public void run() {
            log("작업 시작");

            //예외를 잡았어야 한다.
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            int sum = 0;
            for (int i = startValue; i <= endValue ; i++) {
                sum += i;
            }

            result = sum;

            log("작업 완료 result = " + sum);
        }
    }
}
package chap49.executor.future;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;

public class SumTaskMainV2 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        SumTask task1 = new SumTask(1, 50);
        SumTask task2 = new SumTask(51, 100);
        ExecutorService es = Executors.newFixedThreadPool(2);

        //main 스레드가 던자고 바로 또 다른 요청을 요구 -> 알아서 여러곳에서 실행
        //submit으로 원하는 작업 마음껏 던지고, 결과 받는 건 마지막
        Future<Integer> f1 = es.submit(task1);
        Future<Integer> f2 = es.submit(task2);

        //기다리고 return 여기서 블로킹
        Integer sum1 = f1.get();
        Integer sum2 = f2.get();

        log("task1.result =" + sum1);
        log("task2.result =" + sum2);

        int sumAll = sum1 + sum2;
        log("task1 + task2 = " + sumAll);
        log("end");
        es.shutdown();
    }

    static class SumTask implements Callable<Integer> {
        int startValue;
        int endValue;
        int result = 0;

        public SumTask(int startValue, int endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }

        @Override
        //Exception의 자식들을 모두 잡아서 던질 수 있다.
        public Integer call() throws Exception {
            log("작업 시작");
            Thread.sleep(2000);
            int sum = 0;
            for (int i = startValue; i <= endValue; i++) {
                sum += i;
            }

            result = sum;

            log("작업 완료 result = " + sum);

            return sum;
        }
    }
}

작업의 결과를 반환, 요청 스레드에서 결과를 바로 받아서 처리 하는 부분이 매우 직관적, 코드만 보면 마치 멀티스레드를 사용하지 않고, 단일 스레드 상황에서 일반적 메서드 호출, 결과.

Future4 - 이유

Future<Integer> future1 = es.submit(task1); // 여기는 블로킹 아님
Future<Integer> future2 = es.submit(task2); // 여기는 블로킹 아님

Integer sum1 = future1.get(); // 여기서 블로킹
Integer sum2 = future2.get(); // 여기서 블로킹

Future 없이 결과를 직접 반환

Future 를 사용하지 않는 경우 결과적으로 task1 의 결과를 기다린 다음에 task2 를 요청한다. 따라서 총 4초의 시간이 걸렸다. 이것은 마치 단일 스레드가 작업을 한 것과 비슷한 결과

Future 반환


f1.get을 호출해 대기. -> f2.get 호출해 즉시 결과를 받음

Future를 잘못 사용하는 예
1. 적절히 사용

Future<Integer> future1 = es.submit(task1); // non-blocking
Future<Integer> future2 = es.submit(task2); // non-blocking

Integer sum1 = future1.get(); // blocking, 2초 대기
Integer sum2 = future2.get(); // blocking, 즉시 반환
  1. 잘못 사용
Future<Integer> future1 = es.submit(task1); // non-blocking
Integer sum1 = future1.get(); // blocking, 2초 대기

Future<Integer> future2 = es.submit(task2); // non-blocking
Integer sum2 = future2.get(); // blocking, 2초 대기
package chap49.executor.future;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;

public class SumTaskBadMainV2 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        SumTask task1 = new SumTask(1, 50);
        SumTask task2 = new SumTask(51, 100);
        ExecutorService es = Executors.newFixedThreadPool(2);

        //하나 던지고 결과 받고 하나 던지고 결과 받고
        Future<Integer> f1 = es.submit(task1);
        Integer sum1 = f1.get(); //2초 대기
        Future<Integer> f2 = es.submit(task2); //2초 대기
        Integer sum2 = f2.get();

        log("task1.result =" + sum1);
        log("task2.result =" + sum2);

        int sumAll = sum1 + sum2;
        log("task1 + task2 = " + sumAll);
        log("end");
        es.shutdown();
    }

    static class SumTask implements Callable<Integer> {
        int startValue;
        int endValue;
        int result = 0;

        public SumTask(int startValue, int endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }

        @Override
        //Exception의 자식들을 모두 잡아서 던질 수 있다.
        public Integer call() throws Exception {
            log("작업 시작");
            Thread.sleep(2000);
            int sum = 0;
            for (int i = startValue; i <= endValue; i++) {
                sum += i;
            }

            result = sum;

            log("작업 완료 result = " + sum);

            return sum;
        }
    }
}

Future5 - 정리

  1. boolean cancel(boolean mayInterruptIfRunning)
    기능 : 아직 완료되지 않은 작업을 취소
    mayInterruptIfRunning
  • cancel(true) : Future 를 취소 상태로 변경한다. 이때 작업이 실행중이라면
    Thread.interrupt() 를 호출해서 작업을 중단한다.
  • cancel(false) : Future 를 취소 상태로 변경, 작업을 중단하지 않는다.
    반환값 : 작업이 성공적으로 취소된 경우 true , 이미 완료되었거나 취소할 수 없는 경우 false
    설명 : 작업이 실행 중이 아니거나 아직 시작되지 않았으면 취소하고, 실행 중인 작업의 경우
  1. boolean isCancelled()
    기능 : 작업이 취소되었는지 여부
    반환값 : 작업이 취소된 경우 true , 그렇지 않은 경우 false
    설명 : 이 메서드는 작업이 cancel() 메서드에 의해 취소된 경우에 true 를 반환

  2. boolean isDone()
    기능 : 작업이 완료되었는지 여부를 확인한다.
    반환값: 작업이 완료된 경우 true , 그렇지 않은 경우 false
    설명: 작업이 정상적으로 완료되었거나, 취소되었거나, 예외가 발생하여 종료된 경우에 true 를 반환한다.

  3. State state()
    기능 : Future 상태를 반환
    RUNNING : 작업 실행 중
    SUCCESS : 성공 완료
    FAILED : 실패 완료
    CANCELLED : 취소 완료

  4. V get()
    기능 : get과 같은데 시간 초과시 예외
    매개변수

  • timeout : 대기할 최대 시간
  • unit: timeout 매개변수의 시간 단위 지정
    반환값 : 작업의 결과
    예외:
  • InterruptedException : 대기 중에 현재 스레드가 인터럽트된 경우 발생
  • ExecutionException : 계산 중에 예외가 발생한 경우 발생
  • TimeoutException : 주어진 시간 내에 작업이 완료되지 않은 경우 발생

Future6 - 취소

package chap49.executor.future;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;

public class FutureCancelMain {

    private static boolean mayInterruptIfRunning = true;
    //private static boolean mayInterruptIfRunning = false;

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1);
        Future<String> future = es.submit(new MyTask());
        //log("future state: " + future.state());

        sleep(3000);

        //cancel 호출
        log("future.cancel(" + mayInterruptIfRunning + ") 호출");
        boolean cancelResult1 = future.cancel(mayInterruptIfRunning);
        log("future.cancel(" + mayInterruptIfRunning + ") result: " + cancelResult1);

        //결과 확인
        try {
            log("Future result : " + future.get());
        } catch (CancellationException e) { //Runtime 예외
            log("Future는 이미 취소 되었습니다.");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        es.shutdown();
    }

    static class MyTask implements Callable<String> {
        @Override
        public String call() throws Exception {
            try {
                for (int i = 0; i < 10; i++) {
                    log("작업 중 : " + i);
                }
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log("인터럽트 발생");
                return "Interrupted";
            }

            return "completed";
        }
    }
}

Future7 - 예외

package chap49.executor.future;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;

public class FutureExceptionMain {
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1);
        log("작업 전달");
        Future<Integer> future = es.submit(new ExCallable());
        sleep(1000);

        try {
            log("future.get() 호출 시도, future.state()" + future.isDone()); //FAILED
            Integer result = future.get();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);

            //실행 중에 예외 발생 call 안에 코드를 발생하는 예외
        } catch (ExecutionException e) {
            log("e = " + e);
            Throwable cause = e.getCause(); //ExecutionException이 왜 발생했는지
            log("cause = " + cause);
        }

        es.shutdown();
    }

    static class ExCallable implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            log("Callable 실행, 예외 발생");
            throw new IllegalArgumentException("ex!"); //callable이 호출해서 예외 발생
        }
    }
}

ExecutorService - 작업 컬렉션 처리

package chap49.executor;

import java.util.concurrent.Callable;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;

public class CallableTask implements Callable<Integer> {

    private String name;
    private int sleepMs = 1000;

    public CallableTask(String name) {
       this.name = name;
    }

    public CallableTask(String name, int sleepMs) {
        this.name = name;
        this.sleepMs = sleepMs;
    }

    @Override
    public Integer call() throws Exception {
        log(name + "실행");
        sleep(sleepMs);
        log(name + " 완료, return = " + sleepMs);
        return sleepMs;
    }
}

invokeAll()
Callable 작업을 제출하고 모든 작업이 완료될 때까지 대기
지정된 시간 내에 모든 Callable 작업을 제출, 완료시 대기

package chap49.executor.future;

import chap49.executor.CallableTask;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static chap41.util.MyLogger.log;

public class InvokeAllMain {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newFixedThreadPool(10);

        CallableTask t1 = new CallableTask("t1", 1000);
        CallableTask t2 = new CallableTask("t2", 2000);
        CallableTask t3 = new CallableTask("t3", 3000);
        List<CallableTask> tasks = List.of(t1, t2, t3);


        List<Future<Integer>> futures = es.invokeAll(tasks);
        for (Future<Integer> future : futures) {
            Integer value = future.get();
            log("value = " + value);
        }
        es.shutdown();
    }
}

invokeAny()
하나의 Callable 작업이 완료될 때 까지 대기, 가장 먼저 완료된 작업의 결과를 반환
완료되자 않은 나머진 취소
지정된 시간 내에 하나의 Callable 작업이 완료될 때까지 기다리고, 가장 먼저 완료된 작업의 결과를 반
환한다.
완료되지 않은 나머지 작업은 취소

package chap49.executor.future;

import chap49.executor.CallableTask;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static chap41.util.MyLogger.log;

public class InvokeAnyMain {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newFixedThreadPool(10);

        CallableTask t1 = new CallableTask("t1", 1000);
        CallableTask t2 = new CallableTask("t2", 2000);
        CallableTask t3 = new CallableTask("t3", 3000);
        List<CallableTask> tasks = List.of(t1, t2, t3);


        Integer value = es.invokeAny(tasks);
        log("value = " + value);
        es.shutdown();
    }
}
profile
개발자를 향해 가는 중입니다~! 항상 겸손

0개의 댓글