멀티 스레드

megaseunghan·2022년 2월 13일
0
post-thumbnail

멀티 스레드

  • 프로세스 :

    • 운영체제에서 실행 중인 하나의 애플리케이션
    • 애플리케이션을 실행하면 운영체제로부터 실행에 필요한 메모리를 할당받아 애플리케이션의 코드를 실행하는 것
    • 같은 프로그램을 2개 실행했다면 2개의 프로세스가 생성된 것.
  • 멀티 태스킹 :

    두 가지 이상의 작업을 동시에 처리하는 것,

    운영체제는 멀티 태스킹을 할 수 있도록 CPU와 메모리 자원을 프로세스마다 적절하게 할당해주고 병렬로 실행시킨다.

  • 멀티 스레드

    하나의 프로세스가 두 개의 작업을 처리할 때,

    동영상 플레이어는 동영상 재생음악 재생의 역할을한다. 멀티 스레드는 애플리케이션 내부에서의 멀티 태스킹이라고 볼 수 있다.

  • 멀티 프로세스와 멀티 스레드

멀티 프로세스들은 운영체제에서 할당받은 자신의 메모리를 가지고 실행하기 때문에 서로 독립적이다. 따라서 하나의 프로세스에서 오류가 발생해도 다른 프로세스에게 영향을 미치지 않는다.
하지만 멀티 스레드는 하나의 프로세스 내부에 생성되기 때문에 하나의 스레드가 예외를 발생시키면 나머지 프로세스 자체가 종료될 수 있어 다른 스레드도 영향을 미치게 된다.

그렇기 때문에 멀티 스레드에서는 예외 처리에 만전을 기해야 한다.

멀티 스레드 사용의 예

  1. 대용량 데이터의 처리 시간을 줄이기 위해 데이터를 분할해서 병렬로 처리
  2. UI를 가지고 있는 애플리케이션에서 네트워크 통신을 위해 사용
  3. 다수 클라이언트의 요청을 처리하는 서버를 개발할 때

메인 스레드

모든 자바 애플리케이션은 메인 스레드가 main() 메서드를 실행하면서 시작된다.

첫 코드부터 순차적으로 아래로 실행되며 return문을 만나거나, 마지막 코드를 실행하면 종료된다.

  • 메인 스레드는 필요에 따라 작업 스레드를 만들어서 병렬로 코드를 실행할 수 있다. (멀티 스레드를 생성해서 멀티 태스킹을 수행한다. )

  • 싱글 스레드 애플리케이션에서는 메인 스레드가 종료하면 프로세스도 종료된다. 하지만 멀티 스레드 애플리케이션에서는 실행 중인 스레드가 하나라도 있다면 프로세스는 종료되지 않는다. ( 메인 스레드가 먼저 종료되더라도 작업 스레드가 실행중이라면 프로세스는 종료되지 않는다. )

작업 스레드 생성과 실행

  • Runnable 을 매개값으로 갖는 생성자를 호출해야 한다.

    Thread thread = new Thread(Runnable target);
    • Runnable은 작업 스레드가 실행할 수 있는 코드를 가지고 있는 객체라고 해서 붙여진 이름이다. 인터페이스 타입이기 때문에 구현 객체를 만들어 대입해야한다.
    • Runnable에는 run() 메서드 하나가 정의되어 있는데 구현 클래스는 run()을 재정의해서 작업 스레드가 실행할 코드를 작성해야 한다.
  • Runnable 구현 클래스 작성 방법
class Task implements Runnable {
    public void run() {
        스레드가 실행할 코드
    }
}
  • Runnable은 작업 내용을 가지고 있는 객체이지 실제 스레드는 아니다. Runnable 구현 객체를 생성한 후, 이것을 매개값으로 해서 Thread 생성자를 호출하면 비로소 작업 스레드가 생성된다.
Runnable task = new Task();

Thread thread = new Thread(task);
  • 코드를 좀 더 절약하기 위해 생성자를 호출할 때 Runnable 익명 객체를 매개값으로 사용할 수 있다. 이 방법이 더 많이 사용된다.
Thread thread = new Thread(new Runnable() {
    public void run() {
        스레드가 실행할 코드;
    }
});
  • Runnable 인터페이스는 run() 메소드 하나만 정의되어 있기 때문에 함수적 인터페이스이다. 따라서 다음과 같이 람다식을 매개값으로 사용할 수도 있다.
Thread thread = new Thread( () -> {
    스레드가 실행할 코드;
});
  • 작업 스레드는 생성 즉시 실행되는 것이 아니라, start() 메서드를 다음과 같이 호출해야만 비로소 실행된다.
thread.start();

start() 메서드가 호출되면, 작업 스레드는 매개값으로 받은 Runnable의 run() 메서드를 실행하여 자신의 작업을 처리한다.

0.5초 주기로 비프음을 발생시키면서 동시에 프린팅 하는 작업의 예제

메인스레드만 사용한 경우

import java.util.*; 

