Thread(쓰레드)

박윤택·2022년 5월 24일
2

JAVA

목록 보기
11/14

🤔 개요

Thread

"어떠한 프로그램 내에서, 특히 프로세스 내에서 실행되는 흐름의 단위"를 말하며 모든 프로세스에는 한 개 이상의 스레드가 존재하여 작업을 수행한다. 두 개 이상의 스레드를 가지는 프로세스를 멀티스레드 프로세스라고 한다. 멀티 스레드는 대용량 데이터의 처리시간을 줄이기 위해 데이터를 분할해서 병렬로 처리하는 곳에서 사용되기도 하고, UI를 가지고 있는 애플리케이션에서 네트워크 통신을 위해 사용되기도 하며 다수 클라이언트의 요청을 처리하는 서버를 개발할 때도 사용된다.

Main Thread

자바 어플리케이션에서 메인스레드가 main() 메서드를 실행하면서 시작된다. main() 메서드 첫 코드부터 아래로 순차적으로 실행되고 main() 메서드의 마지막 코드를 실행하거나 return문을 만나게 되면 종료된다. 메인 스레드에 작업 스레드를 만들어서 동시에 코드를 실행시킬 수 있다.

싱글 스레드 애플리케이션에서는 메인 스레드가 종료하면 프로세스도 종료 된다. 하지만 멀티 스레드 애플리케이션에서는 실행 중인 스레드가 하나라도 있으면 프로세스는 종료 되지 않는다. 메인 스레드가 작업 스레드보다 먼저 종료되더라도 작업 스레드가 계속 실행 중이라면 프로세스는 종료되지 않는다.


🎉 Thread 생성과 실행

Thread 생성에는 두가지 방법이 있다. Thread 클래스로부터 직접 생성하는 방법과 Thread 클래스를 상속받아서 생성하는 방법이다.


Thread 클래스로부터 직접 생성

class MyThread implements Runnable{
	@Override
	public void run() {
		
		int i;
		for(i = 0; i<200; i++) {
			System.out.print(i + "\t");
		}
	}
}

public class ThreadTest {

	public static void main(String[] args) {

		System.out.println(Thread.currentThread()); // Thread[main,5,main]
		MyThread runnable = new MyThread();
        
        Thread th1 = new Thread(runnable);
        Thread th2 = new Thread(runnable);
        
		th1.start();
		th2.start();
        
        // 익명 클래스를 통해 간단한 스레드 생성
        Runnable run = new Runnable() {
        	@Override
            public void run() {
            	System.out.println("run");
            }
        };
        
        run.run();
        
        // run <- th1과 th2보다 먼저 run, start()한다고 바로 run 되지 않음
        // 0 0 1 1 2 2 3 3 4 4 ...
	}

}

thread 객체를 생성하고 start() 메서드를 호출하면 곧바로 스레드가 실행되는 것처럼 보이지만, 사실은 실행 대기 상태가 된다. 실행 대기 상태에 있는 스레드 중에서 스레드 스케줄링으로 선택된 스레드가 CPU를 점유하고 run() 메서드를 실행한다. 따라서 run()을 먼저 실행하기 때문에 "run"이 출력된다.


Thread 클래스 상속받아서 생성

class MyThread extends Thread{
	
	public void run() {
		
		int i;
		for(i = 0; i<200; i++) {
			System.out.print(i + "\t");
		}
	}
}

public class ThreadTest {

	public static void main(String[] args) {

		System.out.println(Thread.currentThread()); // Thread[main,5,main], 5는 NORMAL_PRIORITY(우선순위)
		MyThread th1 = new MyThread();
		th1.start();
		
		MyThread th2 = new MyThread();
		th2.start();
        // 0 0 1 1 2 2 3 3 4 4 ...
	}

}

☝️ Thread 우선 순위

