스레드 풀과 Executor 프레임워크2

황상익·2024년 10월 23일

Inflearn JAVA

목록 보기
55/61

ExecutorService 우아한 종료 - 소개

서버를 운영중에 만약 서버 기능을 update 하기 위해 서버를 재시작
어떤 작업을 처리 중에 갑자기 재시작, 해당 작업의 작동이 제대로 진행 안됨
가장 이상적인 방형은 새로운 진행 중인 작업은 막고, 이미 진행중인 주문은 모두 완료한 다음 재시작

ExecutorService의 종료 메서드
void shutdown()

  • 새로운 작업을 받지 않고, 이미 제출된 작업을 모두 완료한 후에 종료
  • 논 블로킹 메서드

List< Runnable > shutdownNow()
boolean isShutdown()

  • 서비스가 종료 되었는지 확인
    boolean isTerminated()
  • shutdown(), shutdownNow() 호출 후, 모든 작업이 완료되었는지 확인한

shutdown() - 처리중인 작업이 없는 경우

스레드만 2개 대기
shutdown()을 호출한다. ExecutorService는 새로운 요청을 거절한다.
거절시 기본적으로 java.util.concurrent.RejectedExecutionException 예외가 발생

shutdown() - 처리중인 작업이 있는 경우


스레드 풀의 스레드는 처리중인 작업을 완료한다.
스레드 풀의 스레드는 큐에 남아있는 작업도 모두 꺼내서 완료한다.

shutdownNow() - 처리중인 작업이 있는 경우


큐를 비우면서, 큐에 있는 작업을 모두 꺼내서 컬렉션으로 반환

  • List< Runnable > runnables = es.shutdownNow()
    스레드에 인터럽트가 발생
  • 작업 중인 taskA , taskB 는 인터럽트가 걸린다.
  • 큐에 대기중인 taskC , taskD 는 수행되지 않는다.
package chap50;

import chap49.executor.RunnableTask;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static chap41.util.MyLogger.log;
import static chap49.executor.ExecutorUtils.printState;

public class ExecutorShutdownMain {
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        es.execute(new RunnableTask("taskA"));
        es.execute(new RunnableTask("taskB"));
        es.execute(new RunnableTask("taskC"));
        es.execute(new RunnableTask("longTask", 100_000));
        printState(es);

        log("== shutdown 시작 ==");
        shutdownAndAwaitTermination(es);
        log("== shutdown 완료 ==");
        printState(es);
    }

    static void shutdownAndAwaitTermination(ExecutorService es) {
        es.shutdown(); //non-blocking, 새로운 작업을 받지 않는다. 처리중이거나 큐에 이미 대기중인 작업까지만 처리

        try {
            //이미 대기중인 잡업들 완료까지 10초 기다림
            log("서비스 정상 종료 시도");
            if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
                //정상 종료 기다려줌
                //정상 종료가 너무 오래 걸림..
                log("정상 종료 실패 -> 강제 종료");
                es.shutdownNow();
                // 자원 종료돨 때까지 기다림
                // 작업이 취소될 떄 대기
                // 종료가 안될 경우..
                if (es.awaitTermination(10, TimeUnit.SECONDS)) {
                    log("서비스가 종료되지 않았습니다.");
                }
            }
        } catch (InterruptedException e) {
            //대기 중인 현재 스레드가 인터럽트 될 수 있음
            es.shutdownNow();
        }
    }
}

서비스 종료
es.shutdown();
새로운 작업을 받지 않음. 처리중이거나 큐에 이미 대기중인 작업은 처리. 풀에 스레드 종료함.
shutdown 블로킹 메서드가 아님. 서비스가 종료될 때 까지 main 스레드가 대기 X

블로킹 메서드

if (!es.awaitTermination(10, TimeUnit.SECONDS)){...}

taskA , taskB , taskC 의 수행이 완료된다. 그런데 longTask 는 10초가 지나도 완료되지 않았다. 따라서 false 를 반환
종료가 10초 이상 너무 오래 거림, shutdownNow을 통해 강제 종료에 들어간다. 블러킹 메서드는 아님
강제 종료를 하면, 작업중인 스레드에 인터럽트 발생.

서비스 종료 실패
왜 10초간 기다릴까? 작업 중인 스레드에 인터럽트를 호출 하는 것은 맞다. 인터럽트가 호출되더라도, 여러 이유로 시간이 걸릴 수도 있음.

