멀티 스레드의 상태 제어

임준영·2021년 4월 10일
0

멀티 스레드의 상태 제어

1. 스레드 상태

스레드는 객체를 생성하고, start() 메소드를 호출하면 곧바로 스레드가 실행되는 것처럼 보이지만 실은 실행 대기 상태가 됩니다. 실행 대기 상태란 아직 스케줄링이 되지 않아서 실행을 기다리고 있는 상태를 말합니다. 실행 대기 상태에 있는 스레드 중에서 스레드 스케줄링으로 선택된 스레드가 비로서 CPU를 점유하고 run() 메소드를 실행합니다.

이때를 실행(Running)상태라고 합니다. 실행 상태의 스레드는 run() 메소드를 모두 실행하기 전에 스레드 스케줄링에 의해 다시 실행 대기 상태로 돌아 갈 수 있습니다. 그리고 실행 대기 상태에 있는 다른 스레드가 선택되어 실행 상태가 됩니다. 이렇게 스레드는 실행 대기 상태와 실행 상태를 번갈아가면서 자신의 run() 메소드를 조금씩 실행합니다. 실행 상태에서 run() 메소드가 종료되면, 더 이상 실행할 코드가 없기 때문에 실행은 멈추게 됩니다. 이 상태를 종료 상태라고 합니다.

Untitled Diagram (1)

경우에 따라서 스레드는 실행 상태에서 실행 대기 상태로 가지 않을 수도 있습니다. 실행 상태에서 일시 정지 상태로 가기도 하는데, 일시 정지 상태는 스레드가 실행할 수 없는 상태입니다. 일시 정지 상태는 WAITINGM, TIMED_WATING, BLOCKED가 있습니다. 스레드가 다시 실행 상태로 가기 위해서는 일시 정지 상태에서 실행 대기 상태로 가야 한다는것을 알아둬야 합니다.

Untitled Diagram

이러한 스레드의 상탱를 코드에서 확인할 수 있도록 하기 위해 자바 5부터 Thread 클래스에 getState() 메소드가 추가 되었습니다. getState() 메소드는 아래 표처럼 스레드 상태에 따라서 Thread.State 열거 상수를 리턴합니다.

상태열겨 상수설명
객체 생성NEW스레드 객체가 생성, 아직 start() 메소드가 호출되지 않은 상태
실행 대기RUNNABLE실행 상태로 언제든지 갈 수 있는 상태
일시정지WAITING다른 스레드가 통지할 때까지 기다리는 상태
일시정지TIMED_WAITING주어진 시간 동안 기다리는 상태
일시정지BLOCKED사용하고자 하는 객체의 락이 풀릴 때까지 기다리는 상태
종료TERMINATED실행을 마친 상태

아래 코드는 스레드의 상태를 출력하는 StatePrintThread 클래스 입니다. 생성자 매개값으로 받는 타겟 스레드 상태를 0.5초 주기로 출력합니다.

타겟 스레드의 상태를 출력하는 스레드

public class StatePrintThread extends Thread {

    private Thread targetThread;

    public StatePrintThread(Thread targetThread) {
        this.targetThread = targetThread;
    }

    @Override
    public void run() {

        while(true){

            Thread.State state = targetThread.getState();
            System.out.println("타겟 스레드 상태: " + state);

            if(state == Thread.State.NEW){
                targetThread.start();
            }

            if(state == State.TERMINATED){
                break;
            }


            try {
                Thread.sleep(500);
            }catch (Exception e) {}

        }
    }
}

다음은 타겟 스레드 클래스입니다. 처음 for문에서 10억 번 루핑을 돌게 해서 RUNNABLE 상태를 유지하고 그 후에 sleep() 메소드를 호출해서 1.5초간 TIME_WAITING 상태를 유지합니다. 그리고 마지막으로 다시 for문으로 10억번 루핑을 돌게 해서 RUNNABLE 상태를 유지합니다.

타겟 스레드

public class TargetThread extends Thread {

    @Override
    public void run() {
    
        for (int i = 0; i < 1000000000; i++) {}

        try{
            Thread.sleep(1500);
        } catch (Exception e){}

        for (int i = 0; i < 1000000000; i++) {}
    }
}

TargetThread가 객체로 생성되면 NEW 상태를 가지고, run() 메소드가 종료되면 TERMINATED 상태가 되므로 결국 아래와 같은 상태로 변합니다.

NEW -> RUNNABLE -> TIME_WAITING -> RUNNABLE -> TERMINATED

다음은 StatePrintThread를 생성해서 매개값으로 전달 받은 TargetThread의 상태를 출력하도록 작성된 실행 클래스입니다.

2. 스레드 상태 제어

사용자는 미디어 플레이어에서 동영상을 보다가 일시 정지시킬 수도 있고, 종료시킬 수도 있습니다. 일시 정지는 조금 후 다시 동영상을 보겠다는 의미로 미디어 플레이어는 동영상 스레드를 일시 정지 상태로 만들어야 합니다. 그리고 종료는 더 이상 동영상을 보지 않겠다는 의미이므로 미디어 플레리어는 스레드를 종료 상태로 만들어야 합니다. 이와 같이 실행 중인 스레드의 상태를 변경하는 것을 스레드 상태 제어라고 합니다.

멀티 스레드 프로그램을 만들기 위해서는 정교한 스레드 상태 제어가 필요한데, 상태 제어가 잘못되면 프로그램은 불안정해져서 먹통이 되거나 다운됩니다. 멀티 스레드 프로그래밍이 어렵다고 하는 이유는 여기에 있습니다. 스레드는 잘 사용하면 약이 되지만, 잘못 사용하면 치명적인 프로그램의 버그가 되기 때문에 스레드를 정확하게 제어하는 방법을 잘 알고 있어야 합니다. 스레드 제어를 제대로 하기 위해서는 스레드의 상태 변화를 가져오는 메소드를 파악하고 있어야 합니다. 아래 그림은 상태 변화를 가져오는 메소드의 종류를 보여줍니다.

Untitled Diagram (2)

위 그림에서 취소선을 가진 메소드는 스레드의 안전성을 해친다고 하여 더 이상 사용하지 않도록 권장된 Deprecated 메소드들 입니다.

스레드 상태를 제어하는 대표적인 메소드는 아래와 같습니다

interrupt() : 일시 정지 상태의 스레드에서 InterruptedException 예외를 발생시켜, 예외처리 코드(catch)에서 실행 대기 상태로 가거나 종료 상태로 갈 수 있도록 합니다

notify(), notifyAll() : 동기화 블록 내에서 wait() 메소드에 의해 일시 정지 상태에 있는 스레드를 실행 대기 상태로 만듭니다.

resume() : suspend() 메소드에 의해 일시 정지 상태에 있는 스레드를 실행 대기 상태로 만듭니다.

  • Deprecated (대신 notify(), notifyAll() 사용)

sleep(long mills), sleep(long mills, int nanos): 주어진 시간 동안 스레드를 일시 정지 상태로 만듭니다. 주어진 시간이 지나면 자동적으로 실행 대기 상태가 됩니다.

join(), join(long mills), join(long mills, int nanos) : join() 메서드를 호출한 스레드는 일시 정지 상태가 됩니다. 실행 대기 상태로 가려면 join() 메서드를 맴버로 가지는 스레드가 종료되거나, 매개값으로 주어진 시간이 지나야 합니다.

wait(), wait(long mills), wait(long mills, int nanos) : 동기화(synchronized) 블록 내에서 스레드를 일시 정지 상태로 만듭니다. 매개값으로 주어진 시간이 지나면 자동적으로 실행 대기 상태로 됩니다. 시간이 주어지지 않으면 notify(), notifyAll() 메소드에 의해 실행 대기 상태로 갈 수 있습니다.