public class BeepPrintExam01 {
    public static void main(String[] args) {
        Toolkit toolkit = new Toolkit.getDefaultToolkit(); // Toolkit 객체 얻기
        for(int i = 0; i < 5; i++) {
            toolkit.beep();
            try {
                Thread.sleep(500);
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
        
        for(int i = 0; i < 5; i++) {
            System.out.println("띵");
            try {
                Thread.sleep(500);
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
}

메인 스레드는 비프음을 모두 발생한 다음, 프린팅을 시작한다.

메인 스레드 + 작업 스레드로 수정

  • 비프음 작업 스레드
public class BeepTask implements Runnable {
    public void run() {
        Toolkit toolkit = new Toolkit.getDefaultToolkit();
        for(int i = 0; i < 5; i++) {
            toolkit.beep();
            try {
                Thread.sleep(500);
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
}
  • 메인 스레드에 작업 스레드 import
public class BeepPrintExam02 {
    public static void main(String[] args) {
        Runnable beepTask = new BeepTask();
        Thread thread = new Thread(beepTask);
        thread.start();
        
        for(int i = 0; i < 5; i++) {
            System.out.println("띵");
            try {
                Thread.sleep(500);
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
}

스레드의 이름

스레드는 자신의 이름을 가지고 있다. 큰 역할을 하는 것은 아니지만, 디버깅할 때 어떤 스레드가 어떤 작업을 하는지 조사할 목적으로 가끔 사용된다. 메인 스레드는 "main"이라는 이름을 가지고 있고, 우리가 직접 생성한 스레드는 자동적으로 "Thread-n"이라는 이름으로설정된다.

이름 설정 및 불러오기 메서드

  • setName :
    • thread.setName("스레드 이름"); 방식으로 설정할 수 있다.
  • getName :
    • thread.getName();으로 스레드의 이름을 알 수 있다.

setName과 getName은 Thread의 인스턴스 메서드이므로 스레드 객체의 참조가 필요하다.

스레드 우선순위

멀티 스레드는 동시성 또는 병렬성으로 실행되기 때문에 이 용어들에 대해 정확히 이해하는 것이 좋다.

동시성

  • 멀티 작업을 위해 하나의 코어에서 멀티 스레드가 번갈아가며 실행하는 성질

병렬성

  • 멀티 작업을 위해 멀티 코어에서 개별 스레드를 동시에 실행하는 성질

스케줄링

스레드의 개수가 코어의 개수보다 많을 경우, 스레드를 어떤 순서에 의해 동시성으로 실행할 것인가를 결정해야 하는데, 이를 스케줄링이라고 한다.

자바의 스레드 스케줄링

  • 우선순위 방식(Priority)과 순환 할당(Round-Robin)방식을 사용

우선순위 방식(Priority)

  • 우선순위가 높은 스레드가 실행 상태를 더 많이 가지도록 스케줄링.

  • 스레드들이 동시성을 가질 경우 우선적으로 실행할 수 있는 순위이다.

  • 코드로 제어가 가능하다.

  • 우선 순위는 1(낮음) ~ 10(높음)까지로 부여된다. 기본적으로 5가 부여된다.

  • setPriority() 메서드를 통해 할당할 수 있다. 아래는 Thread 우선 순위 상수

    • Thread.MAX_PRIORITY - 10
    • Thread.NORM_PRIORITY - 5
    • Thread.MIN_PRIORITY - 1

순환할당 방식(Round-Robin)

  • 시간 할당량을 정해서 하나의 스레드를 정해진 시간만큼 실행
  • JVM에 의해서 정해지기 때문에 코드로 제어할 수 없다.

동기화 메소드와 동기화 블록

공유 객체를 사용할 때의 주의할 점

  1. 스레드1에서는 변수값을 100으로 초기화하고 2초를 대기한다.
  2. 2초를 대기하는 동안 스레드2에서 변수값을 50으로 초기화한다.
  3. 스레드1이 변수값을 출력한다 -> 50
  4. 스레드2가 변수값을 출력한다 -> 100

멀티 스레드가 하나의 객체를 공유하므로 오류가 생길 수 있다. 공유 객체를 사용할 때 다른 스레드가 값을 변경하기 때문에 엉뚱한 값을 산출하는 문제가 있다.

동기화 메소드 및 동기화 블록

스레드가 사용 중인 객체를 다른 스레드가 변경할 수 없도록 하려면 스레드 작업이 끝날 때까지 객체에 잠금을 걸어서 다른 스레드가 사용할 수 없도록 해야한다.

멀티 스레드에서 단 하나의 스레드만 실행할 수 있는 코드 영역을 임계 영역(Critical Section)이라고 한다.

  • 자바는 이 임계 영역을 지정하기 위해 동기화 메서드 및 동기화 블록을 제공한다 - synchronized 키워드 사용

동기화 메소드 예

public synchronized void method() {
  임계 영역; // 단 하나의 스레드만 실행
}

동기화 메소드는 synchronize라는 키워드가 붙어있는 것을 볼 수 있다. 메서드의 전체 내용이 임계 영역이 된다.

실행 시에는 공유 객체에 잠금이 일어나고, 스레드가 동기화 메소드를 실행 종료하면 잠금이 풀린다. 일부 내용만 임계 영역으로 만들고 싶다면 동기화 블록을 사용한다.

동기화 블록 예

public void method() {
  // 여러 스레드가 실행 가능한 영역
  .
  .
  .
  synchronized(공유객체){
    임계 영역; // 단 하나의 스레드만 실행
  }
  // 여러 스레드가 실행 가능한 영역
  .
  .
  .
}

메소드 내부에 synchronized 키워드가 붙어있다. 매개변수로는 공유 객체를 받는다.

동기화 블록의 외부 코드들은 여러 스레드가 동시에 실행될 수 있지만 내부 코드는 임계 영역이 된다.

스레드 상태

스레드 객체를 생성하고 start() 메소드를 호출하면 곧바로 스레드가 실행되는 것처럼 보이지만 실행대기 상태가 된다.

  • 실행대기 상태란 아직 스케줄링이 되지 않아서 실행을 기다리는 상태이다.
  • 실행대기 상태에 있는 스레드 중에서 스레드 스케줄링으로 선택된 스레드가 비로소 CPU를 점유하고 run() 메소드를 실행한다. 이때를 실행 상태라고 한다.
  • 실행 상태 스레드는 메소드를 모두 실행하기 전에 다시 실행 대기 상태로 돌아갈 수 있다. 그리고 실행 대기 상태에 있는 다른 실행 대기 스레드가 선택되어 실행 상태가 된다.
  • 이런식으로 번갈아 가면서 자신의 run() 메소드를 실행하고 모든 run() 메소드가 종료되면 종료 상태가 된다.

경우에 따라 대기 상태로 가지 않을 수도 있다.

실행 상태에서 일시 정지 상태로 가기도 하는데 일시정지 상태는 스레드가 실행할 수 없는 상태이다.

  • 이러한 스레드의 상태를 확인하기 위해 자바 5부터 Thread 클래스에 getState()가 추가되었다.
  • getState()는 다음 표처럼 스레드 상태에 따라서 Thread.State 열거 상수를 리턴한다.
상태열거 상수설명
객체 생성NEW스레드 객체가 생성, 아직 start() 메서드가 호출되지 않은 상태
실행 대기RUNNABLE실행 상태로 언제든지 갈 수 있는 상태
일시 정지WAITING다른 스레드가 통지할 때까지 기다리는 상태
TIMED_WAITING주어진 시간 동안 기다리는 상태
BLOCKED사용하고자 하는 객체의 락이 풀릴 때까지 기다리는 상태
종료TERMINATED실행을 마친 상태

스레드의 상태를 출력하는 예시

  • 상태 출력 스레드
public class StatePrintThread extends Thread {
  private Thread targetThread;
  
  public StatePrintThread(Thread targetThread) {
    this.targetThread = targetThread;
  }
  
  public void run() {
    while(true) {
      Thread.State state = targetThread.getState(); // 스레드 상태 얻기
      System.out.println("타겟 스레드 상태 : " + state);
      
      if(state == Thread.State.NEW) { // 객체 생성 상태일 경우 실행대기 상태로 만듦
        targetThread.start();
      }
      
      if(state == Thread.State.TERMINATED) { // 종료 상태일 경우 while문 종료
        break;
      }
      
      try {
        Thread.sleep(500);
      } catch(Exception e) {}
    }
  }
}
  • 타겟 스레드
public class TargetThread extends Thread {
    public void run() {
        for (long i = 0; i < 1_000_000_000; i++) {
        }

        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (long i = 0; i < 1_000_000_000; i++) {
        }
    }
}
  • 실행 클래스 및 실행 결과
public class ThreadStateExample {

    public static void main(String[] args) {
        StatePrintThread statePrintThread = new StatePrintThread(new TargetThread());

        statePrintThread.start();
    }
}
타겟 스레드 상태NEW
타겟 스레드 상태TIMED_WAITING
타겟 스레드 상태TIMED_WAITING
타겟 스레드 상태TIMED_WAITING
타겟 스레드 상태RUNNABLE
타겟 스레드 상태TERMINATED

종료 코드 0()로 완료된 프로세스

스레드 상태 제어

  • 실행중인 스레드의 상태를 변경하는 것.
  • 멀티 스레드 프로그램을 만들기 위해서는 정교한 스레드 상태 제어가 필요한데, 상태 제어가 잘못되면 프로그램은 불안정해져서 먹통이 되거나 다운된다. 멀티 프로그래밍이 어려운 이유가 여기에 있다.

스레드의 상태 변화를 가져오는 메소드

메소드설명
interrupt()일시 정지 상태의 스레드에서 InterruptException 예외를 발생시켜, 예외 처리 코드에서 실행 대기 상태로 가거나 종료 상태로 갈 수 있도록 한다.
notify()
notyfyAll()
동기화 블록 내에서 wait() 메소드에 의해 일시 정지 상태에 있는 스레드를 실행 대기 상태로 만든다.
sleep(long mills)
sleep(long mills, int nanos)
주어진 시간 동안 스레드를 일시 정지 상태로 만든다. 주어진 시간이 지나면 자동적으로 실행 대기 상태가 된다.
join()
join(long mills)
join(long mills, int nanos)
join() 메소드를 호출한 스레드는 일시 정지 상태가 된다. 실행 대기 상태로 가려면 join()메소드를 멤버로 가지는 스레드가 종료되거나, 매개값으로 주어진 시간이 지나야 한다.
wait()
wait(long mills)
wait(long mills, int nanos)
동기화 블록 내에서 스레드를 일시 정지 상태로 만든다. 매개값으로 주어진 시간이 지나면 자동적으로 실행 대기 상태가 된다. 시간이 주어지지 않으면 notify(), notifyAll() 메소드에 의해 실행 대기 상태로 갈 수 있다.
yield()실행 중에 우선순위가 동일한 다른 스레드에게 실행을 양보하고 실행 대기 상태가 된다.
  • wait(), notify(), notifyAll()은 Object의 메소드이다. 그 외에는 모두 Thread 클래스의 메소드이다.ㄴ

주어진 시간동안 일시 정지(sleep())

실행 중인 스레드를 일정 시간 멈추게 하고 싶다면 Thread 클래스의 정적 메소드인 sleep()을 사용하면 된다.

  • sleep()
    • 매개값에는 얼마 동안 일시 정지 상태로 있을 것인지 밀리세컨드 단위로 시간을 주면된다.
    • InterruptException이 발생하기 때문에 예외 처리가 필요하다.

다른 스레드에게 실행 양보(yield())

스레드가 처리하는 작업은 반복적 실행을 위해 for문, while문을 포함하는 경우가 많다. 가끔은 이 반복문이 무의미한 반복을 하는 경우가 있다.

// #1. yield() 사용 X
pubic void run() {
  while(true) {
    if(work) {
      System.out.println("Thread-n 작업 내용");
    }
  }
}

// #2. yield() 사용 O
public void run() {
  while(true) {
    if(work) {
      System.out.println("Thread-n 작업내용");
    } else {
      Thread.yield();
    }
  }
}
  • while문의 work값이 false라면, 그리고 work의 값이 false에서 true로 변경되는 시점이 불명확하다면 무의미한 반복을 한다.
  • 다른 스레드에게 실행을 양보하고 자신은 실행 대기 상태로 가는 것이 프로그램 성능에 도움이 되는데 스레드는 이를 위해 yield() 메소드를 제공하고 있다.
  • yield()
    • 호출한 스레드는 실행 대기 상태로 돌아가고 동일 우선순위 또는 높은 우선순위 스레드가 실행 기회를 가질 수 있도록 함.

다른 스레드의 종료를 기다림(join())

스레드는 다른 스레드와 독립적으로 실행하는 것이 기본이지만, 다른 스레드가 종료될 때까지 기다렸다가 실행해야 하는 경우가 발생할 수도 있다. 예를 들어 작업을 하는 스레드가 모든 작업이 종료될 때, 계산 결과를 받아 이용하는 경우가 해당된다.

1~100까지의 합을 구하는 join() 예제

  • TargetThread
public class TargetThread extends Thread {

    int sum;

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }

		@Override  
    public void run() {
        for (int i = 0; i < 99; i++) {
            sum += i + 1;
        }
    }
}
  • main
public class ThreadJoinExample {

    public static void main(String[] args) {
        TargetThread targetThread = new TargetThread();

        targetThread.start();

        try {
            targetThread.join(); // 연산 결과가 끝날때까지 기다림
        } catch (InterruptedException e) {}

        System.out.println(targetThread.getSum());
    }
}

join()이 없다면 연산이 끝나기 전에 getSum을 불러와 엉뚱한 값이 출력되는 것을 볼 수 있다.

스레드 간 협업(wait(), notify(), notifyAll())

동기화 메소드 또는 블록에서만 호출 가능한 Object의 메소드

경우에 따라서 두 개의 스레드를 교대로 번갈아가며 실행해야 할 때가 있다.

예 : 자신의 작업이 끝나면 상대방 스레드를 일시 정지 상태에서 풀어주고 자신은 실행 대기 상태로 돌아간다.

이 방법의 핵심은 공유 객체에 있다. 공유 객체는 두 스레드가 작업할 내용을 각각 동기화 메소드로 구분해 놓는다. A스레드가 일을 마치면 notify()를 호출하여 B스레드를 실행 상태로 바꾸고, A스레드는 2번 작업하지 않기 위해 wait()을 통해 실행 대기 상태로 만드는 식이다.

wait()은 총 3가지의 인자값을 받아 구성될 수 있다.

다소 차이가 있는데 아래를 살펴보자.

  • wait() : notify()를 기다려야 한다.
  • wait(long timeout), wait(long timeout, int nanos) : notify()를 호출하지 않아도 스레드가 일정 시간이 지나면 자동적으로 실행 대기 상태가 된다.

또한 notify()와 동일한 역할을 하는 notifyAll()도 있다. 이름만 봐도 유추가 쉽다. 실행 대기 중인 하나의 스레드를 여는 것과 실행 대기 중인 모든 스레드를 여는것의 차이이다.

예제

  • DataBox
package thread;

public class DataBox {

    private String data;

    public synchronized String getData() {
        if (this.data == null) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        String returnValue = data;
        System.out.println("ConsumerThread가 읽은 데이터 : " + returnValue);
        data = null;
        notify();
        return returnValue;
    }

    public synchronized void setData(String data) {
        if (this.data != null) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.data = data;
        System.out.println("ProducerThread가 생성한 데이터 : " + data);
        notify();
    }
}
  • ProduceThread
package thread;

public class ProduceThread extends Thread {

    private DataBox dataBox;

    public ProduceThread(DataBox dataBox) {
        this.setName("ProducerThread");
        this.dataBox = dataBox;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 3; i++) {
            String data = "Data -" + i;
          	dataBox.setData(data);
        }
    }
}
  • ConsumerThread
package thread;

public class ConsumerThread extends Thread {

    private DataBox dataBox;

    public ConsumerThread(DataBox databox) {
        this.setName("ConsumerThread");
        this.dataBox = databox;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 3; i++) {
            String data = dataBox.getData();
        }
    }
}
  • Main
package thread;

public class WaitNotifyExample {

    public static void main(String[] args) {
        DataBox dataBox = new DataBox();

        ProduceThread produceThread = new ProduceThread(dataBox);
        ConsumerThread consumerThread = new ConsumerThread(dataBox);

        produceThread.start();
        consumerThread.start();
    }
}

스레드의 안전한 종료 (stop(), interrupt())

스레드는 자신의 run() 메소드가 끝나면 자동으로 종료된다. 하지만 경우에 따라서는 실행 중인 스레드를 즉시 종료할 필요가 있다. 동영상 멈춤 등이 이러한 경우에 해당한다고 볼 수 있다. Thread는 스레드를 즉시 종료시키기 위한 stop()을 통해 스레드를 즉시 종료시키지만, 갑자기 종료하게 되면 데이터가 불안전한 상태로 남겨지기 때문에 deprecated 되었다.

그렇다면 최선의 방법은 무엇이 있을까 ?

stop 플래그

boolean타입의 stop 플래그를 정의하여 run() 메소드가 정상적으로 종료되기를 유도한다.

public class XXXThread enxtends Thread {
  private boolean stop; // stop 플래그 필드
  
  @Override
  public void run()	{
    while(!stop) {
      스레드가 반복 실행하는 코드;
    }
    // 스레드가 사용한 자원 정리
  }
}

Interrupt()

메서드가 일시 정지 상태에 있을 때 InterruptedException 예외를 발생시키는 역할을 한다.

예를 들어 스레드의 sleep() 메서드는 surround try&catch 를 사용해야 하는데 이때 사용되는 catch문에서 위 예외를 처리한다. 그 뒤에 스레드.interrupt()를 통해 해당 스레드가 일시 정지 상태가 되면 안전하게 스레드가 종료된다.

  • 주목할 점 : 스레드가 실행 대기 또는 실행 상태에 있을 때 interrupt() 메소드가 실행되면 즉시 예외가 발생하지 않고 스레드가 미래에 일시 정지 상태가 되면 예외가 발생한다는 것이다.

  • Interrupted()isInterrupted()interrupt()를 호출 했다면 true를 반환한다.

    • interrupted : 정적메소드, 현재 스레드가 interrupted 되었는지 확인
    • isInterrupted : 인스턴스메소드, 현재 스레드가 interrupted 되었는지 확인
// #1 interrupted()
boolean status = Thread.interrupted();
// #2 isInterrupted()
boolean status = objThread.isInterrupted();

데몬 스레드

데몬 스레드는 주 스레드의 작업을 돕는 보조적 스레드의 역할을 한다. 주 스레드가 종료되면 데몬 스레드는 강제적으로 자동 종료된다.

자동 종료되는 이유는 데몬스레드는 주스레드의 보조 역할을 수행하기 때문에 주스레드가 종료되면 의미가 없기 때문이다.

스레드 ▶️ 데몬스레드 만들기

  • 스레드를 데몬스레드로 만들기 위해서는 주 스레드가 데몬이 될 스레드의 setDaemon(true)를 호출해주면 된다.
  • 주의할 점은 데몬스레드는 start()메소드가 호출 되고 나서 setDaemon(true)를 호출하면 IllegalThreadStateException이 발생하기 때문에 start()를 호출하기 전 setDaemon(true)를 호출해야 한다.
  • isDaemon() : 현재 실행중인 스레드가 데몬 스레드인지 구별한다. 데몬 스레드일 경우 true를 리턴한다.

데몬 스레드 활용 예제

아래 예제의 데몬스레드는 작업 내용을 자동으로 저장한다. 메인 스레드가 종료되면 자동적으로 강제 종료된다.

  • AutoSaveThread
public class AutoSaveThread extends Thread {
    public void save() {
        System.out.println("작업 내용 저장 완료");
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                break;
            }
            save();
        }
    }
}
  • Main
public class DaemonThreadExample {
    public static void main(String[] args) {
        AutoSaveThread autoSaveThread = new AutoSaveThread();

        autoSaveThread.setDaemon(true);
        autoSaveThread.start();

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
        }
        System.out.println("메인 스래드 종료");
    }
}

스레드 그룹

스레드 그룹은 관련된 스레드를 묶어서 관리할 목적으로 이용된다. JVM이 실행되면 system스레드 그룹을 만들고 JVM 운영에 필요한 스레드들을 생성해서 system스레드에 포함 시킨다.

스레드는 반드시 하나의 스레드 그룹에 포함되는데 명시적으로 이를 지정해주지 않는다면 자신을 생성한 스레드와 같은 스레드 그룹에 속하게 된다.

스레드 그룹 이름 얻기

ThreadGroup threadGroup = Thread.currentThread.getThreadGroup();
String groupName = threadGroup.getName();
  • Thread의 정적 메소드인 getAllStackTraces()를 이용하면 프로세스 내에서 실행하는 모든 스레드에 대한 정보를 얻을 수 있다.
Map<Thread, StackTraceElement[] map = Thread.getAllStackTraces(); // 키(스레드) : 값(스레드 상태 기록)

스레드 그룹 생성

명시적으로 스레드 그룹을 만들고 싶다면 다음 생성자 중 하나를 이용해서 ThreadGroup 객체를 만들면 된다.

ThreadGroup tg = new ThreadGroup(String name);
ThreadGruop tg = new ThreadGroup(ThreadGroup parent, String name);
  • ThreadGroup parent를 지정하지 않으면 현재 스레드가 속한 그룹의 하위 그룹으로 생성된다.
  • 새로운 스레드 그룹을 생성한 후에 이 그룹에 스레드를 포함시키기 위해서는 Thread 객체를 생성할 때 생성자 매개값으로 parent의 ThreadGroup을 지정하면 된다. 스레드 그룹을 매개값으로 갖는 Thread 생성자는 다음 네가지가 있다.
Thread t = new Thread(ThreadGroup group, Runnable target); // Runnable 구현 객체
Thread t = new Thread(ThreadGroup group, Runnable target, String name); // 스레드의 이름
Thread t = new Thread(ThreadGroup group, Runnable target, String name, long stackSize); // JVM이 할당할 stack크기
Thread t = new Thread(ThreadGroup group, String name); // 객체가 Runnable이 아닌 Thread를 extends할 때

스레드 그룹의 일괄 interrupt()

스레드를 스레드 그룹에 포함시키면 스레드 그룹이 interrupt()를 호출할 때 그룹에 속한 모든 스레드를 일괄로interrupt() 할 수 있다는 이점이 있다.

각자 interrupt()를 호출할 수도 있지만, 같은 스레드 그룹에 소속될 경우, 그룹의 interrupt()를 한 번만 호출해주면 된다!

  • 그림으로 이해하기

  • ThreadGroup이 가지고 있는 주요 메소드
자료형메소드설명
intactiveCount()현재 그룹 및 하위 그룹에서 실행되고 있는 모든 스레드의 수를 리턴
intactiveGroupCount()현재 그룹에서 활동 중인 모든 하위 그룹의 수를 리턴
voidcheckAccess()현재 스레드가 스레드 그룹을 변경할 권한이 있는지 체크
권한이 없으면 SecurityException을 발생시킨다.
voiddestroy()현재 그룹 및 하위 그룹을 모두 삭제한다.
단, 그룹 내 스레드가 모두 종료 상태여야 한다.
booleanisDestroyed()현재 그룹이 삭제되었는지 여부 리턴
intgetMaxPriority()현재 그룹에 포함된 스레드가 가질 수 있는 최대 우선순위를 리턴
voidsetMaxPriority(int pri)현재 그룹에 포함된 스레드가 가질 수 있는 최대 우선순위를 설정
StringgetName()현재 그룹의 이름을 리턴
ThreadGroupgetParent()현재 그룹의 부모 그룹을 리턴
booleanparentOf(ThreadGroup g)현재 그룹이 매개값으로 지정된 그룹의 부모 그룹인지를 리턴
booleanisDaemon()현재 그룹이 데몬 그룹인지 여부를 리턴
voidsetDaemon(boolean daemon)현재 그룹을 데몬 그룹으로 설정
voidlist()현재 그룹 및 포함된 스레드의 정보를 출력한다
voidinterrupt()현재 그룹에 포함된 모든 스레드들을 interrupt()한다.

스레드 그룹 생성, 정보 출력, 일괄 종료 예제

  • WorkThread
package threadGroup;

public class WorkThread extends Thread {
    public WorkThread(ThreadGroup threadGroup, String name) {
        super(threadGroup, name);
    }

    @Override
    public void run() {
        // 무한 루핑을 돌다가 interruptException을 만나면 break;
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println(getName() + "interrupted()");
                break;
            }
        }
        System.out.println(getName() + "종료!");
    }
}
  • Main
package threadGroup;

public class ThreadGroupExample {
    public static void main(String[] args) {
        ThreadGroup myGroup = new ThreadGroup("myGroup");
        WorkThread workThreadA = new WorkThread(myGroup, "workThreadA");
        WorkThread workThreadB = new WorkThread(myGroup, "workThreadB");

        workThreadA.start(); // 스레드 A 작업 시작
        workThreadB.start(); // 스레드 B 작업 시작

        System.out.println("list() 정보 출력");
        ThreadGroup maingroup = Thread.currentThread().getThreadGroup(); // 현재 스레드의 그룹
        maingroup.list(); // 스레드 그룹 list 출력
        System.out.println(); // 개행
        try {
            Thread.sleep(3000); // interrupt예외를 만들기 위함임
        } catch (InterruptedException e) {

        }
        System.out.println("myGroup 스레드 그룹의 interrupt() 메소드 호출");
        myGroup.interrupt(); // interrupt() 실행
    }
}
  • 실행결과는 다음과 같다
list() 정보 출력
java.lang.ThreadGroup[name=main,maxpri=10]
    Thread[main,5,main]
    Thread[Monitor Ctrl-Break,5,main]
    java.lang.ThreadGroup[name=myGroup,maxpri=10]
        Thread[workThreadA,5,myGroup]
        Thread[workThreadB,5,myGroup]

myGroup 스레드 그룹의 interrupt() 메소드 호출
workThreadA interrupted()
workThreadB interrupted()
workThreadB종료!
workThreadA종료!

종료 코드 0(으)로 완료된 프로세스

스레드 풀

병렬 작업 처리가 많아지면 스레드 개수가 증가되고 그에 따른 스레드 생성과 스케줄링으로 인해 CPU가 바빠져 메모리 사용량이 늘어난다.

따라서 애플리케이션의 성능이 저하되고, 이러한 병렬 작업의 폭증을 막으려면 스레드 풀을 사용해야 한다.

  • 스레드풀은 작업 처리에 사용되는 작업처리에 사용되는 스레드를 제한된 개수만큼 정해놓고 작업 큐에 들어오는 작업들을 하나씩 스레드가 맡아 처리한다. 그렇기 때문에 작업처리가 끝난 스레드는 작업 처리 요청이 폭증되어도 스레드의 전체 개수가 늘어나지 않아 성능이 급격히 저하되지 않는다.
  • 자바는 스레드풀을 생성하고 사용할 수 있도록 java.util.concurrent 패키지에서 ExecutorService인터페이스와 Excecutors 클래스를 제공한다.

스레드 풀 생성 및 종료

ExecutorService 구현 객체는 Executors 클래스의 다음 두가지의 메소드 중 하나를 이용해서 간편하게 생성할 수 있다.

메소드명(매개 변수)초기 스레드 수코어 스레드 수최대 스레드 수
newCachedThreadPool()00Integer.MAX_VALUE
newFixedThreadPool(int nThreads)0nThreadsnThreads
  • 초기 스레드 수 : 객체가 생성될 떄 기본적으로 생성되는 스레드 수를 말한다.
  • 코어 스레드 수 : 스레드의 수가 증가된 후 사용되지 않는 스레드를 제거할 때, 최소한 남겨질 스레드의 수를 말한다.
  • 최대 스레드 수 : 스레드풀에서 최대 생성될 수 있는 스레드의 수를 말한다.
  • newCachedThreadPool : 초기 스레드 & 코어 스레드 개수가 0개이고, 스레드 개수보다 작업 개수가 많으면 새 스레드를 실행시켜 작업을 처리한다. 60초 동안 아무런 작업을 하지 않으면 스레드를 종료하고 풀에서 제거한다.
ExecutorService executorService = Executors.newCachedThreadPool();
  • newFixedThreadPool : 초기 스레드 개수는 0개이고 코어 스레드 수는 nThreads이다. 스레드 개수보다 작업 개수가 많으면 새 스레드를 생성시키고 작업을 처리한다. 최대 스레드 개수는 매개값 nThreads이다. 스레드가 작업을 하지 않아도 제거하지 않는다.
ExecutorService excutorService = Executors.newFIxedThreadPool(
		Runtime.getRuntime().availableProcessors() // CPU 코어의 수만큼 최대스레드를 사용하는 스레드풀 생성
);
  • 위의 메소드를 사용하지 않고 코어스레드 개수와 최대 스레드 개수를 설정하고 싶다면 직접 ThreadPoolExecutor 객체를 생성하면 된다.
ExecutorService threadPool = new ThreadPoolExecutor(
	3, // 코어 스레드 개수
  100, //최대 스레드 개수 
  120L, // 놀고 있는 시간
  TimeUnit.SECONDS, // 놀고 있는 시간 단위
  new SynchronousQueue<Runnable> // 작업큐
);

스레드 풀 종료

스레드풀의 스레드는 기본적으로 데몬 스레드가 아니기 때문에 main스레드가 종료되어도 작업을 처리하기 위해 계속 실행 상태로 남아있다. 애플리케이션을 종료하려면 스레드풀을 종료시켜 스레드풀이 종료 상태가 되도록 처리해줘야 한다. ExecutorService는 종료를 위한 다음 세 개의 메소드를 제공한다.

리턴 타입메소드명설명
voidshutdown()현재 처리 중인 작업뿐 아니라 작업 큐에 대기하고 있는 모든 작업을 처리한 뒤에 스레드풀을 종료시킨다
ListshutdownNow()현재 작업 처리 중인 스레드를 interrupt해서 작업 중지를 시도하고 스레드풀을 종료시킨다.
리턴값은 작업 큐에 있는 미처리된 작업의 목록이다.
booleanawaitTermination (long timeout, TimeUnit unit)shutdown() 메소드 호출 이후, 모든 작업 처리를 timeout 시간 내에 완료하면 true, 그렇지 않으면 작업 처리 중인 스레드를 interrupt하고 false 리턴

일반적으로 shutdown()을 호출하고, 남아있는 작업에 상관 없이 강제 종료를 할 때에는 shutdownNow()를 호출한다.

작업 생성과 처리 요청

하나의 작업은 Callable, Runnable 구현 클래스로 표현한다. 이 둘의 차이점은 작업 처리 완료 후 리턴값의 유/무이다.

Runnable의 run() 메소드는 리턴값이 없고, Callable의 call() 메소드는 리턴값이 있다(implements Callable).

작업 생성

  • Runnable 구현 클래스
Runnable task = new Runnable() {
  @Override
  public void run() {
    // 스레드가 처리할 작업 내용
  }
}
  • Callable 구현 클래스
Callable<T> tast = new Callable<T>() {
  @Override
  public T call() throws Exception {
    // 스레드가 처리할 작업 내용
    return T;
  }
}

작업 처리 요청

작업 처리 요청이란 ExecutorService의 작업 큐에 Runnable 또는 Callable 객체를 넣는 행위를 말한다.

ExecutorService는 작업 처리 요청을 위해 다음 두가지 종류 메소드를 제공한다..

리턴 타입메소드명설명
voidexecute(Runnable command)- Runnable을 작업 큐에 저장
- 작업 처리 결과를 받지 못함
Future<?>submit(Runnable task)
submit(Runnable task, V result)
submit(Callable task)
- Runnable 또는 Callable을 작업 큐에 저장
- 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음
  • execute와 submit의 차이점
  1. execute()는 작업 처리 결과를 받지 못하고 submit()은 작업 처리 결과를 받을 수 있도록 Future를 리턴한다.
  2. execute()는 작업 처리 도중 예외가 발생하면스레드가 종료되고 해당 스레드는 스레드풀에서 제거된다. 따라서 스레드풀은 다음 작업을 위한 스레드를 새로 생성한다. 반면에 submit()은 작업 처리 도중 예외가 발생하면 스레드가 종료되지 않고 다음 작업에 이를 재사용한다.

따라서 스레드 생성 오버헤더를 줄이기 위해 submit()을 사용하는 것이 좋다.

예제

  • execute() 메소드로 작업 처리 요청한 경우와 submit() 메소드로 작업 처리 요청한 경우
package threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecuteVsSubmitExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executorervice = Executors.newFixedThreadPool(2);

        for (int i = 0; i < 10; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
                    int poolSize = threadPoolExecutor.getPoolSize();
                    String threadName = Thread.currentThread().getName();
                    System.out.println("총 스레드 개수 : " + poolSize + ", \t 작업 처리 : " + threadName);
                    int value = Integer.parseInt("삼");
                }
            };
//            executorService.execute(runnable); // execute 방법
            executorService.submit(runnable); // submit 방법
            Thread.sleep(10);
        }
        executorService.shutdown(); // 작업 스레드를 종료할 때는 thread pool을 종료해주는 것이 좋다.
    }
}