Executor 스레드 풀 관리 - 코드

ExecutorService 의 기본 구현체인 ThreadPoolExecutor 의 생성자는 다음 속성을 사용한다.

  • corePoolSize : 스레드 풀에서 관리되는 기본 스레드의 수
  • maximumPoolSize : 스레드 풀에서 관리되는 최대 스레드 수
  • keepAliveTime , TimeUnit unit : 기본 스레드 수를 초과해서 만들어진 초과 스레드가 생존할 수 있는 대기 시간, 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.
  • BlockingQueue workQueue : 작업을 보관할 블로킹 큐
package chap50;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static chap41.util.MyLogger.log;

public abstract class ExecutorUtils {
    public static void printState(ExecutorService executorService) {
        if (executorService instanceof ThreadPoolExecutor poolExecutor) { //구현체로 ThreadPoolExecutor 넘어옴 (casting)
            int pool = poolExecutor.getPoolSize(); //현재 생성된 스레드 개수
            int active = poolExecutor.getActiveCount(); //현재 일하고 있는 스래드

            int queueTasks = poolExecutor.getQueue().size(); //스레드가 작업을 던지면 queue의 작업을 당김(큐에 작업이 얼마나 들어있는지)
            long completedTasks = poolExecutor.getCompletedTaskCount();//완료된 작업은??

            log("[pool= " + pool + ", active=" + active + ", queueTasks=" + queueTasks + ", completedTasks=" + completedTasks + "]");
        } else {
            log(executorService);
        }
    }

    //추가
    public static void printState(ExecutorService executorService, String taskName) {
        if (executorService instanceof ThreadPoolExecutor poolExecutor) { //구현체로 ThreadPoolExecutor 넘어옴 (casting)
            int pool = poolExecutor.getPoolSize(); //현재 생성된 스레드 개수
            int active = poolExecutor.getActiveCount(); //현재 일하고 있는 스래드

            int queueTasks = poolExecutor.getQueue().size(); //스레드가 작업을 던지면 queue의 작업을 당김(큐에 작업이 얼마나 들어있는지)
            long completedTasks = poolExecutor.getCompletedTaskCount();//완료된 작업은??

            log("[pool= " + pool + ", active=" + active + ", queueTasks=" + queueTasks + ", completedTasks=" + completedTasks + "]");
        } else {
            log(executorService);
        }
    }
}
package chap50;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;

public class RunnableTask implements Runnable {

    private final String name;
    private int sleepMs = 1000;

    public RunnableTask(String name) {
        this.name = name;
    }

    public RunnableTask(String name, int sleepMs) {
        this.name = name;
        this.sleepMs = sleepMs;
    }

    @Override
    public void run() {
        log(name + " 시작");
        sleep(sleepMs);
        log(name + " 완료");
    }
}
package chap50.poolsize;

import chap50.RunnableTask;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
import static chap50.ExecutorUtils.printState;

public class PoolSizeMainV1 {
    public static void main(String[] args) {
        BlockingQueue<Runnable> wokeQueue = new ArrayBlockingQueue<>(2); //최대 사이즈 2개
        ExecutorService es = new ThreadPoolExecutor(2, 4, 3000, TimeUnit.MILLISECONDS, wokeQueue);
        printState(es);

        es.execute(new RunnableTask("task1"));
        printState(es);

        es.execute(new RunnableTask("task2"));
        printState(es);

        //queue에 들어가버린다.
        es.execute(new RunnableTask("task3"));
        printState(es);

        es.execute(new RunnableTask("task4"));
        printState(es);

        //queue 가득 참 -> pool이 하나 생성 (큐까지 가득 차면 pool size가 늘어난다)
        es.execute(new RunnableTask("task5"));
        printState(es);

        es.execute(new RunnableTask("task6"));
        printState(es);

        //모두 다 찬 상태에서 더 넣으면 rejectedException 발생하게 됨
        try {
            es.execute(new RunnableTask("task7"));
        } catch (RejectedExecutionException e) {
            log("task7 실행 거절 예외 발생 : " + e);
        }
        sleep(3000);
        log("== 수행 작업 완료 ==");
        printState(es);

        //초과 스레드들은 대기 시간만큼 일이 없다면 삭제된다.
        sleep(3000);
        log("== maximumPoolSize 대기 시간 초과 ==");
        printState(es);

        es.shutdown();
        log("== shutdown 완료 ==");
        printState(es);
    }
}

