Thread

Minjoo Kim·2024년 9월 12일

자바에서 직접 스레드를 만드는 방법은 두 가지가 있다.
1. 스레드를 직접 생성하고 관리하기
비동기 작업이 필요할 때마다 Thread 객체를 직접 생성하는 방법.
2. 스레드 관리 추상화
작업을 Executor 같은 실행기에게 전달한다. 실행기는 스레드 풀을 관리하고, 작업의 실행을 관리하여 스레드의 생성 및 관리에 대한 세부 사항을 추상화한다. 이를 통해 코드가 더 간결하고 유지보수하기 쉬워진다.

스레드 인스턴스 생성

Thread 인스턴스를 생성하는 어플리케이션은 스레드에서 실행할 코드를 제공해야 하며 방식은 두 가지가 있다.

1 Thread 상속 받는 서브클래스 만들기

private static final class ExtendedThread extends Thread {

	private String message;
    
    public ExtendedThread(final String message) {
    	this.message = message;
    }
    
    @Override
    public void run() {
    	log.info(message);
    }
}

2 Runnable 객체 제공하기

  • 스레드에서 실행되는 코드를 포함하기 위한 run을 정의하고 Thread 생성자에 전달한다.
public static void main(String[] args) {
    Thread thread = new Thread(new RunnableThread("hello thread"));
    thread.start(); // 새 스레드 시작은 start() 메서드 호출
}

private static final class RunnableThread implements Runnable {

	private String message;
    
    public RunnableThread(final String message) {
    	this.message = message;
    }
    
    @Override
    public void run() {
    	log.info(message)
    }
}
  • Runnable 객체가 Thread 이외의 클래스를 서브클래싱할 수 있기 때문에 더 많이 사용된다.

스레드 풀

워커 스레드(스레드 풀 안에서 독립적으로 존재하는 스레드로, 여러 작업을 실행하는 데 재사용된다.)를 사용하면 스레드 생성으로 인한 오버헤드를 최소화할 수 있다. 스레드 객체는 상당한 양의 메모리를 사용하므로 대규모 애플리케이션에서 많은 스레드 객체를 할당하고 할당 해제하면 상당한 메모리 관리 오버헤드가 발생한다.

스레드 풀의 일반적인 유형 중 하나는 fixed thread pool이다. 항상 지정된 수의 스레드가 실행 중이며 사용 중인 스레드가 종료되면 자동으로 새 스레드로 대체된다. 작업은 내부 큐(queue)에 저장되고, 스레드들이 차례대로 큐에서 작업을 꺼내서 처리한다. fixed thread pool은 스레드를 무제한으로 만들지 않고, 일정 개수의 스레드만으로도 많은 작업을 효과적으로 처리할 수 있게 한다.

fixed thread pool을 사용하면 애플리케이션의 성능이 서서히 저하(degrade gracefully)되는 장점이 있다. HTTP 요청을 처리할 때마다 새로운 스레드가 생성될 때, 들어오는 요청의 수가 시스템이 감당할 수 있는 수를 초과하게 되고 스레드를 무제한으로 생성하게 된다. 스레드들이 시스템 자원을 모두 사용해버리면 서버는 모든 요청에 대해 응답을 멈추게 된다. fixed thread pool을 사용하면 시스템이 감당할 수 있는 만큼의 요청을 처리하게 된다. 한 번에 많은 요청이 들어와도 서서히 응답이 느려지지만 계속해서 요청을 처리할 수 있다.

newFixedThreadPool

ExecutorService executor = Executors.newFixedThreadPool(2);
Future<String> future = executor.submit(() -> "Hello World");
String result = future.get();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
}); 
// ⬆️ 위 두 개는 실행되고
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
// ⬆️ 하나는 큐에서 대기한다.

assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getQueue().size()).isEqualTo(1);

newCachedThreadPool

newFixedThreadPool과 다르게 스레드 수를 받지 않는다. corePoolSize를 0으로 설정하고 maximumPoolSizeInteger.MAX_VALUE로 설정하고 keepAliveTime은 60초로 설정한다.

스레드 풀이 작업의 수에 관계없이 무제한으로 커질 수 있다. 스레드가 더 이상 필요하지 않은 경우 60초 동안 사용하지 않으면 폐기된다. 수명이 짧은 작업이 많은 경우 사용한다.

내부적으로 SynchronousQueue 인스턴스가 사용되어 큐 크기는 항상 0이다. SynchronousQueue에서는 삽입과 제거 작업이 항상 동시에 발생하기 때문에 큐에는 아무것도 포함되지 않는다.

executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertThat(executor.getPoolSize()).isEqualTo(3);
assertThat(executor.getQueue().size()).isEqualTo(0);

Synchronization

스레드는 공유된 필드나 객체를 통해 통신한다. 즉, 여러 스레드가 같은 데이터를 참조하거나 수정할 수 있다. 이 방식은 매우 효율적이지만 여러 스레드가 동일한 데이터를 동시에 수정하면서 데이터 충돌이 발생(스레드 간 간섭)하거나 한 스레드가 변경한 데이터가 다른 스레드에서 잘못된 시점에 읽히는 상황(메모리 일관성 오류)가 발생할 수 있다.