블록을 10번 반복하는 run() 메소드를 호출하면 execute() 메소드는 총 10개의 새로운 스레드를 생성한다. submit() 메소드는 스레드를 재활용하기 때문에 2개의 스레드만 사용해서 10번의 반복을 처리한다.

블로킹 방식의 작업 완료 통보

ExecutorService의 submit()메소드는 매개값으로 준 Runnable 또는 Callable 작업을 스레드풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴한다.

리턴 타입메소드명설명
Future<?>submit(Runnable task)- Runnable 또는 Callable을 작업 큐에 저장
- 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음
Futuresubmit(Runnable task, V result)
Futuresubmit(Callable task)

Future 객채

작업 결과가 아니라 작업이 완료될 때까지 기다렸다가 최종 결과를 얻는데 사용된다.

그래서 Future를 지연 완료 객체라고도 한다.

Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴한다. 이것이 블로킹을 사용하는 작업완료 통보 방식이다. 다음은 Future가 가지고 있는 메소드를 설명한 표이다.

리턴타입메소드명설명
Vget()작업이 완료될 때까지 블로킹되었다가 처리 결과 V를 리턴
Vget(long timeout, TimeUnit unit)timeout 시간 전에 작업이 완료되면 결과 V를 리턴, 작업이 완료되지 않으면 TimeoutException을 발생
  • 리턴 타입인 V는 submit(Runnable task, V result)의 두 번째 매개값인 V 타입이거나 submit(Callable task)의 Callable 타입 파라미터 V 타입이다. 다음은 Future의 get() 메소드가 리턴하는 값을 보여주는 표이다.