suspend() : 스레드를 일시 정지 상태로 만듭니다. resume() 메소드를 호출하면 다시 실행 대기 상태가 됩니다.

  • Deprecated(대신 wait() 사용)

    yield() : 실행 중에 우선순위가 동일한 다른 스레드에게 실행을 양보하고 실행 대기 상태가 됩니다.

    stop() : 스레드를 즉시 종료시킵니다

  • Deprecated

    위에서 설명한 wait(), notify(), notifyAll()은 Object 클래스의 메소드이고, 그 이외의 메소드는 모두 Thread 클래스의 메소드들입니다. wait(), notify(), notifyAll Object 클래스의 메소드이고, 그 이외의 메소드는 모두 Thread 클래스의 메소드들 입니다.

주어진 시간동안 일시정지(sleep())

실행 중인 스레들를 일정 시간 멈추게 하고 싶다면 Thread 클래스의 정적 메소드인 sleep()을 사용하면 됩니다. 다음과 같이 Thread.sleep() 메소드를 호출한 스레드는 주어진 시간 동안 일시 정지 상태가 되고, 다시 실행 대기 상태로 돌아갑니다.

try{
    Thread.sleep(1000);
}catch(Exception e){
    // interrupt() 메소드가 호출되면 실행
}

매개값에는 얼마 동안 일시 정지 상태로 있을 것인지, 밀리세컨드(1/1000) 단위로 시간을 주면 됩니다. 위와 같이 1000이라는 값을 주면 스레드는 1초가 경과할 동안 일시 정지 상태로 있게 됩니다. 일시 정지 상태로 주어진 시간이 되기 전에 interrupt() 메소드가 호출되면 InterruptedException이 발생하기 때문에 예외 처리가 필요합니다.
아래 예제는 3초 주기로 비프(beep)음을 10번 발생 시킵니다.

import java.awt.*;

public class SleepExample {
    public static void main(String[] args) {

        Toolkit toolkit = Toolkit.getDefaultToolkit();

        for (int i = 0; i < 10; i++) {
            toolkit.beep();

            try {
                Thread.sleep(3000);
            } catch (Exception e) {

            }
        }
    }
}

메인 스레드를 3초 동안 일시 정지 상태로 보내고, 3초가 지나면 다시 실행 준비 상태로 돌아오도록 했습니다.

3. 다른 스레드에게 실행 양보

스레드가 처리하는 작업은 반복적인 실행을 위해 for문이나 while문을 포함하는 경우가 많습니다. 가끔은 이 반복문들이 무의미한 반복을 하는 경우가 있습니다.

public void run(){
    while(true){
        if(work){
            System.out.println("ThreadA 작업 내용");
        }
    }
}

스레드가 시작되어 run() 메소드를 실행하면 while(true) {} 블록을 무한 반복 실행합니다. 만약 work 값이 false라면 그리고 work의 값이 false에서 true로 변경되는 시점이 불명확하다면, while 문은 어떠한 실행문도 실행하지 않고 무의미한 반복을 합니다. 이것보다는 다른 스레드에게 실행을 양보하고 자신은 실행 대기 상태로 가는 것이 전체 프로그램 성능에 도움이 됩니다. 이런 기능을 위해서 스레드는 yield() 메소드를 제공하고 있습니다. yield() 메소드를 호출한 스레드는 실행 대기 상태로 돌아가고 동일한 우선순위 또는 높은 우선순위를 갖는 다른 스레드가 실행 기회를 가질 수 있도록 해줍니다.

Untitled Diagram

다음 코드는 의미 없는 반복을 줄이기 위해 yield() 메소드를 호출해서 다른 스레드에게 실행 기회를 주도록 수정하였습니다.

public void run(){
    while(true){
        if(work){
            System.out.println("ThreadA 작업 내용");
        } else {
            Thread.yield();
        }
    }
}

다음 예제에서는 처음 실행 후 3초 동안은 ThreadA와 ThreadB가 번갈아가며 실행됩니다. 3초 뒤에 메인 스레드가 ThreadA의 work 필드를 false로 변경함으로써 ThreadA는 yield() 메소드를 호출합니다. 따라서 이후 3초 동안에는 ThreadB가 더 많은 실행 기회를 얻게 됩니다. 메인 스레드는 3초 뒤에 다시 ThreadA의 work 필드를 true로 변경해서 ThreadA와 ThreadB가 번갈아가며 실행하도록 합니다. 마지막으로 메인 스레드는 3초 뒤에 ThreadA와 ThreadB의 stop 필드를 true로 변경해서 두 스레드가 반복 작업을 중지하고 종료하도록 합니다.

스레드 실행 양보 예제 코드

public class YieldExample {
    public static void main(String[] args) {
        ThreadA threadA = new ThreadA();
        ThreadB threadB = new ThreadB();

        // ThreadA, ThreadB 모두 실행
        threadA.start();
        threadB.start();

        try{
            Thread.sleep(3000);
        } catch (InterruptedException e) {}

        // ThreadB만 실행
        threadA.work = false;

        try{
            Thread.sleep(3000);
        }catch (InterruptedException e) {}

        // ThreadA, ThreadB 모두 실행
        threadA.work = true;

        // ThreadA, ThreadB 모두 종료
        threadA.stop = true;
        threadB.stop = true;
    }
}

ThreadA 클래스

public class ThreadA extends Thread {

    public boolean stop = false; // 종료 플래그
    public boolean work = true; // 작업 진행 여부 플래그

    @Override
    public void run() {
        while (!stop) { // stop이 true가 되면 while 문 종
            if (work) {
                System.out.println("ThreadA 작업 내용");
            } else {
                Thread.yield(); // work가 false가 되면 다른 스레드에게 실행 양보
            }
        }

        System.out.println(this.getName() + "종료");
    }
}

ThreadB 클래스

public class ThreadB extends Thread {

    public boolean stop = false; // 종료 플래그
    public boolean work = true; // 작업 진행 여부 플래그

    @Override
    public void run() {
        while (!stop) { // stop이 true가 되면 while 문 종
            if (work) {
                System.out.println("ThreadB 작업 내용");
            } else {
                Thread.yield(); // work가 false가 되면 다른 스레드에게 실행 양보
            }
        }

        System.out.println(this.getName() + "종료");
    }
}

4. 다른 스레드의 종료를 기다림(join())

스레드는 다른 스레드와 독립적으로 실행하는 것이 기본이지만 다른 스레드가 종료될 때까지 기다렸다가 실행해야 하는 경우가 발생할 수도 있습니다. 예를 들어 계산 작업을 하는 스레드가 모든 계산 작업을 마쳤을 때, 계산 결과값을 받아 이용하는 경우가 이에 해당합니다. 이런 경우를 위해서 Thread는 join() 메서드를 제공하고 있습니다. 아래 그림을 보고 이해하면 됩니다. ThreadA가 ThreadB의 join() 메소드를 호출하면 ThreadA는 ThreadB가 종료할 때까지 일시 정지 상태가 됩니다. ThreadB의 run() 메소드가 종료되면 비로소 ThreadA는 일시 정지에서 풀려 다음 코드를 실행하게 됩니다.

Untitled Diagram (1)

다음 예제를 보면 메인 스레드는 SumThread가 계산 작업을 모두 마칠 때까지 일시 정지 상태에 있다가 SumThread가 최종 계산된 결과값을 산출하고 종료하면 결과값을 받아 출력합니다.

