이것이 자바다 정리 #11 멀티 스레드 2 (스레드 상태 제어, 스레드 종료, 스레드 그룹, 스레드풀 생성, 스레드풀 작업 생성 및 처리

Jake Seo·2021년 5월 12일
1

이것이자바다

목록 보기
11/17

이것이 자바다 정리 #11 멀티 스레드 2

이것이 자바다 책을 참고하였습니다.

스레드 상태 제어

이전엔 스레드 상태에 대해 알아봤는데, 해당 스레드 상태를 제어하는 메소드에 대해 알아보자.

메소드와 상태 다이어그램

취소선으로 표기된 메소드는 Deprecated 메소드들이다.

Thread 내부 상태제어 메소드

interrupt()

일시 정지 상태의 스레드에서 InterruptedException 예외를 발생시켜 예외처리 코드에서 실행 대기 상태로 가거나 종료 상태로 갈 수 있도록 한다.

sleep(long millis[, int nanos])

주어진 시간 동안 스레드를 일시 정지 상태로 만든다. (TIMED_WAITING) 주어진 시간이 지나면 자동적으로 실행 대기 상태가 된다.

일시 정지 상태에서 주어진 시간을 다 기다리기 전에 .interrupt() 메소드가 호출되면 InterruptedException이 발생한다.

join(), join(long millis[, int nanos])

join() 메소드를 호출한 스레드는 일시 정지 상태가 된다. 실행 대기 상태로 가려면 join() 메소드를 멤버로 가지는 스레드가 종료되거나, 매개값으로 주어진 시간이 지나야 한다.

ThreadA에서 ThreadB.join() 메소드를 호출하면, ThreadAThreadB의 작업이 끝날 때까지 일시정지 상태가 된다.

비동기에서 타이밍이 안 맞아 버그가 생기거나 의도하지 않았던 동작이 생기는 경우는 매우 흔하다. 이 경우 .join() 메소드를 이용해 해결할 수 있다.

public class CalcThread extends Thread{
    private long sum = 0;

    public long getSum() {
        return sum;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 100; i++) {
            this.sum += i;
        }
    }
}

위와 같이 CalcThread 클래스를 생성하고,

public class Main {
    public static void main(String[] args) {
        CalcThread calcThread = new CalcThread();
        calcThread.start();

        long sum = calcThread.getSum();
        System.out.println("sum = " + sum);
    }
}

위와 같은 Main 클래스를 생성하면, 결과가 어떻게 나올까?

sum = 0이 나온다. CalcThread.run() 메소드가 끝나기 전에 print가 먼저 호출되어서 그렇다.

이럴 때 .join()을 써서 다른 스레드의 종료를 기다릴 수 있다.

public class Main {
    public static void main(String[] args) {
        CalcThread calcThread = new CalcThread();
        calcThread.start();

        try {
            calcThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        long sum = calcThread.getSum();
        System.out.println("sum = " + sum);
    }
}

이후 정상적으로 결과가 잘 나온다.

.join() 중간에 InterruptedException이 발생할 수 있으므로 예외처리를 잘해주어야 한다.

.join() 메소드에 인자로 시간을 주면, 해당 스레드가 죽을 때까지 기다리는 것이 아니라 해당 시간만큼만 기다리고 바로 코드 흐름이 진행되므로, 주의해야 한다.

네트워크 프로그램에서는 .join(5000) 과 같은 코드 이후 결과가 제대로 입력되지 않았다면, TimeoutException을 던지는 등 예외처리를 할 수도 있다.

yield()

실행 중에 우선순위가 동일한 다른 스레드에게 실행을 양보하고 실행 대기 상태가 된다.

스레드에 무의미한 반복이 있는 시점에 .yield() 메소드를 사용하면 성능에 도움이 된다.

public class ThreadA extends Thread{
    public boolean stop = false; // 종료 플래그
    public boolean work = true; // 작업 진행 여부 플래그