corePoolSize=2 , maximumPoolSize=4 : 기본 스레드는 2개, 최대 4개까지 설정
요청이 너무 많거나 급한 경우 스레드 풀은 4개까지 스레들 확장 -> 초과 스레드라고 함
3000 , TimeUnit.MILLISECONDS : 초과 스레드가 생존할 수 있는 대기 시간

큐와 스레드에 모두 다 값이 있을경우 , 스레드 값을 강제로 늘려 일부 수용해서 돌아감


초과 스레드가 더이상 들어갈 곳이 없다면, error를 날린다.

스레드 미리 생성

public class PrestartPoolMain {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1000);
printState(es);
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) es;
poolExecutor.prestartAllCoreThreads();
printState(es);
}
}

전략 - 고정 풀 전략

스레드 풀 관리 - 고정 풀 전략

package chap50.poolsize;

import chap50.RunnableTask;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
import static chap50.ExecutorUtils.printState;

public class PoolSizeMainV2 {
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        //ExecutorService es = new ThreadPoolExecutor(2, 4, 3000, TimeUnit.MILLISECONDS, wokeQueue);
        log("pool 셍성");
        for (int i = 1; i <= 6; i++) {
            String taskName = "task" + i;
            es.execute(new RunnableTask(taskName));
            printState(es, taskName);
        }

        es.shutdown();
        log("== shutdown ==");
    }
}

자바는 Executors 클래스를 통해 3가지 기본 전력을 제공한다.
newSingleThreadPool(): 단일 스레드 풀 전략
newFixedThreadPool(nThreads): 고정 스레드 풀 전략
newCachedThreadPool(): 캐시 스레드 풀 전략

장점은 스레드 수가 고정되어서 CPU, 메모리 리소스가 어느정도 예측 가능하다는 점이다. 따라서 일반적인 상황에 가장 안정적으로 서비스를 운영할 수 있다. 하지만 상황에 따라 장점이 가장 큰 단점이 되기도 한다.

캐시 풀 전략

newCachedThreadPool()
기본 스레드를 상용하지 않고 생존 주기를 가진 초과 스레드만 사용
초과 스레드의 수는 제한이 없다.
큐에 작업을 저장하지 않는다. (SynchronousQueue ) 바로 받아서 씀 = 직거래

package chap50.poolsize;

import chap50.RunnableTask;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;
import static chap41.util.ThreadUtils.sleep;
import static chap50.ExecutorUtils.printState;

public class PoolSizeMainV3 {
    public static void main(String[] args) {
        //ExecutorService es = Executors.newCachedThreadPool();
        ThreadPoolExecutor es = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3,
                TimeUnit.SECONDS  ,
                new SynchronousQueue<>());
        log("pool 셍성");
        printState(es);

        for (int i = 1; i <= 4; i++) {
            String taskName = "task" + i;
            es.execute(new RunnableTask(taskName));
            printState(es, taskName);
        }

        sleep(3000);
        log("==작업 수행 완료==");
        printState(es);

        sleep(3000);
        log("== maximumPoolSize 대기 시간 초과==");
        printState(es);


        es.shutdown();
        log("== shutdown ==");
    }
}

기본 스레드도 없고, 대기 큐에 작업도 쌓이지 않는다. 대신에 작업 요청이 오면 초과 스레드로 작업을 바로바
로 처리한다. 따라서 빠른 처리가 가능하다. 초과 스레드의 수도 제한이 없기 때문에 CPU, 메모리 자원만 허용한다면 시스템의 자원을 최대로 사용할 수 있다. 추가로 초과 스레드는 60초간 생존하기 때문에 작업 수에 맞추어 적절한 수의 스레드가 재사용된다. 이런 특징 때문에 요청이 갑자기 증가하면 스레드도 갑자기 증가하고, 요청이 줄어들면 스레드도 점점 줄어든다. 이 전략은 작업의 요청 수에 따라서 스레드도 증가하고 감소하므로, 매우 유연한 전략

사용자 정의 풀 전략

package chap50.poolsize;

import chap49.executor.RunnableTask;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;
import static chap49.executor.ExecutorUtils.printState;