public class SumThread extends Thread {

    private long sum;

    public long getSum(){
        return sum;
    }

    public void setSum(long sum){
        this.sum = sum;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 100; i++) {
            sum += i;
        }
    }
}
public class JoinExample {
    public static void main(String[] args) {

        SumThread sumThread = new SumThread();
        sumThread.start();

        try {
            sumThread.join();
        } catch (Exception e) {
        }

        System.out.println("1 ~ 100 합: " + sumThread.getSum());
    }
}

JoinExample 클래스에서 sumThread.join()를 주석 처리하고 실행하면 1~100까지의 합을 출력하는 콘솔 창에 결과 값이 0이 나옵니다. 그 이유는 SumThread가 계산 작업을 완료하지 않는 상태에서 합을 먼저 출력하기 때문입니다.
스레드는 하나의 독립적인 실행코드기 때문에 발생하는 문제입니다.

5. 스레드간 협엽(wait(), notify(), notifyAll())

경우에 따라서는 두개의 스레드를 교대로 번갈아가며 실행해야 할 경우가 있습니다. 정확한 교대 작업이 필요할 경우, 자신의 작업이 끝나면 상대방 스레드를 일시 정지 상태에서 풀어주고, 자신은 일시 정지 상태로 만드는 것입니다. 이 방법의 핵심은 공유 객체에 있습니다. 공유 객체는 두 스레드가 작업할 내용을 각각 동기화 메소드로 구분해 놓습니다.
한 스레드가 작업을 완료하면 notify() 메소드를 호출해서 일시 정지 상태에 있는 다른 스레드를 실행 대기 상태로 만들고, 자신은 두 번 작업을 하지 않도록 wait() 메소드를 호출하여 일시 정지 상태로 만듭니다.

Untitled Diagram

만약 wait() 대신 wait(long timeout)이나, wait(long timeoutm, int nanos)를 사용하면 notify()를 호출하지 않아도 지정된 시간이 지나면 스레드가 자동적으로 실행 대기 상태가 됩니다. notify() 메소드와 동일한 역할을 하는 notifyAll() 메소드도 있는데, notify()는 wait()에 의해 일시 정지된 모든 스레드들을 실행 대기 상태로 만듭니다. 이 메소드들은 Thread 클래스가 아닌 Object 클래스에 선언된 메소드이므로 모든 공유 객체에서 호출이 가능합니다. 주의할 점은 이 메소드들은 동기화 메소드 또는 동기화 블록 내에서만 사용할 수 있습니다. 다음 예제는 두 스레드 작업을 WorkObject의 methodA()와 methodB()에 정의해 두고, 두 스레드 ThreadA와 ThreadB가 교대로 methodA()와 methodB()를 호출하도록 했습니다.

두 스레드의 작업 내용을 동기화 메소드로 작성한 공유 객체

public class WorkObject {
    public synchronized void methodA(){
        System.out.println("ThreadA의 methodA() 작업 실행");
        notify(); // 일시 정지 상태에 있는 ThreadB를 실행 대기 상태로 만듬.
        try {
            wait(); // ThreadA를 일시 정지 상태로 만듬.
        } catch (Exception e){}

    }

    public synchronized void methodB(){
        System.out.println("ThreadB의 methodB() 작업 실행");
        notify();  // 일시 정지 상태에 있는 ThreadA를 실행 대기 상태로 만듬.

        try {
            wait(); // ThreadB를 일시 정지 상태로 만듬.
        } catch (Exception e) {}
    }
}

WorkThreadA 클래스

public class WorkThreadA extends Thread {

    private  WorkObject workObject;
     // 공유 객체를 매개값으로 받아 필드에 저장
    public WorkThreadA(WorkObject workObject) {
        this.workObject = workObject;
    }
    // 공유 객체의 methodA()를 10번 반복 호출
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
           workObject.methodA();
        }
    }
}

WorkThreadB 클래스

public class WorkThreadB extends Thread {

    private WorkObject workObject;
    // 공유 객체를 매개값으로 받아 필드에 저장
    public WorkThreadB(WorkObject workObject) {
        this.workObject = workObject;
    }
    // 공유 객체의 methodB()를 10번 반복 호출
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            workObject.methodB();
        }
    }
}
public class WaitNotifyExample {
    public static void main(String[] args) {

        // 공유객체 생성
        WorkObject sharedObject = new WorkObject();

        WorkThreadA threadA = new WorkThreadA(sharedObject);
        WorkThreadB threadB = new WorkThreadB(sharedObject);

        // ThreadA와 ThreadB를 실행
        threadA.start();
        threadB.start();
        
    }
}

실행 결과

스크린샷 2020-01-01 오후 10 40 55

실행 결과 WorkThreadA와 WorkThreadB가 번갈아가면서 공유 객체의 methodA, methodB를 수행하는 것을 알 수가 있습니다.

다음 예제는 데이터를 저장하는 스레드(생산자 스레드)가 데이터를 저장하면, 데이터를 소비하는 스레드(소비자 스레드)가 데이터를 읽고 처리하는 교대 작업을 구현한 것입니다.

Untitled Diagram (1)

생산자 스레드는 소비자 스레드가 읽기 전에 새로운 데이터를 두 번 생성하면 안되고 (setData() 메소드를 두 번 실행하면 안됨), 소비자 스레드는 생산자 스레드가 새로운 데이터를 생성하기 전에 이전 데이터를 두 번 읽어서도 안 됩니다.(getData() 메소드를 두 번 실행하면 안 됨). 구현 방법은 공유 객체(DataBox)에 데이터를 저장할 수 있는 data 필드 값이 null이면 생산자 스레드를 실행 대기 상태로 만들고, 소비자 스레드를 일시 정지 상태로 만드는 것입니다. 반대로 data 필드의 값이 null이 아니면 소비자 스레드를 실행 대기 상태로 만들고, 생산자 스레드를 일시 정지 상태로 만들면 됩니다.

DataBox 클래스

public class DataBox {

    private String data;

    public synchronized String getData(){
        if(data == null){
            try {
                wait();
            } catch (InterruptedException e) {}
        }

        String resultValue = data;
        System.out.println("ConsumerThread가 읽은 데이터: " + resultValue);
        data = null;
        notify();

        return resultValue;
    }


    public synchronized void setData(String data){
       if(this.data != null){
           try {
               wait();
           } catch (InterruptedException e) {}
       }
        this.data = data;
        System.out.println("ProductThread가 생성한 데이터: " + data);
        notify();
    }
}

ProducerThread 클래스

public class ProducerThread extends Thread {

    private DataBox dataBox;

    // 공유 객체를 필드에 저장
    public ProducerThread(DataBox dataBox) {
        this.dataBox = dataBox;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 3; i++) {
            String data = "Data-" + i;
            dataBox.setData(data); // 새로운 데이터 저장
        }
    }
}

ConsumerThread 클래스

public class ConsumerThread extends Thread {

    private DataBox dataBox;

    public ConsumerThread(DataBox dataBox) {
        this.dataBox = dataBox;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 3; i++) {
            String data = dataBox.getData();
        }
    }
}

WaitNotiExample 클래스

public class WaitNotiExample {
    public static void main(String[] args) {

        DataBox dataBox = new DataBox();

        ProducerThread producerThread = new ProducerThread(dataBox);
        ConsumerThread consumerThread = new ConsumerThread(dataBox);

        producerThread.start();
        consumerThread.start();
    }
}

실행 결과

스크린샷 2020-01-01 오후 11 31 37

6. 스레드의 안전한 종료(stop 플래그, interrupt())