    @Override
    public void run() {
        while (!stop) {
            if(work) {
                System.out.println("ThreadA.run");
            } else {
                Thread.yield();
            }
        }

        System.out.println("ThreadA 종료");
    }
}

위와 같이 코드를 짜면, workfalse일 때, threadA.yield()를 호출하여 최대한 나중에 스케줄링된다.

Object 클래스 내부 상태제어 메소드

notify(), notifyAll()

동기화(synchronized) 블록 내에서 .wait() 메소드에 의해 일시 정지 상태에 있는 스레드를 실행 대기 상태로 만든다.

notify() 메소드는 wait()에 의해 일시정지된 스레드 1개를 실행 대기 상태로 만든다.

notifyAll() 메소드는 wait()에 의해 일시정지된 스레드를 모두 실행 대기 상태로 만든다.

wait(), wait(long millis[, int nanos])

동기화(synchronized) 블록 내에서 스레드를 일시 정지 상태로 만든다. 매개값으로 주어진 시간이 지나면 자동적으로 실행 대기 상태가 된다. 시간이 주어지지 않으면, notify(), notifyAll() 메소드에 의해 실행 대기 상태로 갈 수 있다.

notify()와 wait()의 활용

번갈아가며 출력하기

public class WorkObject {
    public synchronized void methodA() {
        notify();
        System.out.println("WorkObject.methodA");

        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public synchronized void methodB() {
        notify();
        System.out.println("WorkObject.methodB");

        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class WorkThreadA extends Thread {
    WorkObject workObject;

    public WorkThreadA(WorkObject workObject) {
        this.workObject = workObject;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            workObject.methodA();
        }
    }
}

public class WorkThreadB extends Thread{
    WorkObject workObject;

    public WorkThreadB(WorkObject workObject) {
        this.workObject = workObject;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            workObject.methodB();
        }
    }
}

public class Main {
    public static void main(String[] args) {
        WorkObject sharedWorkObject = new WorkObject();

        Thread workThreadA = new WorkThreadA(sharedWorkObject);
        Thread workThreadB = new WorkThreadB(sharedWorkObject);

        workThreadA.start();
        workThreadB.start();
    }
}

위는 WorkObject, WorkThreadA, WorkThreadB 클래스를 정의하고 객체를 생성하여 .notify().wait()을 이용해 출력 메소드를 번갈아가며 출력해본 예제이다.

.notify()가 처음 호출되고, WorkThreadAWorkObject.methodA()를 호출하여 프린트문을 한번 출력 후에 .wait()을 만나서 WorkThreadBWorkObject.methodB()를 출력해주지 않으면, notify()가 호출되지 않아 실행 대기상태로 갈 수 없다.

WorkThread가 서로서로 .notify()를 해주어 결과적으로는 번갈아가며 출력된다.

생산자와 소비자 문제

스레드를 2가지 분류로 나누어서, 생산자 스레드와 소비자 스레드로 나눈다. 생산자 스레드는 데이터를 만들어내고, 소비자 스레드는 데이터를 처리하는 교대 작업을 구현해야 한다.

생산과 소비의 열쇠 DataStorage 클래스

public class DataStorage {
    String data;

    public synchronized String consumeData() throws InterruptedException {
        if(data == null) {
            System.out.println("[소비자] 데이터를 소비했습니다. 데이터가 생산되기를 기다리고 있습니다.");
            wait();
        }

        System.out.println("[소비자] 데이터가 있어서 데이터가 소비되었습니다. (데이터명: " + data + ")");
        data = null;
        notify();
        return data;
    }


    public synchronized void produceData(String data) throws InterruptedException {
        this.data = data;
        System.out.println("[생산자] 데이터가 없어서 데이터가 생산되었습니다. (데이터명: " + data + ")");
        notify();

        if(data != null) {
            System.out.println("[생산자] 데이터를 생산했습니다. 데이터가 소비되기를 기다리고 있습니다.");
            wait();
        }
    }
}

위와 같이 작성하면