Synchronization(동기화)는 이런 오류를 방지하기 위한 도구이다. 하지만 동시에 새로운 문제를 초래할 수 있다. 두 개 이상의 스레드가 동시에 같은 자원에 접근을 시도하면 스레드 경합(thread contention)이 일어나 일부 스레드가 더 느리게 실행되거나 아예 멈출 수 있다. 스레드 경합의 한 형태로 스레드가 자원을 얻지 못해 계속 기다리거나(기아, starvation) 자원을 얻으려고 하면서도 작업이 멈추는 상황(라이브락, livelock)이 발생할 수 있다.

Synchronizaed block

공유 데이터에 대한 스레드 액세스를 동기화해 경쟁 조건을 피할 수 있는 매커니즘을 제공한다. 동기화 블럭에는 주어진 시간에 하나의 스레드만 접근할 수 있다.

public class SynchronizedMethods {

    private int sum = 0;

    public void calculate() {
        setSum(getSum() + 1);
    }

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }
}
@Test
public void givenMultiThread_whenNonSyncMethod() throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    SynchronizedMethods summation = new SynchronizedMethods();

    IntStream.range(0, 1000)
            .forEach(count -> executor.submit(summation::calculate));

    executor.awaitTermination(1000, TimeUnit.MILLISECONDS);

    assertThat(summation.getSum()).isEqualTo(1000);
}
org.opentest4j.AssertionFailedError: 
expected: 1000
 but was: 981
  • 위 테스트는 실패하며 매번 실행할 때마다 다른 결과가 나온다.

Synchronized Keyword

Synchronized 키워드는 인스턴스 메서드, Static 메서드, 코드 블럭과 같이 다양한 레벨에 적용할 수 있다. 동기화 블럭을 사용할 때 Java는 내부적으로 모니터 잠금 또는 내재적 잠금이라고 하는 모니터를 사용해 동기화를 제공한다. 이 모니터는 객체에 바인딩되어 동일한 객체의 모든 동기화된 블럭은 동시에 하나의 스레드만 실행할 수 있다.

인스턴스 메서드

public synchronized void synchronizedCalculate() {
    setSum(getSum() + 1);
}
@Test
public void givenMultiThread_whenMethodSync() throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    SynchronizedMethods summation = new SynchronizedMethods();

    IntStream.range(0, 1000)
            .forEach(count -> executor.submit(summation::synchronizedCalculate));

    executor.awaitTermination(1000, TimeUnit.MILLISECONDS);

    assertThat(summation.getSum()).isEqualTo(1000);
}

Static 메서드

public static synchronized void syncStaticCalculate() {
    staticSum = staticSum + 1;
}
@Test
public void givenMultiThread_whenStaticSyncMethod() throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(3);

    IntStream.range(0, 1000)
            .forEach(count -> executor.submit(SynchronizedMethods::syncStaticCalculate));

    executor.awaitTermination(1000, TimeUnit.MILLISECONDS);

    assertThat(SynchronizedMethods.staticSum).isEqualTo(1000);
}

코드 블럭

public void performSynchronizedTask() {
    synchronized (this) { // 매개변수 전달 -> 모니터 객체
        setSum(getSum() + 1);
    }
}
@Test
public void givenMultiThread_whenBlockSync() throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    SynchronizedMethods summation = new SynchronizedMethods();

    IntStream.range(0, 1000)
            .forEach(count -> executor.submit(summation::performSynchronizedTask));

    executor.awaitTermination(1000, TimeUnit.MILLISECONDS);

    assertThat(summation.getSum()).isEqualTo(1000);
}

코드 블럭 - Static 메서드

public static void performStaticSyncTask() {
    synchronized (SynchronizedMethods.class) {
        setStaticCount(getStaticCount() + 1);
    }
}
@Test
public void givenMultiThread_whenStaticSyncBlock() throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(3);

    IntStream.range(0, 1000)
            .forEach(count -> executor.submit(SynchronizedMethods::performStaticSyncTask));

    executor.awaitTermination(1000, TimeUnit.MILLISECONDS);

    assertThat(SynchronizedMethods.getStaticCount()).isEqualTo(1000);
}

톰캣 HTTP Connector 속성

maxThreads

  • 톰캣의 최대 스레드 개수
    HTTP Connector에서 처리할 수 있는 최대 요청 처리 수
  • 동시에 얼마나 많은 요청을 처리할 수 있는지 결정한다.

maxConnections

  • 주어진 시간에 수락하고 처리할 수 있는 최대 커넥션 개수
  • 기본 값 : 8192 ➡️ 8192 개의 커넥션을 수락 및 처리할 수 있다.
  • 수락 및 처리되는 커넥션들은 Tomcat Connector가 관리하는 큐에 저장된다.

acceptCount

  • 큐의 사이즈

  • 기본 값 : 100 ➡️ maxConnections 개수 이상의 요청이 들어와도 100개까지는 대기시킬 수 있다.

  • maxThreads 값이 아무리 높아도 maxConnections 값에 따라 유휴(ldle) 스레드가 발생할 수 있다.

  • maxConnections 값이 maxThreads보다 클 때 유휴 스레드 없이 모든 스레드가 사용된다.


🔗 References

profile
Hello, this is Minjoo Kim.

0개의 댓글