스레드는 자신의 run() 메소드가 모두 실행되면 자동적으로 종료된다. 경우에 따라서는 실행 중인 스레드를 즉시 종효할 필요가 있습니다. 예를 들어 동영상을 끝까지 보지 않고, 사용자가 멈춤을 요구할 수 있습니다. Thread는 스레드를 즉시 종료시키기 위해서 stop() 메소드를 제공하고 있는데, 이 메소드는 deprecated 되었습니다. 그 이유는 stop() 메소드로 스레드를 갑자기 종료하게 되면 스레드가 사용 중이던 자원들이 불완전한 상태로 남겨지기 때문입니다. 여기서 자원이란 파일, 네트워크 연결 등을 말합니다. 그렇다면 스레드를 즉시 종료시키기 위한 최선의 방법은 무엇일까요?

stop 플래그를 이용하는 방법

스레드는 run() 메소드가 끝나면 자동적으로 종료되므로, run() 메소드가 정상적으로 종료되도록 유도하는 것입니다. 다음 코드는 stop 풀래그를 이용해서 run() 메소드의 종료를 유도합니다.

public class XXXThread extends Thread{

    private boolean stop; // stop 플래그

    public void run(){
        while(!stop){
            // 스레드가 반복 실행하는 코드;
        }
        // 스레드가 사용한 자원 정리
    }
} 

위 코드에서 stop 필드가 false일 경우에는 while문의 조건식이 true가 되어 반복 실행하지만, stop 필드가 true일 경우에는 while문의 조건식이 false가 되어 while문을 빠져나옵니다. 그리고 스레드가 사용한 자원을 정리하고, run() 메소드가 끝나게 됨으로써 스레드는 안전하게 종료됩니다.

아래 예제는 PrintThread1을 실행한 후 1초 후에 PrintThread1을 멈추도록 setStop() 메소드를 호출합니다.

StopFlagExample 클래스

public class StopFlagExample {
    public static void main(String[] args) {

        PrintThread1 printThread1 = new PrintThread1();
        printThread1.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {}
            printThread1.setStop(true);
    }
}

PrintThread1 클래스

public class PrintThread1 extends Thread {

    private boolean stop;

    public PrintThread1() {
        System.out.println(stop);
    }

    public void setStop(boolean stop){
        this.stop = stop;
    }

    @Override
    public void run() {

        while(!stop){
            System.out.println("실행 중");
        }

        System.out.println("자원 정리");
        System.out.println("실행 종료");
    }
}

7. 데몬 스레드

데몬 스레드는 주 스레드의 작업을 돕는 보조적인 역할을 수행하는 스레드 입니다. 주 스레드가 종료되면 데몬 스레드는 강제적으로 자동 종료되는데, 그 이유는 주 스레드의 보조 역할을 수행하므로 주 스레드가 종료되면 데몬 스레드의 존재 의미가 없어지기 때문입니다. 이 점을 제외하면 데몬 스레드는 일반 스레드와 크게 차기아 없습니다. 데몬 스레드의 적용 예는 워드프로세서의 자동 저장, 미디어 플레이어의 동영상 및 음악 재생, 가비지 컬렉터 등이 있습니다. 이 기능들은 주 스레드 프로세스, 미디어 플레이어,JVM가 종료되면 같이 종료됩니다.

스레드를 데몬으로 만들기 위해서는 주 스레드가 데몬이 될 스레드의 setDaemon(true)를 호출해주면 됩니다. 아래 코드를 보면 메인 스레드가 주 스레드가 되고 AutoSaveThread가 데몬 스레드가 됩니다.

public static void main(String[] args){
    AutoSaveThread thread = new AutoSaveThread();
    thread.setDaemon(true);
    thread.start();
    ...
}

주의할 점은 start() 메소드가 호출되고 나서 setDaemon(true)를 호출하면 IllegalThreadStateException이 발생하기 때문에 start() 메소드 호출 전에 setDaemon(true)를 호출해야 됩니다.

현재 실행 중인 스레드가 데몬 스레드인지 아닌지를 구별하는 방법은 isDaemon()메소드의 리턴값을 조사해보면 됩니다. 데몬 스레드 일 경우 true를 리턴합니다. 다음 예제는 1초 주기로 save() 메소드를 자동 호출하도록 AutoSaveThread를 작성하고, 메인 스레드가 3초 후 종료되면 AutoSaveThread도 같이 종료되도록 AutoSaveThread를 데몬 스레드로 만들었습니다.

AutoSaveThread 클래스

public class AutoSaveThread extends Thread {

    private void save(){
        System.out.println("작업 내용을 저장함");
    }


    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {}
                save();
        }
    }
}

DaemonExample 클래스

public class DaemonExample {
    public static void main(String[] args) {

        AutoSaveThread thread = new AutoSaveThread();
        thread.setDaemon(true);
        thread.start();

        try {
            Thread.sleep(3000);
        }catch (InterruptedException e) {}


        System.out.println("메인 스레드 종료");
    }
}

8. 스레드 그룹

스레드 그룹은 관련된 스레드를 묶어서 관리할 목적으로 이용됩니다. JVM이 실행되면 system 스레드 그룹을 만들고, JVM 운영에 필요한 스레드들을 생성해서 system 스레드 그룹에 포함시킵니다. 그리고 sysyem 하위 스레드 그룹으로 main을 만들고 메인 스레드를 main 스레드 그룹에 포함시킵니다. 스레드는 반드시 하나의 스레드 그룹에 포함되는데, 명시적으로 스레드 그룹에 포함시키지 않으면 기본적으로 자신을 생성한 스레드와 같은 스레드 그룹에 속하게 됩니다. 우리가 생성하는 작업 스레드는 대부분 main 스레드가 생성하므로 기본적으로 main 스레드 그룹에 속하게 됩니다.

스레드 그룹 이름 얻기

현재 스레드가 속한 스레드 그룹의 이름을 얻고 싶다면 다음과 같은 코드를 사용 할 수 있습니다.

ThreadGroup group = Thread.currentThread().getThreadGroup();
String groupName = group.getName();

Thread의 정적 메소드인 getAllStackTraces()를 이용하면 프로세스 내에서 실행하는 모든 스레드에 대한 정보를 얻을 수 있습니다.

Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();

getAllStackTraces() 메소드는 Map 타입의 객체를 리턴하는데, 키는 스레드 객체이고, 값은 스레드의 상태 기록들을 갖고 있는 StackTraceElement[] 배열입니다.

// 현재 실행 중인 스레드 정보
public class ThreadInfoExample {
    public static void main(String[] args) {
        AutoSaveThread autoSaveThread = new AutoSaveThread();
        autoSaveThread.setName("AutoSaveThread");
        autoSaveThread.setDaemon(true);
        autoSaveThread.start();

        Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
        Set<Thread> threads = map.keySet();


        for (Thread thread : threads){
            System.out.println("Name: " + thread.getName() +
                    ((thread.isDaemon()) ? "(데몬)" : "(주)" ));
            System.out.print("\t" + "소속 그룹: " + thread.getThreadGroup().getName());
            System.out.println();
        }
        
    }
}

실행 결과

스크린샷 2020-01-02 오후 8 40 24

실행 결과를 보면 가비지 컬렉션을 담당하는 Finalizer 스레드를 비롯한 일부 스레드들이 system 그룹에 속하고, main() 메소드를 실행하는 main 스레드는 system 하위 그룹인 main 그룹에 속하는 것을 볼수 있습니다. 그리고 main 스레드가 실행시킨 AutoSaveThread는 main 스레드가 소속된 main 그룹에 포함되어 있는 것을 볼 수 있습니다.