  • 절대 데이터를 연속으로 두 번 생성하거나 두 번 소비하지 않는다.
    • synchronized 메소드라 임계 영역이 되기 때문에 wait()이 끝나지 않으면 진입할 수 없다.
  • GetterSetter에 임계를 잘 설정해주면 된다.
    • Getter가 완료된 순간에 .notify()를 날려준 후 .wait()
    • Setter가 완료된 순간에 .notify()를 날려준 후 .wait()
    • 임계 영역으로 인해 완성된다.

Deprecated 메소드

resume() - Deprecated

.suspend() 메소드에 의해 일시 정지 상태에 있는 스레드를 실행 대기 상태로 만든다. (Deprecated 되었기 때문에, notify(), notifyAll()을 사용하는 것이 좋다.)

stop() - Deprecated

스레드를 즉시 종료시킨다. (Deprecated)

스레드의 안전한 종료(stop 플래그, interrupt())

  • 스레드는 기본적으로 .run() 메소드 실행을 마치면, 자동으로 종료된다.
    • 이전엔 .stop() 메소드로 종료시킬 수 있었으나, 자원이 불안전하게 종료되는 문제로 Deprecated 되었다.
      • 자원이란, 파일, 네트워크 연결 등을 말한다.

stop 플래그로 스레드 종료하기

public class MyThread extends Thread {
  private boolean stop;
  
  public void run() {
    while(!stop) {
      스레드 코드
    }
    
    // 스레드가 사용한 자원 정리
  }
}

stop 이라는 boolean 멤버 변수를 false로 바꾸면, 마지막 실행되던 루프까지 실행 후에 종료될 것이다.

interrupt() 메소드로 스레드 종료하기

  • interrupt() 메소드의 기본 역할은 스레드가 일시 정지 상태에 있을 때 InterruptedException 예외를 발생시키는 역할이다.
  • 요는 일시 정지 상태 에 있을 때, InterruptedException 예외를 발생시킨다는 것이다.
    • 일시 정지 상태 가 아니면 발동하지 않는다.
  • 단, 일시 정지를 하지 않아도, interrupt()가 호출되었는지 알 수 있는 방법은 있다.
    • 스레드의 interrupted()는 정적 메소드로 현재 스레드가 interrupted 되었는지 확인하는 메소드다.
    • 스레드의 isinterrupted()는 인스턴스 메소드로 현재 스레드가 interrupted가 되었는지 확인하는 메소드다.

데몬(Daemon) 스레드

  • 데몬 스레드는 주 스레드의 동작을 돕는 스레드다.
  • 주 스레드가 종료되면 데몬 스레드는 강제로 자동 종료된다.
    • 이 점을 제외하면 일반 스레드와 큰 차이가 없다.
  • 스레드를 데몬으로 만드려면 데몬이 될 스레드의 setDeamon(true)를 호출해주면 된다.

데몬 스레드 예제

public class AutoSaveDaemonThread extends Thread {
    public void save() {
        System.out.println("작업 내용을 저장하였습니다.");
    }

    @Override
    public void run() {
        while(true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }

            save();
        }
    }
}

public class DaemonExample {
    public static void main(String[] args) {
        AutoSaveDaemonThread autoSaveDaemonThread = new AutoSaveDaemonThread();
        autoSaveDaemonThread.setDaemon(true);
        autoSaveDaemonThread.start();

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("종료합니다.");
    }
}

위와 같이 만들어 실행하면, 작업 내용을 저장하였습니다. 라는 메세지가 송출되다가, 3초 뒤 종료된다.

참고로 autoSaveDaemonThread.setDaemon(true);를 주석처리하면 프로세스는 1개의 스레드라도 남아있으면 종료되지 않는 원칙 때문에, 프로세스가 계속 종료되지 않고 남아있게 된다.

스레드 그룹

