지난 글에서 thread-unsafety가 발생할 수 있는 상황들에 대해서 알아보았다. 오늘은 자바에서 thread-safe한 코드를 작성하는 방법들에 대해서 알아보려고 한다.
객체를 stateless하게 구현하면 thread-safe하다. 즉, 변경될 가능성이 있는 인스턴스 변수, 클래스 변수들이 존재하지 않고, 클래스 내부에 선언된 메소드들도 내부나 외부 상태에 의존하지 않는 경우를 의미한다.
아래는 stateless한 클래스의 예시이다. 내부 필드가 선언되어 있지않고, 내부에 정의된 factorial method도 함수의 인자로 넘어온 number 값에만 의존하고 있기 때문에 동일한 input에 대해서 항상 동일한 output을 얻을 수 있다.
package thread;
import java.math.BigInteger;
public class MathUtils {
public static BigInteger factorial(int number) {
BigInteger f = new BigInteger("1");
for(int i=2; i<=number; ++i) {
f = f.multiply(BigInteger.valueOf(i));
}
return f;
}
}
만약 스레드 간에 객체의 상태를 공유해야 한다면, 그 객체를 immutable object로 선언하는 방법이 있다. immutable object는 한번 선언할 때 설정한 값을 변경할 수 없는 객체를 말한다.
한번 선언을 하면 스레드들이 그 객체의 값을 변경할 수 없으므로 thread-safe하다.
immutable 클래스의 필드가 모두 primitive type의 변수라면
1. 클래스를 final로 선언
2. 모든 필드를 private final로 선언
3. 필드에 대한 setter를 제공하지 않음
이렇게 3가지 규칙만 지켜주면 쉽게 immutable object를 구현할 수 있다. 하지만 immutable object의 필드 안에 참조 자료형 필드가 존재하는 경우 이렇게 3가지 규칙만 지켜서는 완전한 불변 객체를 만들 수 없다.
immutable Object를 생성하는 방법은 이 글에서 확인할 수 있다.
자바의 모든 객체는 하나의 Monitor를 가지고 있다. Monitor는 상호 배제(mutual exclusion), 협력(cooperation) 두 가지 종류의 스레드 동기화를 지원한다.
예를 들어 버퍼에 데이터를 쓰는 스레드가 있고 데이터를 읽는 스레드가 있다고 할 때,
Monitor에 대한 이야기를 하는 이유는 자바의 synchronized 키워드는 스레드 간 동기화를 위해 사용되는 대표적인 기법으로 객체 안에 있는 모니터를 이용해서 동기화를 수행하기 때문이다.
하나의 스레드가 synchronized로 지정한 임계 영역에 들어가 있을 때 lock이 걸리기 때문에 다른 스레드가 임계 영역에 접근할 수가 없다. 즉, 임계 영역에 lock이 걸려 있어서 스레드가 접근할 수 없는 경우 그 스레드가 blocked 되었다고 말하고 따라서 synchronized를 blocking 방식이라고 표현한다. blocked된 스레드는 cpu의 제어권을 다른 스레드에게 넘겨준다고 한다. 임계 영역에서 연산을 하던 스레드가 해당 코드 부분을 다 실행해서 벗어나게 되면 lock이 풀리게 되고 다른 스레드가 임계 영역에 들어와 lock을 걸고 작업을 할 수 있다.
synchronized 키워드는 메소드와 코드 블록에 사용할 수 있다.
메소드 이름 앞에 synchronized 키워드를 붙이면 synchronized 메소드가 되고, 한번에 하나의 스레드만 해당 메소드에 접근할 수 있게 된다. synchronized 키워드는 접근 제어자와 static 키워드 또는 리턴 타입 사이에 적어주면 된다.
thread-unsafe한 상황들을 설명하는 글에서 들었던 예시를 synchronized 키워드를 통해 thread-safe하게 만들 수 있다.
public class IncreaseThread extends Thread {
private static int threadCount = 0;
public void run() {
for(int i=0; i<1000; ++i) {
increase();
}
}
// synchronized 메소드
public synchronized static void increase() {
++threadCount;
}
public int getThreadCount() {
return threadCount;
}
}
이렇게 공유 변수인 threadCount의 값을 증가시키는 increase() 메소드를 synchronized 메소드로 선언해주고 아래 클래스의 main 메소드를 실행하면 몇번을 실행하든 상관 없이 4000이라는 값이 프린트된다.
import java.util.*;
class Main {
public static void main(String[] args) throws InterruptedException {
IncreaseThread t1 = new IncreaseThread();
IncreaseThread t2 = new IncreaseThread();
IncreaseThread t3 = new IncreaseThread();
IncreaseThread t4 = new IncreaseThread();
t1.start(); // t1 스레드 시작
t2.start(); // t2 스레드 시작
t3.start(); // t3 스레드 시작
t4.start(); // t4 스레드 시작
t1.join(); // t1이 종료될 때까지 기다리기
t2.join(); // t2가 종료될 때까지 기다리기
t3.join(); // t3가 종료될 때까지 기다리기
t4.join(); // t4가 종료될 때까지 기다리기
System.out.println(t1.getThreadCount());
}
}
예시에서는 synchronized 메소드 안에서 '++threadCount;'라는 명령어 단 한줄을 실행하였기 때문에 상관 없었지만, 어떤 메소드는 임계 영역(critical section) 부분은 일부분이고 대부분의 코드가 임계 영역이 아닌 경우가 있을 수 있다. 이런 경우, 전체 메소드를 synchronized 키워드로 보호한다면 cpu 자원이 낭비되고 실행 시간이 느려질 수 있다.
따라서 메소드 안에서도 임계 영역에 해당하는 부분만 보호할 수 있는 synchronized 블록이 존재한다. synchronized 블록은 다음과 같이 사용할 수 있다.
public class IncreaseThread extends Thread {
private static int threadCount = 0;
Object lock = new Object(); // 1. lock 객체를 만듦
public void run() {
for(int i=0; i<1000; ++i) {
increase();
}
}
public void increase() {
synchronized(lock) { // 2. synchronized 블록. lock 객체의 lock을 사용하게 된다.
++threadCount;
}
}
public int getThreadCount() {
return threadCount;
}
}
주석으로 //1로 표시된 부분에서 lock이라고 하는 Object 객체를 선언해주고, //2로 표시된 부분에서 synchronized 키워드 옆에 (lock)을 써준 것을 확인할 수 있다.
이렇게 되면 그 synchronized 블록에서는 Object lock 객체의 lock을 사용해서 임계 영역에 접근하게 된다. 위 코드에서는 lock 역할을 하는 객체를 따로 선언해주었지만 다음과 같이 this를 사용하더라도 상관없다.
단 this를 사용할 경우에는 synchronized 블록을 포함하는 메소드가 static method일 수는 없다. (this는 현재 인스턴스를 의미하는데, static method는 인스턴스의 생성없이 클래스를 통해서도 호출될 수 있기 때문이다.)
public void increase() {
synchronized(this) { // synchronized 블록. lock 객체의
++threadCount;
}
}
자바는 AtomicInteger, AtomicLong, AtomicBoolean 등의 atomic 클래스를 제공한다. atomic 클래스는 멀티 스레드 환경에서 원자성을 보장해준다.
또한 앞에서 본 synchronized 키워드는 blocking 방식으로 동작하였지만, 이 Atomic 객체들은 non-blocking 방식으로 동작하기 때문에 synchronized보다 비교적 저렴하게 race condition 문제를 해결할 수 있다.
Atomic 객체는 CAS(Compare And Swap) 알고리즘을 통해서 race condition 문제를 해결한다. CAS란, 스레드가 들고 있는 값(캐시에 있는 값)과 메모리에 저장된 값이 동일할 때만 연산을 하는 방식이다.
CAS
1. 각각의 스레드는 메모리에 있는 값을 읽어 cpu 캐시 메모리에 저장한다.
2. 한 스레드가 인자로 기존 값(캐시, 스레드가 들고 있는 값)과 변경할 값을 전달한다.
3. 기존 값(캐시에 있는 값)과 메모리에 있는 값을 비교한다.
4. 두 값이 같다면 메모리에 변경 값을 저장하고 true를 반환한다. 두 값이 다르다면 스레드가 메모리의 값을 읽어가고 false를 반환한 후 2번으로 돌아간다.
volatile이 붙지 않은 변수 예시.
cpu cache를 사용하기 때문에 cache와 메모리 사이 값 불일치 현상이 발생하였다.
변수에 volatile 키워드를 붙이면 CPU cache를 사용하지 않고 Main Memory에 변수를 저장해 읽기와 쓰기 연산을 수행한다고 명시하는 것이다.
volatile 키워드는 하나의 스레드가 읽기/쓰기 작업을 하고 나머지 스레드에서는 읽기 작업만 할 때 동기화가 보장된다.
자바에서 Collection 인터페이스를 구현한 클래스들은 대부분 Thread-safe하지 않다.
아래 예시 코드를 살펴보자.
package thread;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class ThreadTest {
// 10은 동시에 실행될 수 있는 스레드의 개수를 의미
public final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);
@Test
public void list에_원소_추가() throws InterruptedException {
int N = 30;
List<Integer> list = new ArrayList<>();
List<Integer> addElements = Arrays.asList(1, 2, 3);
CountDownLatch latch = new CountDownLatch(N);
// 30개의 스레드 실행
for(int i=0; i<N; ++i) {
THREAD_POOL.execute(() -> {
list.addAll(addElements);
latch.countDown();
});
}
latch.await();
System.out.println(list.size());
}
}
코드 출처: https://velog.io/@mangoo/java-thread-safety
3개의 원소가 들어있는 리스트를 30번 추가했으므로 90이 출력될 것을 예상했지만 90보다 적은 수가 출력될 수도 있다. 이는 ArrayList가 thread-unsafe한 Collection이기 때문에 read-modify-write 유형의 thread-unsafe한 상황이 생긴 것이라고 볼 수 있다.
따라서 여러 스레드가 같은 Collection에 동시 접속할 가능성이 있는 상황에서는 thread-safe한 Collection을 선택하여 사용하거나 Collections.synchronizedCollection()과 같은 메소드를 통해 thread-safe 하도록 만들어 주어야한다.
등이 존재한다.
위에서 소개된 Collection 중 Stack, Vector, HashTable을 제외한 대부분의 Collection은 java.util.concurrent 패키지 안에 정의되어 있다.
다음과 같이 리스트를 선언하여 thread-safe한 리스트를 만들 수 있다.
Collection<Integer> syncCollection = Collections.synchronizedCollection(new ArrayList<>());
Collections.synchronizedCollection()뿐만 아니라, Collections.synchronizedList(), Collections.synchronizedMap(), Collections.synchronizedSet() 등도 존재한다.
예시
Map<String, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());
Set<Integer> syncSet = Collections.synchronizedMap(new HashSet<>());
List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
https://kadosholy.tistory.com/123
https://velog.io/@mangoo/java-thread-safety
https://velog.io/@hyunjong96/%EB%8F%99%EC%8B%9C%EC%84%B1%EA%B3%BC-%EB%B3%91%EB%A0%AC%EC%84%B1
https://steady-coding.tistory.com/568
thread-safe collection에 대한 자세한 설명
https://steady-coding.tistory.com/575