9. 스레드 풀

병렬 작업 처리가 많아지면 스레드 개수가 증가되고 그에 따른 스레드 생성과 스케줄링으로 인해 CPU가 바빠져 메모리 사용량이 늘어납니다. 따라서 애플리케이션의 성능이 저하됩니다. 갑작스런 병렬 작업의 폭증으로 인한 스레드의 폭증을 막으려면 스레드 풀을 사용해야 합니다. 스레드풀은 작업 처리에 사용되는 스레드를 제한된 개수만큼 정해 놓고 작업 큐에 들어오는 작업들을 하나씩 스레드가 맡아 처리합니다. 작업 처리가 끝난 스레드는 다시 작업 큐에서 새로운 작업을 가져와 처리합니다. 그렇기 때문에 작업 처리 요청이 폭증 되어도 스레드의 전체 개수가 늘어나지 않으므로 애플리케이션 성능이 급격히 저하되지 않습니다.

자바는 스레드 풀을 생성하고 사용할 수 있도록 java.util.concurrent 패키지에서 ExcutorService 인터페이스와 Excutors 클래스를 제공하고 있습니다. Excutors의 다양한 정적 메소드를 이용해서 ExcutorService 구현 객체를 만들 수 있는데, 이것이 바로 스레드 풀입니다. 아래 그림은 ExcutorService가 동작하는 방식을 보여줍니다.

Untitled Diagram (1)

스레드풀 생성 및 종료

ExcutorService 구현 객체는 Excutors 클래스의 다음 두 가지 메소드 중 하나를 이용해서 간편하게 생성할 수 있습니다.

  • newCachedThreadPool()
  • newFixedThreadPool(int nThreads)

초기 스레드 수는 ExcutorService 객체가 생성될 때 기본적으로 생성되는 스레드 수를 말하고, 코어 스레드 수는 스레드 수가 증가된 후 사용되지 않는 스레드를 스레드 풀에서 제거할 때 최소한 유지해야 할 스레드 수를 말합니다. 최대 스레드 수는 스레드 풀에서 관리하는 최대 스레드 수입니다. newCachedThreadPool() 메소드로 생성된 스레드 풀의 특징은 초기 스레드 개수와 코어 스레드 개수는 0이고, 스레드 개수보다 작업 개수가 많으면 새 스레드를 생성시켜 작업을 처리합니다. 이론적으로 int 값이 가질 수 있는 최대값 만큼 스레드가 추가되지만, 운영체제의 성능과 상황에 따라 달라집니다. 1개 이상의 스레드가 추가되었을 경우 60초동안 추가된 스레드가 아무런 작업을 하지 않으면 스레드를 종료하고 풀에서 제거합니다. 다음은 newCachedThreadPool()을 호출해서 ExecutorService 구현 객체를 얻는 코드입니다.

ExecutorService excutorService = Executors.newCachedThreadPool();

newFixedThreadPool(int nThreads) 메소드로 생성된 스레드풀의 초기 스레드 개수는 0개이고, 코어 스레드 수는 nThreads 입니다. 스레드 개수보다 작업 개수가 많으면 새 스레드를 생성시키고 작업을 처리합니다. 최대 스레드 개수는 매개값으로 준 nThreads 입니다. 이 스레드풀은 스레드가 작업을 처리하지 않고 놀고 있더라도 스레드 개수가 줄지 않습니다. CPU 코어의 수 만큼 최대 스레드를 사용하는 스레드 풀을 생성합니다.

ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

newCachedThreadPool()과 newFixedThreadPool() 메소드를 사용하지 않고 코어 스레드 개수와 최대 스레드 개수를 설정하고 싶다면 직접 ThreadPoolExecutor 객체를 생성하면 됩니다. 사실 위 두가지 메소드 내부적으로 ThreadPoolExecutor 객체를 생성해서 리턴합니다. 다음은 초기 스레드 개수가 0개, 코어 스레드 개수가 3개, 최대 스레드 개수가 100개인 스레드 풀을 생성합니다. 그리고 코어 스레드 3개를 제외한 나머지 추가된 스레드가 120초 동안 놀고 있을 경우 해당 스레드를 제거해서 스레드 수를 관리합니다.

ExecutorService threadPool = new ThreadPoolExecutor(
    3,    //코어 스레드 개수
    100,  //최대 스레드 개수
    120L, //놀고 있는 시간
    TimeUnit.SECONDS, // 놀고 있는 시간 단위
    new SynchronousQueue<Runnable>() // 작업 큐 
);

스레드 풀 종료

스레드풀의 스레드는 기본적으로 데몬 스레드가 아니기 때문에 main 스레드가 종료되더라도 작업을 처리하기 위해 게속 실행되는 상태로 남아있습니다. 그래서 main()메소드가 실행이 끝나도 애플리케이션 프로세스는 종료되지 않습니다. 애플리케이션을 종료하려면 스레드풀을 종료시켜 스레드들이 종료상태가 되도록 처리해줘야 합니다. ExecutorService는 종료와 관련해서 다음 세 개의 메소드를 제공하고 있습니다.

리턴타입메소드(매개변수)설명
voidshutdown()현재 처리 중인 작업뿐만 아니라 작업 큐에 대기하고 있는 모든 작업을 처리한 뒤에 스레드풀을 종료시킵니다.
List<Runnable>shutdownNow()현재 작업 처리 중인 스레드를 interrupt해서 작업 중지를 시도하고 스레드풀을 종료시킵니다. 리턴값은 작업 큐에 있는 미처리된 작업(Runnable)의 목록입니다.
booleanawaitTermination(long timeout, TimeUnit unit)shutdown() 메소드 호출 이후, 모든 작업 처리를 timeout 시간 내에 완료하면 true를 리턴하고, 완료하지 못하면 작업 처리 중인 스레드를 interrupt하고 false를 리턴합니다.

남아있는 작업을 마무리하고 스레드풀을 종료할 때에는 shutdown()을 일반적으로 호출하고, 남아있는 작업과는 상관없이 강제로 종료할 때에는 shutdownNow()를 호출합니다.

executorService.shutdown();
또는
executorService.shutdownNow();

10. 작업 생성과 처리 요청

작업 생성

하나의 작업은 Runnable 또는 Callable 구현 클래스로 표현합니다. Runnable과 Callable의 차이점은 작업 처리 완료 후 리턴값이 있느냐 없느냐 입니다. 다음은 작업을 정의하기 위해 Runnable과 Callable을 구현 클래스를 작성하는 방법을 보여줍니다.

// Runnable 구현 클래스
Runnable task = new Runnable(){
    @Override
    public void run(){
        //스레드가 처리할 작업 내용
    }
}
//Callable 구현 클래스
Callable<T> task = new Callable<T>(){

    @Override
    public T call() throws Exception{
        //스레드가 처리할 작업 내용
        return T;
    }
}

Runnable의 run() 메소드는 리턴값이 없고, Callable의 call() 메소드는 리턴 값이 있습니다. call()의 리턴 타입은 implements Callable에서 지정한 T타입입니다. 스레드풀의 스레드는 작업 큐에서 Runnable 또는 Callable 객체를 가져와 run()과 call() 메소드를 실행합니다.

작업 처리 요청

작업 처리 요청이란 ExecutorService의 작업 큐에 Runnable 또는 Callable 객체를 넣는 행위를 말합니다. ExecutorService는 작업 처리 요청을 위해 다음 두가지 종류의 메소드를 제공합니다.