Thread를 여러개 실행시키면 우선순위에 따라 Thread가 CPU의 배분을 받을 확률이 높다. Thread.MIN_PRIORITY(=1) ~ Thread.MAX_PRIORITY(=10)로 우선순위를 정할 수 있으며 default priority는 Thread.NORMAL_PRIORITY(=5)이다.
Priorirty를 설정하거나 확인하는 함수는 setPriority()/getPriority()이다.

class PriorityThread extends Thread{
	
	public void run(){
	
		int sum = 0;
		
		Thread t = Thread.currentThread();
		System.out.println( t + "start");
		
		for(int i =0; i<=1000000; i++){
			
			sum += i;
		}
		
		System.out.println( t.getPriority() + "end");
	}
}


public class PriorityTest {

	public static void main(String[] args) {

		int i;
		for(i=Thread.MIN_PRIORITY; i<= Thread.MAX_PRIORITY; i++){
			
			PriorityThread pt = new PriorityThread();
			pt.setPriority(i);
			pt.start();
		
		}
	}

}

🎋 Thread 동기화 및 상태

멀티 스레드 프로그램에서는 스레드들이 객체를 공유해서 작업을 수행해야하는 경우가 있다. 이 경우 한 스레드 A가 객체를 변경할 수 없도록 하려면 이 스레드 A의 작업이 끝날때까지 객체에 잠금을 걸어서 다른 스레드가 사용할 수 없도록 해야한다.

  • 임계 영역(Critical Section) : 단 하나의 스레드만 실행할 수 있는 코드 영역

자바는 임계영역을 지정하기 위해 동기화 메서드와 동기화 블럭을 제공한다. 스레드가 객체 내부의 동기화 블럭 또는 동기화 메서드에 들어가면 객체에 잠금을 걸어야 한다. synchronized 키워드를 이용해 동기화 블럭 또는 동기화 메서드를 만들 수 있다.


스레드 상태

public class StateShowThread extends Thread {
  private Thread thread;

  public StateShowThread(Thread thread){
    this.thread = thread;
  }