public class PoolSizeMainV4 {
    //static final int TASK_SIZE = 1100; //일반 -> 100개가 돌아감
    //static final int TASK_SIZE = 1200; //긴급 -> 200 개가 돌아감
    static final int TASK_SIZE = 1201;  //거절

    public static void main(String[] args) {
        ExecutorService es = new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000)); //100개는 스레드 1000개는 큐
        printState(es);

        long startMs = System.currentTimeMillis();

        for (int i = 1; i <= TASK_SIZE; i++) {
            String taskName = "task" + i;
            try {
                es.execute(new RunnableTask(taskName));
                printState(es);
            } catch (RejectedExecutionException e) {
                log(taskName + " -> " + e);
            }
        }

        es.shutdown();
        long endMs = System.currentTimeMillis();
        log("time : " + (endMs - startMs));
    }
}

각 상황에 맞게

public class PoolSizeMainV4 {
    //static final int TASK_SIZE = 1100; //일반 -> 100개가 돌아감
    //static final int TASK_SIZE = 1200; //긴급 -> 200 개가 돌아감
    static final int TASK_SIZE = 1201;  //거절

이부분을 조절하면 된다.

Executor 예외 정책

AbortPolicy()

package chap50.reject;

import chap49.executor.RunnableTask;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;

public class RejectMainV1 {
    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(1,1,0,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),//바로 reject를 받을 수 있음
                new ThreadPoolExecutor.AbortPolicy()); //RejectedExcutionHandler가 넘어온다. ThreadPoolExecutor 생성자는 RejectedExcutionHandler의 구현체를 전달 받는다.

        executor.submit(new RunnableTask("task1"));
        executor.submit(new RunnableTask("task2"));
        executor.submit(new RunnableTask("task3"));

        executor.shutdown();
    }
}
  • new ThreadPoolExecutor.AbortPolicy());
    RejectedExcutionHandler가 넘어온다. ThreadPoolExecutor 생성자는 RejectedExcutionHandler의 구현체를 전달 받는다.

DiscardPolicy

package chap50.reject;

import chap49.executor.RunnableTask;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;

public class RejectMainV2 {
    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(1,1,0,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),//바로 reject를 받을 수 있음
                new ThreadPoolExecutor.DiscardPolicy()); //조용히 버리는 정책, 아무 로직도 없음

        executor.submit(new RunnableTask("task1"));
        executor.submit(new RunnableTask("task2"));

        executor.shutdown();
    }
}

new ThreadPoolExecutor.DiscardPolicy()) : 조용히 버리는 정책, 아무 로직도 없음

CallerRunsPolicy

package chap50.reject;

import chap49.executor.RunnableTask;

import java.util.concurrent.*;

import static chap41.util.MyLogger.log;

public class RejectMainV3 {
    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(1,1,0,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),//바로 reject를 받을 수 있음
                new ThreadPoolExecutor.CallerRunsPolicy()); //main 스레드가 일하는 것을 볼 수 있음 (생산에 대한 속도를 조절 할 수 있음)

        executor.submit(new RunnableTask("task1"));
        executor.submit(new RunnableTask("task2"));
        executor.submit(new RunnableTask("task3"));
        executor.submit(new RunnableTask("task4"));

        executor.shutdown();
    }
}

Main 스레드가 일하는 것을 볼 수 있음 (생산에 대한 속도를 조절 할 수 있음)

MyRejectedExceptionHandler

package chap50.reject;

import chap49.executor.RunnableTask;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static chap41.util.MyLogger.log;

public class RejectMainV4 {
    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(1,1,0,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),//바로 reject를 받을 수 있음
                new MyRejectedExecutionHandler()); //main 스레드가 일하는 것을 볼 수 있음 (생산에 대한 속도를 조절 할 수 있음)

        executor.submit(new RunnableTask("task1"));
        executor.submit(new RunnableTask("task2"));
        executor.submit(new RunnableTask("task3"));
        executor.submit(new RunnableTask("task4"));

        executor.shutdown();
    }


    static class MyRejectedExecutionHandler implements RejectedExecutionHandler {

        static AtomicInteger count = new AtomicInteger(0);

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            int i = count.incrementAndGet();
            log("[경고] 거절된 누적 작업 수 : " + i);
        }
    }
}
profile
개발자를 향해 가는 중입니다~! 항상 겸손

0개의 댓글