  • 스레드 그룹(Thread Group)은 관련된 스레드를 묶어서 관리할 목적으로 사용된다.
  • JVM도 운영에 필요한 스레드를 묶어서 system 스레드 그룹으로 묶는다.
  • 모든 스레드는 반드시 하나의 스레드 그룹에 포함된다.
    • 기본 값은 자신을 생성한 스레드의 스레드 그룹에 묶이게 된다.
      • 보통 main에 묶인다.

스레드 그룹 이름 확인하기

public class ThreadGroupExample {
    public static void main(String[] args) {
        Map<Thread, StackTraceElement[]> allStackTraces = Thread.getAllStackTraces();
        for (Thread thread : allStackTraces.keySet()) {
            String thisThreadName = thread.getName();
            System.out.println("thisThreadName = " + thisThreadName + (thread.isDaemon() ? "(데몬)" : "(주)"));
            ThreadGroup thisThreadGroup = thread.getThreadGroup();
            System.out.println("thisThreadGroup = " + thisThreadGroup);
            String thisThreadGroupName = thisThreadGroup.getName();
            System.out.println("thisThreadGroupName = " + thisThreadGroupName);
            System.out.println();
        }
    }
}
  • Thread.getAllStackTraces() 메소드의 리턴 값의 key 값을 보면 현재 실행중인 모든 스레드 목록을 확인할 수 있다.

대부분 데몬 스레드인 것을 확인할 수 있다. 가비지 컬렉션을 담당하는 Finalizer 스레드 등이 system 그룹에 속해있다.

스레드 그룹 생성

간단하게 생성자로 만들면 된다.

ThreadGroup tg = new ThreadGroup(String name);
ThreadGroup tg = new ThreadGroup(ThreadGroup parent, String name);
  • 부모 스레드 그룹도 지정할 수 있다.

새로운 스레드를 스레드 그룹에 넣고 싶다면 이것 역시 생성자로 가능하다.

Thread t = new Thread(ThreadGroup threadGroup, Runnable target);
Thread t = new Thread(ThreadGroup threadGroup, Runnable target, String name);
Thread t = new Thread(ThreadGroup threadGroup, Runnable target, String name, long stackSize);
Thread t = new Thread(ThreadGroup threadGroup, String name);
  • Runnable targetRunnable을 구현한 구현체를 말한다.
  • String name은 스레드의 이름이다.
  • stackSize는 JVM이 해당 스레드에 할당할 stack 크기이다.

스레드 그룹 일괄 interrupt()

  • 스레드 그룹에 포함된 모든 스레드를 일괄 .interrupt()할 수 있다.
    • 스레드 그룹의 .interrupt()는 포함된 모든 스레드의 interrupt()를 내부적으로 호출한다.

단, 안전한 종료를 위해서는 InterruptedException에 대한 예외처리를 스레드마다 잘 해주어야 한다.

이전에는 스레드 그룹이 사용하는 메소드로 suspend(), resume(), stop() 메소드들이 있었는데 모두 Deprecated 되었다.

스레드 그룹 메소드

  • int activeCount(): 현재 그룹 및 하위 그룹에서 활동중인 모든 스레드의 수를 반환한다.
  • int activeGroupCount(): 현재 그룹에서 활동중인 모든 하위 그룹의 수를 반환한다.
  • void checkAccess(): 현재 스레드가 스레드 그룹을 변경할 권한이 있는지 체크한다. 권한이 없는 경우 SecurityException()이 발생한다.
  • void destory(): 현재 그룹 및 하위 그룹을 모두 삭제한다. 단, 그룹 내에 포함된 모든 스레드들이 종료 상태가 되어야 한다.
  • int getMaxPrioirty(): 현재 그룹에 포함된 스레드가 가질 수 있는 최대 우선순위를 반환한다.
  • void setMaxPrioirty(int pri): 현재 그룹에 포함된 스레드가 가질 수 있는 최대 우선순위를 설정한다.
  • String getName(): 현재 그룹의 이름을 반환한다.
  • ThreadGroup getParent(): 현재 그룹의 부모 그룹을 반환한다.
  • boolean parentOf(ThreadGroup g): 현재 그룹이 매개값으로 지정한 스레드 그룹의 부모인지를 확인한다.
  • boolean isDaemon(): 현재 그룹이 데몬 그룹인지 여부를 반환한다.
  • void setDaemon(boolean daemon): 현재 그룹을 데몬 그룹으로 설정한다.
  • void list(): 현재 그룹에 포함된 스레드와 하위 그룹에 대한 정보를 출력한다.
  • void interrupt(): 현재 그룹에 포함된 모든 스레드들을 interrupt한다.

스레드 풀 생성 및 종료

네트웍 서비스 등을 운영하는데 요청하는 클라이언트 1개당 스레드 1개를 부여한다고 가정하자. 갑작스레 사용자가 폭증했을 때, 스레드를 그만큼 생성해서 병렬처리를 하려하면 스레드 생성 및 스케줄링에 드는 메모리로 CPU가 바빠지고 메모리 사용량이 늘어나고 애플리케이션의 성능이 저하된다.

이 경우 오히려 스레드를 제한된 개수만큼 정해놓고 작업 큐에 들어오는 작업들을 하나씩 처리하는 것이 좋다. 스레드 풀에서 이러한 기능을 제공한다. java.util.concurrent 패키지에 ExecutorService 인터페이스와 Executors 클래스가 제공된다. Executors의 다양한 정적 메소드를 이용해서 ExecutorService 구현 객체를 만들면 이것이 바로 스레드 풀이 된다.

스레드 풀 생성