리턴타입메소드(매개변수)설명
voidexecute(Runnable command)Runnalbe을 작업 큐에 저장, 작업 처리 결과를 받지 못함
Future<?>, Future<V>, Future<V>submit(Runnable task), submit(Runnable task, V result), submit(Callable\ task)Runnable 또는 Callable을 작업 큐에 저장, 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음

execute()와 submit() 메소드의 차이점은 두 가지 입니다.
하나는 execute()는 작업 처리 결과를 받지 못하고 submit()은 작업 처리 결과를 받을 수 있도록 Future를 리턴합니다. 또 다른 차이점은 execute()는 작업 처리 도중 예외가 발생하면 스레드가 종료되고 해당 스레드는 스레드풀에서 제거됩니다. 따라서 스레드풀은 다른 작업 처리를 위해 새로운 스레드를 생성합니다. 반면에 submit()은 작업 처리 도중 예외가 발생하더라도 스레드는 종료되지 않고 다음 작업을 위해 재사용 됩니다. 그렇기 때문에 가급적이면 스레드의 생성 오버헤더를 줄이기 위해서 submit()을 사용하는 것이 좋습니다.

아래 예제는 Runnable 작업을 정의할 때 Integer.parseInt("삼")을 넣어 NumberFormatException이 발생하도록 유도했습니다. 10개의 작업을 execute()와 submit() 메소드로 각각 처리 요청했을 경우 스레드풀의 상태를 살펴보겠습니다. 먼저 execute() 메소드로 작업 처리를 요청했을 경우를 보겠습니다.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecuteExample {
    public static void main(String[] args) throws Exception {
        // 최대 스레드 개수가 2인 스레드풀 생성
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        System.out.println(Runtime.getRuntime().availableProcessors());

        for (int i = 0; i < 10; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // 스레드 총 개수 및 작업 스레드 이름 출력
                    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
                    int poolSize = threadPoolExecutor.getPoolSize();
                    String threadName = Thread.currentThread().getName();
                    System.out.println("[총 스레드 개수: " + poolSize + "] 작업 스레드 이름: " + threadName);

                    // 예외 발생 시킴
                    int value = Integer.parseInt("삼");
                }
            };
            // 작업 처리요청. 리턴값이 존재하지 않음
            executorService.execute(runnable);
            // 콘솔에 출력 시간을 주기 위해 0.01초 일시 정지 시킴
            Thread.sleep(10);
        }

        executorService.shutdown();
    }
}

아래는 위 코드의 실행 결과 입니다. 스레드풀의 스레드 최대 개수는 2는 변함이 없지만,실행 스레드의 이름을 보면 모두 다른 스레드가 작업을 처리하고 있습니다. 이것은 작업 처리 도중 에외가 발생했기 때문에 해당 스레드는 제거되고 새로운 스레드가 계속 생성되기 때문입니다.

실행결과

스크린샷 2020-01-05 오후 10 38 42

이번에는 submit() 메소드로 작업 처리를 요청한 경우를 보겠습니다. 아래 실행 결과 화면을 보면 확실히 execute() 와의 차이점을 발견할 수 있습니다. 예외가 발생하더라도 스레드가 종료되지 않고 게속 재사용되어 다른 작업을 처리하고 있는 것을 볼 수 있습니다.

실행 결과

스크린샷 2020-01-05 오후 10 59 46

11. 블로킹 방식의 작업 완료 통보

ExecutorService의 submit() 메소드는 매개값으로 준 Runnable 또는 Callable 작업을 스레드풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴합니다.

리턴타입메소드(매개변수)설명
Future<?>Runnable(Runnable task)Runnalbe 또는 Callable을 작업 큐에 저장, 리턴된 Future를 통해 작업 처리결과를 얻음
Future<V>submit(Runnable task, V result)Runnalbe 또는 Callable을 작업 큐에 저장, 리턴된 Future를 통해 작업 처리결과를 얻음
Future<V>submit(Callable<V> task)Runnalbe 또는 Callable을 작업 큐에 저장, 리턴된 Future를 통해 작업 처리결과를 얻음

Future 객체는 작업 결과가 아니라 작업이 완료될 때까지 기다렸다가 (지연했다가=블로킹되었다가) 최종 결과를 얻는데 사용됩니다. 그래서 Future를 지연 완료 객체라고 합니다. Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴합니다. 이것이 블로킹을 사용하는 작업 완료 통보 방식입니다. 다음은 Future가 가지고 있는 get() 메소드를 설명한 표입니다.

리턴타입메소드(매개변수)설명
Vget()작업이 완료될 때까지 블로킹 되었다가 처리 결과 V를 리턴
Vget(long timeout, TimeUnit unit)timeout 시간 전에 작업이 완료되면 결과 V를 리턴하지만, 작업이 완료되지 않으면 TimeoutException을 발생시킴

리턴 타입인 V는 submit(Runnable task, V result)의 두 번째 매개값인 V 타입이거나 submit(Callable task)의 Callable 타입 파라미터 V 타입이다. 다음은 세 가지 submit() 메소드별로 Future의 get() 메소드가 리턴하는 값이 무엇인지 보여줍니다.

메소드작업 처리 완료 후 리턴타입작업 처리 도중 예외 발생
submit(Runnable task)future.get() -> nullfuture.get() -> 예외 발생
submit(Runnable task, Integer result)future.get() -> int 타입 값future.get() -> 예외 발생
submit(Callable task)future.get() -> String 타입 값 future.get() -> 예외 발생

Future를 이용한 블로킹 방식의 작업 완료 통보에서 주의할 점은 작업을 처리하는 스레드가 작업을 완료하기 전까지 get() 메소드가 블로킹되므로 다른 코드를 실행할 수 없습니다.

블로킹이 무엇이냐면...에를 들어 Future의 get() 메서드를 main 스레드에서 호출하게 되면 블로킹 되어서 다음 로직이 처리 되지 않게 됩니다. UI를 변경하거나 이벤트를 처리하는 곳에서는 화면이 멈춰있는 것처럼 사용자에게 보일 수도 있기 때문에 Future.get() 메서드는 사용할 때 주의해야 합니다.

이러한 문제를 피하기 위해서는 get() 메서드의 호출은 다른 스레드 또는 다른 스레드풀에서 사용되어야 합니다. 이러한 방식으로 바꾸면 블로킹 방식이 일어나더라도 기본 로직(메인 스레드)은 멈추지 않고 다음 로직을 이어서 처리할 수 있습니다. 메인 스레드에서 계속 처리를 하던 중 get() 메서드에서 결과 값이 반환되면, 그 값을 다시 main 스레드로 넘겨 받아서 사용하면 됩니다.

// 새로운 스레드를 생성해서 호출
new Thread(new Runnable() {
    @Override
    public void run(){
        try {
            future.get();
        } catch(Exception e){
            e.printStrackTrace();
        }
    }
}).start();

// 스레드풀의 스레드가 호출
executorService.submit(new Runnable(){
    @Override
    public void run(){
        try {
            future.get();
       } catch(Exception e){
            e.printStrackTrace();
        }
    }
})

Future 객체는 작업 결과를 얻기 위한 get() 메소드 이외에도 다음과 같은 메소드를 제공합니다.

리턴 타입메소드명(매개 변수)설명
booleancancel(boolean mayInterruptRunning)작업 처리가 진행 중일 경우 취소시킴
booleanisCancelled()작업이 취소되었는지 여부
booleanisDone()작업처리가 완료되었는지 여부