메소드작업 처리 완료 후 리턴 타입작업 처리 도중 예외 발생
submit(Runnable task)future.get() -> nullfuture.get() -> 예외 발생
submit(Runnable task, Integer result)future.get() -> int 타입 값future.get() -> 예외 발생
submit(Callable task)future.get() -> String 타입 값future.get() -> 예외 발생

Future의 get() 메소드는 블로킹이 되기 때문에 UI 스레드에서 호출하면 안된다.

UI 변경 및 이벤트를 처리하는 스레드가 get() 메소드를 호출하면 작업 완료 전까지 UI를 변경할 수 없는 상태가 되고 이벤트도 처리할 수 없게 된다.

따라서 새로운 스레드를 호출하거나, 스레드풀의 스레드가 get()을 호출해야 한다.

  • 새로은 스레드를 생성해서 호출
new Thread(new Runnable(){
  @Override
  public void run() {
    try {
     future.get();
    } catch(Exception e) {
      e.printStackTrace();
    }
  }
}).statr();
  • 스레드풀의 스레드가 호출
executorService.submit(new Runnable() {
  @Override
  public void run() {
    try {
      future.get();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
});

future의 다른 메소드

리턴타입메소드설명
booleancancel(boolean mayInterruptIfRunning)작업 처리가 진행중일 경우 취소 시킴
booleanisCancelled()작업이 취소되었는지 여부
booleanisDone()작업 처리가 완료되었는지 여부
  • cancel() : 작업시 시작되기 전이라면 mayInterruptIfRunning 매개값과는 상관없이 작업 취소 후 true를 리턴, 이미 진행 중이라면 mayInterruptIfRunning 매개값이 true일 경우에만 작업스레드를 interrupt()한다. 어떠한 이유로 인해 취소될 수 없다면 false를 리턴한다.
  • isCancelled() : 작업이 완료되기 전에 작업이 취소되었을 경우에만 true를 리턴한다.
  • isDone() : 작업이 정상적, 예외, 취소 등 어떤 이유에서건 작업이 완료되었다면 true를 리턴한다.

리턴값이 없는 작업 완료 통보

  1. 결과값이 없는 작업 처리 요청은 submit(Runnable task) 메소드를 이용하면 된다. 결과값이 없음에도 Future를 리턴하는데 이는 스레드가 작업 처리를 정상적으로 완료했는지, 작업 처리 도중에 예외가 발생했는지 확인하기 위해서이다.
  1. 작업 처리가 정삭적으로 완료되었다면 Future의 get() 메소드는 null을 리턴하지만, 스레드가 작업 처리 도중 interrupt되면 InterruptedException을 발생시키고, 작업 처리 도중 예외가 발생하면 ExecutionException을 발생시킨다.
Runnable task = new Runnable() {
  @Override
  public void run() {
    // 스레드가 처리할 작업 내용
  }
};

Future future = executorSerivce.submit(task);
  • 리턴값이 없는 작업 완료 통보 예제
import java.util.concurrent.*;

public class NoResultExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        System.out.println("작업 처리 요청");
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for (int i = 0; i < 10; i++) {
                    sum += i;
                    System.out.println("[처리 결과]" + sum);

                }
            }
        };
        Future future = executorService.submit(runnable);

        try {
            future.get();
            System.out.println("작업 처리 완료");
        } catch (Exception e) {
            System.out.println("실행 예외 발생함" + e.getMessage());
        }
        executorService.shutdown();
    }
}