  • 스레드풀을 생성한다는 의미는 ExecutorService를 구현하는 것이라고 볼 수 있다.
  • Executors 클래스의 정적 메소드를 이용해 구현할 수 있다.

초기 스레드 수, 코어 스레드 수, 최대 스레드 수에 대한 설명

  • 초기 스레드 수: ExecutorService(스레드)가 생성될 때 기본적으로 생성되는 스레드 수를 말한다.
  • 코어 스레드 수: 초기 스레드 수를 설정 후 사용되지 않는 스레드를 스레드 풀에서 제거할 때, 최소한 유지해야 할 스레드 수를 말한다.
  • 최대 스레드 수: 스레드 풀에서 관리하는 최대 스레드 수이다.

가변 스레드 풀

  • Executors.newCachedThreadPool()
    • 초기 스레드 수: 0
    • 코어 스레드 수: 0
    • 최대 스레드 수: Integer.MAX_VALUE

작업의 수에 맞춰 스레드 풀의 스레드 개수를 최대까지 늘려서 작업하는 가변적인 스레드 풀이다. 코어 스레드 수가 0이므로, 작업이 끝나면 스레드는 회수된다.

ExecutorService executorService = Executors.newCachedThreadPool();

고정 스레드 풀

  • Executors.newFixedThreadPool(int nThreads)
    • 초기 스레드 수: 0
    • 코어 스레드 수: nThreads
    • 최대 스레드 수: nThreads

고정적인 스레드 수만을 갖고 동작하는 고정 스레드 풀이다. 코어 스레드 수와 최대 스레드 수가 같으므로, 작업이 끝나도 스레드는 회수되지 않는다.

ExecutorService executorService = Executors.newFixedThreadPool(
  Runtime.getRuntime().availableProcessors() // 런타임에 사용 가능한 CPU 코어 수 만큼 최대 스레드를 만든다.
);

사용자 정의 스레드 풀

ExecutorService threadPool = new ThreadPoolExecutor(
  3, // 코어 스레드 수
  100, // 최대 스레드 수
  120L, // 이 시간동안 놀고 있다면 제거
  TimeUnit.SECONDS, // 놀고 있는 시간의 단위 (120초 동안 놀고 있으면 제거)
  new SynchronousQueue<Runnable>() // 작업 큐
);

스레드 풀 종료

스레드 풀은 데몬 스레드가 아니기 때문에, main 스레드가 종료되어도 유지된다. main 스레드가 아닌 스레드가 살아있다면, 당연히 애플리케이션도 유지된다.

ExecutorService의 메소드를 이용해 스레드 풀을 종료할 수 있다.