cancel() 메소드는 작업을 취소하고 싶을 경우 호출할 수 있습니다. 작업이 시작되기 전이라면 mayInterruptRunning 매개값과는 상관없이 작업 취소 후 true를 리턴하지만, 작업이 진행 중 이라면 mayInterruptRunning 매개값이 true일 경우에만 작업 스레드를 interrupt 합니다. 작업이 완료되었을 경우 또는 어떤 이유로 인해 취소될 수 없다면 cancel() 메소드는 false를 리턴합니다. isCancelled() 메소드는 작업이 완료되기 전에 작업이 취소되었을 경우에만 true를 리턴합니다. isDone() 메소드는 작업이 정상적, 예외, 취소등 어떤 이유에건 작업이 완료되었다면 true를 리턴합니다.

리턴값이 없는 작업 완료 통보

리턴값이 없는 작업일 경우는 Runnable 객체로 생성하면 됩니다. Runnable 객체를 생성하는 방법을 보여줍니다.

Runnable task = new Runnable(){
    @Override
    public void run(){
        // 스레드가 처리할 작업 내용
    }
};

결과값이 없는 작업 처리 요청은 submit(Runnable task) 메소드를 이용하면 됩니다. 결과값이 없음에도 불구하고 다음과 같이 Future 객체를 리턴하는데, 이것은 스레드가 작업 처리를 정상적으로 완료했는지, 아니면 어떤 작업 처리 도중에 예외가 발생했는지 확인하기 위해서입니다.

Future future = executorService.submit(task);

작업 처리가 정상적으로 완료되었다면 Future.get()메소드는 null을 리턴하지만 스레드가 작업 처리 도중 interrupt되면 InterruptException을 발생시키고, 작업 처리 도중 예외가 발생하면 ExecutionException을 발생시킵니다. 그래서 아래와 같은 예외 처리 코드가 필요합니다.

try {
    future.get();
} catch(InterruptedException e){
    // 작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
} catch(ExecutionException e){
    // 작업 처리 도중 예외가 발생된 경우 실행할 코드
}

다음 에제는 리턴값이 없고 단순히 1부터 10까지 합을 출력하는 작업을 Runnable 객체로 생성하고, 스레드풀의 스레드가 처리하도록 요청한 것입니다.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class NoResultExample {
    public static void main(String[] args) {

        // CPU의 코어 수만큼 스레드풀의 스레드들을 생성함.
        int coreCount = Runtime.getRuntime().availableProcessors();
        System.out.println(coreCount);
        ExecutorService executorService = Executors.newFixedThreadPool(coreCount);

        System.out.println("[작업 처리 요청]");

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for (int i = 1; i <= 10; i++) {
                    sum += i;
                }
                System.out.println("스레드 이름: " + Thread.currentThread().getName());
                System.out.println("[처리 결과]: " + sum);
            }
        };

        Future future = executorService.submit(runnable);

        try {
            future.get();
            System.out.println("[작업 처리 완료]");
        } catch (Exception e) {
            System.out.println("[실행 예외 발생함]" + e.getMessage());
        }

        executorService.shutdown();
    }
}

실행 결과

스크린샷 2020-01-06 오전 12 04 48

리턴값이 있는 작업 완료 통보

스레드풀의 스레드가 작업을 완료한 후에 애플리케이션이 처리 결과를 얻어야 된다면 작업 객체를 Callable로 생성하면 됩니다.다음은 Callable 객체를 생성하는 코드인데, 주의할 점은 제네릭 타입 파라미터 T는 call() 메소드가 리턴하는 타입이 되도록 합니다.

Callabe<T> task = new Callable<T>(){
    @Override
    public T call() throws Exception(){
        // 스레드가 처리할 작업 내용
        return T;
    }
};

Callable 작업의 처리 요청은 Runnable 작업과 마찬가지로 ExecutorService의 submit() 메소드를 호출하면 됩니다. submit() 메소드는 작업 큐에 Callable 객체를 저장하고 즉시 Future를 리턴합니다. 이때 T는 call() 메소드가 리턴하는 타입입니다.

Future<T> future = execitprService.submit(task);

스레드풀의 스레드가 Callable 객체의 call() 메소드를 모두 실행하고 T 타입의 값을 리턴하면, Future의 get() 메소드는 블로킹이 해제되고 T타입의 값을 리턴하게 됩니다.

try {

} catch(InterruptedExcpetion e){
    // 작업 처리 도중 스레드가 interrup 될 경우 실행할 코드
} catch (ExecutionException e){
    // 작업 처리 도중 예외가 발생된 경우 실행할 코드
}

결론은 동기, 비동기 메소드에 상관없이 Future 객체의 get() 메소드는 리턴 값을 받기 위해서는 쓰레드의 작업이 완전히 종료되어야 받을 수 있기 때문에 블로킹 된다는 것을 명심합시다.

다음 예제는 1부터 10까지의 합을 리턴하는 작업을 Callable 객체로 생성하고, 스레드풀의 스레드가 처리하도록 요청한 것입니다.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultByCallableExample {
    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());


        Callable<Integer> task = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for (int i = 1; i <= 10; i++) {
                    sum += i;
                }

                return sum;
            }
        };
        Future<Integer> future = executorService.submit(task);

        try {
           int sum = future.get();
            System.out.println("[처리 결과] " + sum);
            System.out.println("[작업 처리 완료] ");


        } catch (Exception e) {
            System.out.println("[실행 예외 발생함]" + e.getMessage());
        }

        executorService.shutdown();
    }
}

12. 작업 처리 결과를 외부 객체에 저장

상황에 따라서 스레드가 작업한 결과를 외부 객체에 저장해야 할 경우도 잇습니다. 예를 들어 스레드가 작업 처리를 완료하고 외부 Result 객체에 작업 결과를 저장하면, 애플리케이션이 Result 객체를 사용해서 어떤 작업을 진행할 수 있을 것입니다. 대게 Result 객체는 공유 객체가 되어, 두 개 이상의 스레드 작업을 취합할 목적으로 이용됩니다.

스크린샷 2020-01-06 오후 9 12 42

이런 작업을 하기 위해서 ExecutorService의 submit(Runnable task, V result) 메소드를 사용할 수 있는데, V가 바로 Result 타입이 됩니다. 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹 되었다가 작업을 완료하면 V 타입 객체를 리턴합니다. 리턴된 객체는 submit()의 두 번째 매개값으로 준 객체와 동일한데, 차이점은 스레드의 처리 결과가 내부에 저장되어 있다는 것입니다.

Result result = '...'
Runnable task = new Task(result);
Future<Result> future = executorService.submit(task, result);
result = future.get();

작업 객체는 Runnable 구현 클래스로 생성하는데, 주의할 점은 스레드에서 결과를 저장하기 위해 외부 Result 객체를 사용해야 하므로 생성자를 통해 Result 객체를 주입받도록 해야 합니다.

class Task implements Runnable{

    private Result result;

    public Task(Result result){
        this.result = result;
    }
    @Override
    public void run(){
        // 작업 코드
        // 처리 결과를 result 저장
    }
}

다음 예제는 1부터 10까지의 합을 계산하는 두 개의 작업 스레드풀에 처리 요청하고, 각각의 스레드가 작업 처리를 완료한 후 산출된 값을 외부 Result 객체에 누적하도록 했습니다.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultByRunnableExample {
    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        System.out.println("[작업 처리 요청]");
        class Task implements  Runnable{

            private Result result;

            public Task(Result result){
                this.result = result;
            }

            @Override
            public void run() {
                int sum = 0;
                for (int i = 1; i <= 10; i++) {
                    sum += i;
                }
                result.addValue(sum); // Result 객체에 작업 결과 저장
            }
        }

        Result result = new Result();
        Runnable task1 = new Task(result);
        Runnable task2 = new Task(result);

        Future<Result> future1 = executorService.submit(task1, result);
        Future<Result> future2 = executorService.submit(task2, result);

        try {
            result = future1.get();
            result = future2.get();
            System.out.println("[처리 결과] " + result.accumValue);

        } catch (Exception e){
            e.printStackTrace();
            System.out.println("[실행 예외 발생함] " + e.getMessage());
        }

        executorService.shutdown();
    }
}