리턴값이 있는 작업 완료 통보

스레드풀의 스레드가 작업을 완료한 후에 애플리케이션이 처리 결과를 얻어야 된다면 작업 객체를 Callable로 생성하면 된다. 주의할 점은 제네릭 타입 파라미터 T는 call() 메소드가 리턴하는 타입이 되도록 한다.

Callable<T> task = new Callable<T>() {
  @Override
  public T call() throws Exception {
    // 스레드가 처리할 작업 내용
    return T;
  }
};
Future<T> future = executorService.submit(task);
  • Callable 작업의 처리 요청은 Runnable 작업과 마찬가지로 ExecutorService의 submit() 메소드를 호출하면 된다. submit() 메소드는 작업 큐에 Callable 객체를 저장하고 즉시 Future를 리턴한다.

  • 스레드풀의 스레드가 Callable 객체의 call() 메소드를 모두 실행하고 T 타입의 값을 리턴하면 get() 메소드는 블로킹이 해제되고 T타입의 값을 리턴하게 된다.

try {
  T result = future.get();
} catch(InterruptException e) {
  // 작업 처리 도중 스레드가 interrupt 될 경우 실행 코드
} catch(ExecutionException e) {
  // 작업 처리 도중 예외가 발생된 경우 실행할 코드
}
  • 리턴값이 있는 작업 완료 통보