  public void run() {
    while(true) {
      Thread.State state = thread.getState(); // 스레드 상태 확인
      System.out.println("스레드 상태 : " + state);

      if(state == State.NEW) {
        thread.start();
      }
      if(state == State.TERMINATED) {
        break;
      }
      try {
        Thread.sleep(300); // 스레드의 상태를 0.3초 주기로 출력
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
}


class TargetThread extends Thread {
  public void run() {
    for (long i=0; i<1000000000; i++){}
    try {
      Thread.sleep(1300); // 1.3초간 TIME_WAITING 상태 유지
    } catch(Exception e){
      e.printStackTrace();
    }
    for(long i =0; i<1000000000; i++){}
  }
}


public class Main {
  public static void main(String[] args) {
    StateShowThread statePrintThread = new StateShowThread(new TargetThread());
    statePrintThread.start();
  }
}

// 스레드 상태 : NEW
// 스레드 상태 : RUNNABLE
// 스레드 상태 : RUNNABLE
// 스레드 상태 : TIMED_WAITING
// 스레드 상태 : TIMED_WAITING
// 스레드 상태 : TIMED_WAITING
// 스레드 상태 : TIMED_WAITING
// 스레드 상태 : RUNNABLE
// 스레드 상태 : RUNNABLE
// 스레드 상태 : TERMINATED

스레드의 상태는 NEW → RUNNABLE → TIMED_WAITING → RUNNABLE → TERMINATED로 변한 것을 확인할 수 있다.


스레드 상태 제어

메서드설명
interrupt일시 정지 상태의 스레드에서 InterruptedException 예외를 발생시켜, 예외 처리 코드(catch)에서 실행 대기 상태로 가거나 종료 상태로 갈 수 있도록 함
notify(), notifyAll()wait() 메서드에 의해 일시 정지 상태에 있는 스레드를 실행 대기 상태로 만듦
sleep(long millis) / sleep(long millis, int nanos)주어진 시간 동안 스레드를 일시 정지 상태로 만듦, 주어진 시간이 지나면 자동적으로 실행 대기 상태가 됨
join() / join(long millis) / join(long millis, int nanos)join() 메서드를 호출한 스레드는 일시 정지 상태가 됨, 실행 대기 상태로 가려면 join() 메서드를 멤버로 가지는 스레드가 종료되거나, 매개값으로 주어진 시간이 지나야 함
wait() / wait(long milis) / wait(long millis, int nanos)동기화(synchronized) 블록 내에서 스레드를 일시 정지 상태로 만듦, 매개값으로 주어진 시간이 지나면 자동적으로 실행 대기 상태가 됨 시간이 주어지지 않으면 notify(), notifyAll() 메서드에 의해 실행 대기 상태로 갈 수 있음.
yield()실행 중에 우선순위가 동일한 다른 스레드에게 실행을 양보하고 실행 대기 상태가 됨

join() 예

public class JoinTest extends  Thread{
  private int start;
  private int end;
  private int total;

  public JoinTest(int start, int end) {
    this.start = start;
    this.end = end;
  }

  public void run() {
    for(int i = start; i <= end; i++)
      this.total+=i;
  }

  public int getTotal() {
    return this.total;
  }
}

public class JoinTestMain {
  public static void main(String[] args) throws InterruptedException {
    JoinTest jt1 = new JoinTest(1, 50);
    JoinTest jt2 = new JoinTest(51, 100);

    System.out.println("jt1, jt2 start");
    jt1.start();
    jt2.start();

    System.out.println("jt1 total : " + jt1.getTotal());
    System.out.println("jt2 total : " + jt2.getTotal());
    int result = jt1.getTotal() + jt2.getTotal();

    System.out.println("total result(before join) : " + result); // 0이 출력됨 -> thread(main, jt1, jt2)에서 main thread가 끝나지 않았기 때문

//    jt1.join();
//    jt2.join();
//
//    result = jt1.getTotal() + jt2.getTotal();
//    System.out.println("total result(before join) : " + result); // 5050이 출력됨
  }

}

Thread는 main, jt1, jt2로 3개가 있다. main 안에서 jt1, jt2 thread가 동작하고 있으므로 jt1, jt2를 start()한 이후 둘의 합을 더해보면 0이 나오는데 이는 main 쓰레드 안에서 jt1, jt2가 아직 수행이 끝나지 않았기 때문이다.

join()을 이용한다면 main thread는 non-runnable 상태로 되므로 jt1, jt2 thread의 동작이 끝나고 그 이후 total 값을 확인해보면 5050이 나오는 것을 확인할 수 있다.


멀티 Thread에서의 동기화

  • 세마포어(Semaphore) : 공유 자원에 여러 프로세스가 접근하는 것을 막는 것
  • 뮤텍스(Mutex) : 상호 배제,임계 영역을 가지는 쓰레드들의 Running Time이 서로 겹치지 않도록 해주는 기법


한 순간 오직 하나의 thread 만이 세마포어를 얻을 수 있고 나머지 thread들은 대기 상태가 된다. 세마포어를 얻은 thread만이 임계 영역에 들어갈 수 있다.

class Bank {
  private int money = 100000;

  public synchronized void saveMoney(int save) {
    int m = this.getMoney();

    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    setMoney( m + save);
  }

  public synchronized void useMoney(int minus) {
    int m = this.getMoney();

    try {
      Thread.sleep(200);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    setMoney( m - minus);
  }

  public int getMoney(){
    return money;
  }

  public void setMoney(int money){
    this.money = money;
  }
}

class Husband extends Thread {
  public void run() {
    System.out.println("Husband save money");
    SyncTest.bank.saveMoney(10000);
    System.out.println("save money result : " + SyncTest.bank.getMoney());
  }
}

class Wife extends Thread {
  public void run() {
    System.out.println("Wife use money");
    SyncTest.bank.useMoney(50000);
    System.out.println("use money result : " + SyncTest.bank.getMoney());
  }
}

public class SyncTest {
  // 공유 자원
  public static Bank bank = new Bank();

  public static void main(String[] args) throws InterruptedException {
    Husband husband = new Husband();
    husband.start();

    Thread.sleep(200);

    Wife wife = new Wife();
    wife.start();

  }
}

// Husband save money
// Wife use money
// save money result : 110000
// use money result : 60000

money를 남편과 아내가 같이 사용하지만 java에서는 객체를 동기화 기준으로 정하므로 Bank를 공유하게 된다. static 키워드를 이용해서 공유하도록 설정하고 synchronized 키워드를 이용해 동기화 메서드를 작성한다. 즉, 임계영역에 접근하는 공유 자원을 lock하여 다른 thread의 접근을 제어한다.

syncronized 메서드에서 다른 syncronized 메서드를 호출한다면 deadlock에 빠지게 되므로 주의가 필요하다.

  • syncronized 키워드를 사용하지 않을 경우

  • syncronized 키워드를 사용하는 경우

syncronized 블럭

synchronized(참조형 수식) {

      수행문;
}

앞서 설명한 예제를 기준으로 syncronized 블럭을 설정하면 다음과 같다.

  • syncronized method -> syncronized block
class Bank {
  private int money = 100000;

  public void saveMoney(int save) {
    synchronized (this) {
      int m = this.getMoney();

      try {
        Thread.sleep(3000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

      setMoney(m + save);
    }
  }

  public void useMoney(int minus) {
    synchronized (this) {
      int m = this.getMoney();

      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

      setMoney(m - minus);
    }
  }

  public int getMoney(){
    return money;
  }

  public void setMoney(int money){
    this.money = money;
  }
}
  • run() 내부에 syncronized 블럭 설정
class Husband extends Thread {
  public void run() {
    synchronized (SyncTest.bank) {
      System.out.println("Husband save money");
      SyncTest.bank.saveMoney(10000);
      System.out.println("save money result : " + SyncTest.bank.getMoney());
    }
  }
}

class Wife extends Thread {
  public void run() {
    synchronized (SyncTest.bank) {
      System.out.println("Wife use money");
      SyncTest.bank.useMoney(50000);
      System.out.println("use money result : " + SyncTest.bank.getMoney());
    }
  }
}

wait()/notify() 메서드를 이용한 동기화

리소스가 어떤 조건에서 더이상 유효하지 않은 리소스를 기다리기 위해 Thread가 wait() 상태가 된다. wait() 상태가 된 Thread는 notify()가 호출 될 때까지 기다린다. 유효한 자원이 생기면 notify()가 호출되고 wait() 하고 있는 Thread 중 무작위로 하나의 Thread가 재시작된다.
notifyAll()이 호출되는 경우 wait()하고 있는 Thread가 재시작되는데 이때 유효한 리소스만큼의 Thread만이 수행될 수 있고 자원을 갖지 못한 Thread는 다시 wait() 상태가 된다.

  • notify()를 사용하는 경우
import java.util.ArrayList;

class Library{
  public ArrayList<String> shelf = new ArrayList<String>();
  public Library(){
    shelf.add("토지 1");
    shelf.add("토지 2");
    shelf.add("토지 3");
  }

  public synchronized String lendBook() throws InterruptedException{
    Thread t = Thread.currentThread();
    if(shelf.size() == 0) {
      System.out.println(t.getName() + " waiting start");
      wait();
      System.out.println(t.getName() + " waiting end");
    }
    if(shelf.size() > 0 ) {
      String book = shelf.remove(0);
      System.out.println(t.getName() + ": " + book + " lend");

      return book;
    }
    return null;
  }

  public synchronized void returnBook(String book){
    Thread t = Thread.currentThread();

    shelf.add(book);
    notify();
    System.out.println(t.getName() + ": " + book + " return");
  }
}

class Person extends Thread {
  public Person(String name){
    super(name);
  }
  public void run() {
    try {
      String title = NotifyTest.library.lendBook();
      if(title == null){
        System.out.println(getName() + " can't lend");
        return;
      }
      sleep(1000);
      NotifyTest.library.returnBook(title);
    } catch (InterruptedException e){
      e.printStackTrace();
    }
  }
}


public class NotifyTest {
  static Library library = new Library();

  public static void main(String[] args) {
    Person person1 = new Person("person1 ");
    Person person2 = new Person("person2 ");
    Person person3 = new Person("person3 ");
    Person person4 = new Person("person4 ");
    Person person5 = new Person("person5 ");
    Person person6 = new Person("person6 ");

    person1.start();
    person2.start();
    person3.start();
    person4.start();
    person5.start();
    person6.start();
  }
}

  • notifyAll()를 사용하는 경우
import java.util.ArrayList;

class Library{
  public ArrayList<String> shelf = new ArrayList<String>();
  public Library(){
    shelf.add("토지 1");
    shelf.add("토지 2");
    shelf.add("토지 3");
  }

  public synchronized String lendBook() throws InterruptedException{
    Thread t = Thread.currentThread();
    while(shelf.size() == 0) {
      System.out.println(t.getName() + " waiting start");
      wait();
      System.out.println(t.getName() + " waiting end");
    }
    if(shelf.size() > 0 ) {
      String book = shelf.remove(0);
      System.out.println(t.getName() + ": " + book + " lend");

      return book;
    }
    return null;
  }

  public synchronized void returnBook(String book){
    Thread t = Thread.currentThread();

    shelf.add(book);
    notifyAll();
    System.out.println(t.getName() + ": " + book + " return");
  }
}

class Person extends Thread {
  public Person(String name){
    super(name);
  }
  public void run() {
    try {
      String title = NotifyTest.library.lendBook();
      if(title == null){
        System.out.println(getName() + " can't lend");
        return;
      }
      sleep(1000);
      NotifyTest.library.returnBook(title);
    } catch (InterruptedException e){
      e.printStackTrace();
    }
  }
}

📱 Thread 풀

동시 작업 처리가 많아지면 스레드 수가 증가하고 스레드 생성과 스케줄링으로 인해 메모리 사용량이 늘어나면서 성능을 처하시킨다. 스레드의 무분별한 증가를 방지하려면 스레드풀을 사용해야한다. 이는 작업 처리에 사용되는 스레드의 수를 정해 놓아 큐에 작업이 들어오면 스레드 풀 안에 스레드를 하나씩 처리를 한다.


생성

메서드초기 스레드 수코어 스레드 수최대 스레드 수
newCachedThreadPool()00Integer.MAX_VALUE
newFixedThreadPool(int num)0numnum

종료

Thread Pool은 main thread가 종료되어도 작업을 처리하기 위해 계속 실행 상태로 남아있어 어플리케이션을 종료시키려면 Thread Pool을 종료해야 한다.

메서드리턴타입설명
shutdown()void작업 큐에 남아있는 모든 작업을 처리한 뒤 종료
shutdownNow()List작업 큐에 남아있는 작업과 상관없이 종료, 처리 못한 작업(Runnable) 목록을 리턴
awaitTermination(long timeout, TimeUnit unit)booleanshotdown() 메소드 호출 후, 모든 작업 처리를 timeout 시간안에 처리하면 true, 처리 못하면 작업 스레드들을 interrupt()하고 false 리턴

예시

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ThreadPoolTest {
  public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(6);

    for(int i = 0; i < 20; i++){
      Runnable runnable = new Runnable() {
        @Override
        public void run() {
          ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
          // thread pool count check
          int poolSize = threadPoolExecutor.getPoolSize();
          // thread check
          String threadPoolName = Thread.currentThread().getName();
          System.out.println("Thread pool : " + poolSize + " / thread name : " + threadPoolName);
        }
      };

      // thread pool 작업 처리 요청
      executorService.execute(runnable);
//      executorService.submit(runnable);

      try {
         Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    // thread pool end
    executorService.shutdown();

  }
}

0개의 댓글