  • void shutdown(): 현재 작업 및 작업 큐에 있는 모든 작업을 처리한 뒤에 스레드 풀을 종료한다.
  • List<Runnable> shutdownNow(): 현재 작업 처리 중인 스레드를 interrupt하여 작업 중지 시도 후 스레드 풀을 종료시킨다. 리턴 값은 미처리된 작업 목록이다.
  • boolean awaitTermination(long timeout, TimeUnit unit): shutdown() 메소드 호출 이후, 모든 작업 처리를 timeout 내에 완료하면 true를 리턴하고, 완료하지 못하면 작업 처리 중인 스레드를 interrupt하고 false를 리턴한다.

일반적으로 남아있는 작업을 마무리하고 스레드를 종료하면 .shutdown(), 남아있는 작업과 상관없이 강제로 종료할 때, shutdownNow()를 호출한다.

executorService.shutdown();
executorService.shutdownNow();

스레드 풀 작업 생성 및 처리 요청

작업 생성

  • 스레드 작업은 Runnable 혹은 Callable로 표현한다.
    • Runnable: 반환 값이 없는 작업
    • Callable: 반환 값이 있는 작업
Runnable task = new Runnable() {
  @Override
  public void run() {
    // 스레드가 처리할 작업 내용
  }
}
Callable<T> task = new Callable<T>() {
  @Override
  public T call() throws Exception {
    // 스레드가 처리할 작업 내용
    return T;
  }
}

작업 처리 요청

  • ExecutorService의 구현체인 스레드 풀에 위의 작업(Runnable 혹은 Callable)을 넣는 것을 말한다.

작업 처리 요청 메소드 execute() (반환 값이 없는 메소드)

  • void execute(Runnable command)

반환 값이 없는 메소드는 작업 처리 도중 예외가 발생하면, 스레드가 종료되고 해당 스레드가 스레드 풀에서 제거된다.

작업 처리 요청 메소드 submit() (반환 값이 있는 메소드)

  • Future<?> submit(Runnable task)
  • Future<V> submit(Runnable task, V result)
  • Future<V> submit(Callable<V> task)

반환 값이 있는 메소드는 작업 처리 도중 예외가 발생해도, 스레드가 종료되지 않고, 다음 작업을 위해 재사용된다.
스레드 생성의 오버헤드를 줄이기 위해서는 submit()을 사용하는 것이 좋다.

작업 처리 요청 예제 코드

public class ExecuteExample {
    public static void main(String[] args) {
        // create thread pool that sets fixed nThreads as 2.
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        for (int i = 0; i < 10; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
                    int poolSize = threadPoolExecutor.getPoolSize();
                    String threadName = Thread.currentThread().getName();

                    System.out.println("[total thread count: " + poolSize + "] task thread name: " + threadName);

                    int value = Integer.parseInt("three");
                }
            };

            executorService.execute(runnable);
            Future<?> future = executorService.submit(runnable);
            
            try {
                System.out.println("future = " + future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

            try {
                Thread.sleep(10); // sleep 10 milliseconds to make console to print
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        executorService.shutdown();
    }
}

executesubmit을 번갈아가며 주석처리하면서 실행해 볼 수 있다.

execute() 메소드로 스레드 작업처리 결과

execute() 시에는 위와 같이 스레드의 이름이 계속 변경되는 것을 볼 수 있다. 그 이유는 위에 설명했다시피 execute의 경우 예외가 발생하면 새로운 스레드를 생성하기 때문이다.

submit() 메소드로 스레드 작업처리 결과

submit() 시에는 스레드의 이름이 변하지 않는데, 이는 예외가 발생해도 새로운 스레드를 생성하지 않기 때문이다.

submit() 시에 주의할 점은 반드시 Future 객체의 결과를 확인해야 한다는 것이다. Future 객체를 열어보지 않으면 안에서 예외가 터져도 무슨 일이 일어났는지 전혀 알 수 없다.

profile
풀스택 웹개발자로 일하고 있는 Jake Seo입니다. 주로 Jake Seo라는 닉네임을 많이 씁니다. 프론트엔드: Javascript, React 백엔드: Spring Framework에 관심이 있습니다.

0개의 댓글