public class ResultByCallableExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        System.out.println("[작업 처리 요청]");
        Callable<Integer> task = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for (int i = 0; i < 10; i++) {
                    sum += i;
                }
                return sum;
            }
        };
        Future future = executorService.submit(task);

        try {
            int sum = (int) future.get();
            System.out.println("[처리 결과]" + sum);
            System.out.println("[작업 처리 완료]");

        } catch (Exception e) {
            System.out.println("[실행 에외 발생함]" + e.getMessage());
        }
        executorService.shutdown();
    }
}

작업 처리 결과를 외부 객체에 저장

상황에 따라 스레드가 작업한 결과를 외부 객체에 저장해야 할 경우도 있다. 작업처리를 완료하고 외부 Result 객체에 작업 결과를 저장하는 방법으로 할 수 있다. 대개 Result 객체는 공유 객체가 되어, 두 개 이상의 스레드 작업을 취합할 목적으로 이용된다.

이런 작업을 하기 위해서 ExecutorService의 submit(Runnable task, V result) 메소드를 사용할 수 있는데, V가 바로 result 타입이 된다. 메소드를 호출하면 즉시 Future가 리턴되는데 Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 떄까지 블로킹 되었다가 완료시 V를 리턴한다. 리턴 객체 V는 submit()의 두 번째 매개값으로 준 객체와 동일하지만, 다른점으로 스레드 처리 결과가 내부에 저장되어 있다.

  • 작업 객체는 Runnable 구현 클래스로 생성하는데, 주의점은 스레드에서 결과를 저장하기 위해 외부 Result 객체를 사용해야 하므로 생성자를 통해 Result 객체를 주입받도록 해야 한다.