class Result{
    int accumValue;
    synchronized void addValue(int value){
        accumValue += value;
    }
}

13. 작업 완료 순으로 통보

작업 요청 순서대로 작업 처리가 완료되는 것은 아닙니다. 작업의 양과 스레드 스케줄링에 따라서 먼저 요청한 작업이 나중에 완료되는 경우도 발생합니다. 여러개의 작업들이 순차적으로 처리될 필요성이 없고, 처리 결과도 순차적으로 이용할 필요가 없다면 작업처리가 완료된 것부터 결과를 얻어 이용하면 됩니다.
스레드풀에서 작업 처리가 완료된 것만 통보받는 방법이 있는데, CompletionService를 이용하는 것입니다. CompletionService는 처리가 완료된 작업을 가져오는 poll()과 task() 메소드를 제공합니다.

리턴 타입메소드명(매개 변수)설명
Future<V>poll()완료된 작업의 Future를 가져옴. 완료된 작업이 없다면 즉시 null을 리턴합니다.
Future<V>poll(long timeout, TimeUnit unit)완료된 작업의 Future를 가져옴. 완료된 작업이 없다면 timeout까지 블로킹 됩니다.
Future<V>take()완료된 작업의 Future를 가져옵니다. 완료된 작업이 없다면 있을 때까지 블로킹 됩니다.
Future<V>submit(Callable task)스레드풀에 Callable 작업 처리 요청
Future<V>submit(Runnable task, V result)스레드풀에 Runnable 작업 처리 요청

CompletionService를 구현 클래스는 ExecutorCompletionService입니다. 객체를 생성할 때 생성자 매개값으로 ExecutorService를 제공하면 됩니다.

ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<V> completionService = new ExecutorCompletionService<V>(
    executorService
)

poll()과 take() 메소드를 이용해서 처리 완료된 작업의 Future를 얻으려면 CompletionService의 submit() 메소드로 작업 처리 요청을 해야합니다.

completionService.submit(Callable<V> task);
completionService.submit(Runnable task, V result);

다음은 take() 메소드를 호출하여 완료된 Callable 작업이 있을 때까지 블로킹되었다가 완료된 작업의 Future를 얻고, get() 메소드로 결과값을 얻어내는 코드입니다. while문은 애플리케이션이 종료될 때까지 반복 실행해야 하므로 스레드풀의 스레드에서 실행하는 것이 좋습니다.

executorService.submit(new Runnable()){
    @Override
    public void run(){
        while(true){
            try{
                // 완료된 작업이 있을 때까지 블로킹 / 완료된 작업이 있으면 Future를 리턴합니다.
                Future<Integer> future = completionService.take();
                // get()은 블로킹되지 않고 바로 작업결과를 리턴합니다.
                int value = future.get();    
                System.out.println("[처리 결과] " + value);
            }catch (Exception e){
                break;
            }
        }   
    }
}

take() 메소드가 리턴하는 완료된 작업은 submit()으로 처리 요청한 작업의 순서가 아님을 명심해야 합니다. 작업의 내용에 따라서 먼저 요청한 작업이 나중에 완료될 수도 있기 때문입니다. 더 이상 완료된 작업을 가져올 필요가 없다면 take() 블로킹에서 빠져나와 while문을 종료해야 합니다.
ExecutorService의 shutdownNow()를 호출하면 take()에서 InterruptedException이 발생하고 catch 절에서 break가 되어 while문을 종료하게 됩니다. 다음 예제는 3개의 Callable 작업을 처리 요청하고 처리가 완료하는 순으로 작업의 결과값을 콘솔에 출력하도록 했습니다.

import java.util.concurrent.*;

public class CompletionServiceExample {
    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);

        System.out.println("작업 처리 요청");

        for (int i = 0; i < 3; i++) {
            // 스레드풀에게 작업 처리 요청
            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int sum = 0;
                    for (int j = 1; j <= 10; j++) {
                        sum += j;
                    }
                    return sum;
                }
            });
        }
                                           
        // 스레드풀의 스레드에서 실행하도록 
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        // 완료된 작업 가져오기
                        Future<Integer> future = completionService.take();
                        int value = future.get();
                        System.out.println("[처리 결과] " + value);
                    } catch (Exception e) {
                        break;
                    }
                }
            }
        });

        // 3초 후 스레드풀 종
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
        }

        executorService.shutdownNow();
    }
}

14. 콜백 방식의 작업 완료 통보

이번에는 콜백(callback) 방식을 이용해서 작업 완료 통보를 받는 방법에 대해서 알아보겠습니다. 콜백이란 애플리케이션이 스레드에게 작업 처리를 요청한 후, 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법을 말합니다. 이때 자동 실행되는 메소드를 콜백 메소드라고 합니다. 다음은 블로킹 방식과 콜백 방식을 비교한 그림입니다.

Untitled Diagram

블로킹 방식은 작업 처리를 요청한 후 작업이 완료될 때까지 블로킹되지만, 콜백 방식은 작업 처리를 요청한 후 결과를 기다릴 필요 없이 다른 기능을 수행할 수 있습니다. 그 이유는 작업 처리가 완료되면 자동적으로 콜백 메소드가 실행되어 결과를 알 수 있기 때문입니다.

아쉽게도 ExecutorService는 콜백을 위한 별도의 기능을 제공하지 않습니다. 하지만 Runnable 구현 클래스를 작성할 때 콜백 기능을 구현할 수 있습니다. 먼저 콜백 메소드를 가진 클래스가 있어야 하는데, 직접 정의해도 좋고 java.nio.channels.CompletionHandler를 이용해도 좋습니다. 이 인터페이스는 NIO 패키지에 포함되어 있는데 비동기 통신에서 콜백 객체를 만들때 사용됩니다. 그럼 CompletionHandler를 이용해서 콜백 객체를 만드는 방법을 살펴보겠습니다. 다음은 CompletionHandler 객체를 생성하는 코드입니다.

CompletionHandler(V, A) callback = new CompletionHandler<V, A>(){
    @Override
    public void completed(V result, A attachment){

    }
    @Override
    public void failed(Throwable exc , A attachment){
        
    }
};

CompletionHandler는 completed()와 failed() 메소드가 있는데, completed()는 작업을 정상 처리 완료했을 때 호출되는 콜백 메소드이고, failed() 작업 처리 도중 예외가 발생했을 때 호출되는 콜백 메소드입니다.
CompletionHandler의 V 타입 파라미터는 결과값의 타입이고, A는 첨부값의 타입입니다. 첨부값은 콜백 메소드에 결과값 이외에 추가적으로 전달하는 객체라고 생각하면 됩니다. 만약 첨부값이 필요없다면 A는 Void로 지정해주면 됩니다. 다음은 작업 처리 결과에 따라 콜백 메소드를 호출하는 Runnable 객체입니다.

Runnable task = new Runnable(){
    @Override
    public void run(){
        try{
            V result = ..;
            callback.completed(result, null);
        }catch(Exception e){
           callback.failed(e, null); 
        }
    }
};

0개의 댓글