Result result = ...;
Runnable task = new Task(result);
Future<Result> future = executorService.submit(task, result);
result = future.get();
class Task implements Runnable {
  Result result;
  Task(Result result) { this.result = result; }
  @Override
  public void run() {
    // 작업 코드
    // 처리 결과를 result에 저장
  }
}
  • 작업 처리 결과를 외부 객체에 저장 예제
public class ResultByRunnableExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        System.out.println("[작업 처리 요청]");
        class Task implements Runnable {
            Result result;

            Task(Result result) {
                this.result = result;
            }

            @Override
            public void run() {
                int sum = 0;
                for (int i = 0; i < 10; i++) {
                    sum = +i;
                }
                result.addValue(sum);
            }
        }
        Result result = new Result();
        Runnable task = new Task(result);
        Runnable task2 = new Task(result);
        Future<Result> future1 = executorService.submit(task, result);
        Future<Result> future2 = executorService.submit(task2, result);

        try {
            result = future1.get();
            result = future2.get();
            System.out.println("[처리 결과]" + result.accumValue);
            System.out.println("[작업 처리 완료]");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("[실행 예외 발생함]" + e.getMessage());
        }
        executorService.shutdown();
    }
}

class Result {
    int accumValue;

    synchronized void addValue(int value) {
        accumValue += value;
    }
}

작업 완료 순으로 통보

작업 요청 순서대로 작업 처리가 완료되는 것은 아니다.

작업의 양과 스레드 스케줄링에 따라 먼저 요청한 작업이 나중에 완료되는 경우도 발생한다. 여러 개의 작업들이 순차적으로 처리될 필요성이 없고 처리 결과도 순차적으로 이용할 필요가 없다면 작업 처리가 완료된 것 부터 결과를 얻어 이용하는 것이 좋다.

  • 스레드풀에서 작업 처리가 완료된 것만 통보 받는 방법

    CompletionService는 처리 완료된 작업을 가져오는 poll(), take() 메소드를 제공함

    CompletionService의 구현 클래스는 ExecutorCompletionService(V)이다.

  • 메소드
리턴 타입메소드명설명
Futurepoll()완료된 작업의 Future를 가져옴
완료된 작업이 없다면 즉시 null을 리턴함
Futurepoll(long timeout, TimeUnit unit)완료된 작업의 Future를 가져옴
완료된 작업이 없다면 timeout까지 블로킹 됨
Futuretake()완료된 작업의 Future를 가져옴
완료된 작업이 없다면 있을 때까지 블로킹 됨
Futuresubmit(Callabel task)스레드풀에 Callable 작업 처리 요청
Futuresubmit(Runnable task, V result)스레드풀에 Runnable 작업 처리 요청
  • 객체 생성
ExecutorService executorService = Executors.newFixedThreadPool(
  	Runtime.getRuntime().availableProcessors()
  );
CompletionService<V> completionService = new ExecutorCompletionService<V>(executorService);
  • 작업 처리 요청 방법

poll(), take() 메소드를 통해서 처리 완료된 작업의 Future를 얻으려면 executorService가 아닌 completionService의 submit() 메소드를 사용한다.

completionService.submit(Callable<V> task);
completionService.submit(Runnable task, V result);
  • take() 메소드를 호출하여 완료된 Callable 작업이 있을 때까지 블로킹되었다가 완료된 Future를 얻고 get() 메소드로 결과값을 얻기
executorService.submit(new Runnable() { // 스레드풀의 스레드에서 실행하도록 함
  @Override
  public void run() {
    while(true) {
      try {
        Future<Integer> future = completionService.take(); // 완료된 작업이 있을 때까지 블로킹/완료된 작업이 있으면 Future를 리턴
        int value = future.get(); // get()은 블로킹되지 않고 바로 작업 결과를 리턴
        System.out.println("[처리 결과]" + value);
      } catch (Exception e) {
        break;
      }
    }
  }
});
  • 주의 : take() 메소드가 리턴하는 완료된 작업은 submit()으로 처리 요청한 작업의 순서가 아님을 명심해야함. 먼저 요청한 작업이 나중에 완료될 수도 있기 때문

  • 완료된 작업을 가져올 필요가 없다면 take() 블로킹에서 빠져나와 while문을 종료해야 한다.

  • ExecutorService의 shutdownNow()를 호출하면 take()에서 InterruptedExcetpion이 발생하고 catch절에서 break가 되어 while문을 종료하게 된다.

  • 작업 완료 순으로 통보 받는 예제
public class CompletionServiceExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);

        System.out.println("[작업 처리 요청]");
        for (int i = 0; i < 3; i++) {
            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int sum = 0;
                    for (int i = 1; i <= 10; i++) {
                        sum += i;
                    }
                    return sum;
                }
            });
        }

        System.out.println("[처리 완료된 작업 확인]");
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Future<Integer> future = completionService.take();
                        int value = future.get();
                        System.out.println("[처리 결과] " + value);
                    } catch (Exception e) {
                        break;
                    }
                }
            }
        });

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {}
        executorService.shutdownNow();
    }
}

콜백 방식의 작업 완료 통보

콜백이란 애플리케이션이 스레드에게 작업 처리를 요청한 후, 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법이다. 이때 자동 실행되는 메소드를 콜백 메소드라고 한다.

블로킹 방식과 콜백 방식의 차이

  • 블로킹 방식

작업 처리를 요청한 후 작업이 완료될 때까지 블로킹된다.

  • 콜백 방식

작업 처리를 요청한 후 결과를 기다릴 필요 없이 다른 기능을 수행할 수 있다. 작업 처리가 완료되면 자동적으로 콜백 메소드 실행되어 결과를 알 수 있다.

ExecutorService는 콜백을 위한 별도의 기능을 제공하지 않는다. 하지만 Runnable 구현 클래스를 작성할 때 콜백 기능을 구현할 수 있다.

콜백 메서드를 가진 클래스가 있어야하는데, 이는 직접 정의하거나, java.nio.channels.CompletionHandler로 이용한다. NIO패키지의 Handler인터페이스는 비동기 통신에서 콜백 객체를 만들 때 사용된다.

  • CompletionHandler 객체 생성
    • completed() : 작업을 정상 처리 완료했을 때 호출되는 콜백 메소드
    • failed() : 작업 처리 도중 예외가 발생했을 때 호출되는 콜백 메소드
    • V : 결과값의 타입
    • A : 첨부값의 타입(콜백 메소드 결과값 이외에 추가적으로 전달하는 객체)
CompletionHandler<V, A> callback = new CompletionHandler<V, A>() {
  @Override
  public void completed(V result, A attachment) {
  }
  @Override
  public void failed(Throwable exc, A attachment) {
  }
};
  • 작업 처리 결과에 따라 콜백 메소드를 호출하는 Runnable 객체
Runnable task = new Runnable() {
  @Override
  public void run() {
    try {
      // 작업 처리
      V result = ...;
      callback.completed(result, null); // 작업 정상 처리시 호출
    } catch(Exception e) {
      callback.failed(e, null); // 예외가 발생 했을 경우 호출
    }
  }
};
  • 콜백 방식의 작업 완료 통보 받기 예제
public class CallbackExample {
    private ExecutorService executorService;

    public CallbackExample() {
        executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );
    }

    private CompletionHandler<Integer, Void> callback = new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            System.out.println("completed() 실행 : " + result);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            System.out.println("failed() 실행 : " + exc.toString());
        }
    };

    public void doWork(final String x, final String y) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    int intX = Integer.parseInt(x);
                    int intY = Integer.parseInt(y);
                    int result = intX + intY;
                    callback.completed(result, null);
                } catch (NumberFormatException e) {
                    callback.failed(e, null);
                }
            }
        };
        executorService.submit(task);
    }

    public void finish() {
        executorService.shutdown();
    }

    public static void main(String[] args) {
        CallbackExample callbackExample = new CallbackExample();
        callbackExample.doWork("3", "3"); // completed
        callbackExample.doWork("3", "삼"); // failed
        callbackExample.finish();
    }
}

0개의 댓글