Java에서 Thread-Safe하게 구현하기

mangoo·2022년 12월 1일
1

💡 해당 포스팅에서 소개된 예제 코드들은 여기에서 확인할 수 있다.


동시성 == 멀티 스레드?

'Thred safe', 'Multi Thread Programming' 등의 키워드로 검색하던 중 여러 글에서 '동시성'이라는 키워드가 제목에 포함된 것을 볼 수 있었다. 동시성멀티 스레드가 정확하게 어떤 관계인지에 대한 의문이 생겨 용어에 대한 정리를 먼저 하려고 한다.

동시성

동시성은 논리적으로 두 개 이상의 task가 동시에 실행되는 것을 의미한다.

Concurrency means multiple tasks which start, run, and complete in overlapping time periods, in no specific order. Concurrency is essentially applicable when we talk about a minimum of two tasks or more.

Concurrent and parallel(Multi-threading) are effectively the same principle, both are related to tasks being executed simultaneously although I would say that parallel tasks should be truly multitasking, executed "at the same time" whereas concurrent could mean that the tasks are sharing the execution thread while still appearing to be executing in parallel.

멀티 스레드

멀티 스레드란 동시성을 달성할 수 있는 하나의 프로그래밍 기법이다.

Multi-threading is just a programming technique for expressing concurrency: you create threads that are logical streams of execution, and say what they can do, including how to arbitrate their access to shared variables (locking).

Multi threading is one way of achieving within a process concurrency. Formal definition of concurrency is, at least two execution is happening within the same time interval.

So, multi-threading is a core vehicle for accomplishing concurrency on a single machine, but concurrency is a wider aim and concept, which can be met by additional vehicles.


Thread-Safe란?

다수의 스레드가 공유 자원에 접근해도 프로그램이 문제 없이 동작하는 것을 의미한다. 조금 더 구체적인 정의를 찾던 중 스택 오버플로우에서 아래와 같은 내용을 발견했다.

"Thread Safety는 단순히 한 번에 하나의 스레드가 공유 자원에 접근하도록 보장하는 것만을 의미하지 않는다. race condition, deadlock, livelock, starvation이 발생하지 않는 동시에 공유 자원에 대한 순차적인 접근이 이루어지도록 보장해야 한다."

Thread safety is a little bit more than just making sure your shared data is accessed by only one thread at a time. You have to ensure sequential access to shared data, while at the same time avoiding race conditions, deadlocks, livelocks, and resource starvation.


Thread-Safety를 고려해야 하는 이유

코드를 Thread-Safe하게 작성해야 하는 이유는 무엇일까? 간단한 테스트를 위해 Bus 클래스를 정의했다.

public class Bus {
    private final int minOccupancy = 10;
    private int reservation = 0;

    public void getBusTicket() {
        try {
            Thread.sleep(100);
            reservation++;
            if (reservation < minOccupancy) {
                Thread.sleep(10);
                System.out.println("인원 부족으로 버스 운행이 취소될 수 있습니다. 현재 예약 인원: " + reservation);
            }
        } catch (InterruptedException e) {
            System.out.println("ERROR!");
        }
    }

    public int getReservation() {
        return reservation;
    }
}

30명이 동시에 버스 티켓을 예약한다고 생각해보자. 아래 코드를 실행했을 때 기대하는 결과는 크게 두 가지이다.

  • 예약 인원이 10명 미만일 때만 "인원 인원 부족으로 버스 운행이 취소될 수 있습니다." 문구가 출력되어야 한다.
  • 최종 예약 인원은 30명이어야 한다.

private final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);

@Test
void 삼십명의_사람들이_동시에_버스티켓을_구매한다() throws InterruptedException {

    int N = 30;
    CountDownLatch latch = new CountDownLatch(N);
    Bus bus = new Bus();

    for (int i = 0; i < N; i++) {
        THREAD_POOL.execute(() -> {
            bus.getBusTicket();
            latch.countDown();
        });
    }

    latch.await();

    int busReservation = bus.getReservation();
    System.out.println("======= Total reservation: " + busReservation + " =======");
    assertThat(busReservation).isNotEqualTo(N);
}

그러나 결과는?

  • 현재 예약 인원이 10명 이상일 때도 "인원 부족으로 버스 운행이 취소될 수 있습니다." 문구가 출력된다.
  • 최종 구매 인원이 26명이다.


왜 이런 결과가 나온 걸까? 하나씩 살펴보자.


결과 #1. 예약 인원이 10명 이상일 때 "운행 취소"문구 출력

하나의 스레드가 if 분기문을 통과한 시점에 다른 스레드가 reservation에 접근해 카운트를 올릴 수 있기 때문에 15번째, 16번째 예약자에게도 "운행 취소" 문구가 출력된다.

결과 #2. 최종 구매 인원이 26명

Thread 1이 reservation 값(x)를 읽어온 시점에 Thread 2에서 동일한 reservation 값(x)을 읽어올 수 있다. Thread 1이 reservation 값을 (x+1)로 업데이트하지만 Thread 2에 의해 reservation 값이 다시 (x+1)로 업데이트되기 때문에 최종적으로 reservation 값은 (x+2)가 아닌 (x+1)이 된다.


결국 이 두 가지 상황 모두 Race Condition(경쟁 상태)가 발생한 것으로 볼 수 있다. Race Condition은 두 개 이상의 스레드가 동시에 변경 가능한 공유 자원에 접근하고 이를 변경하려고 할 때 발생한다. 스레드 스케쥴링 알고리즘에 의해 스레드는 언제든 swap 될 수 있기 때문에 어느 시점에 어떤 순서로 스레드가 공유 자원에 접근할지 알 수 없다. 따라서, 스레드 스케쥴링 알고리즘이 결정하는 스레드의 접근 시점과 순서에 따라 실행 결과가 달라지게 된다.

Race Condition이 발생하는 주요 시나리오는 read-modify-write와 check-then-act가 있다. 위에서 살펴본 결과 #1이 check-then-act에 해당하고, 결과 #2는 read-modify-write에 해당한다.


그렇다면 Java에서는 어떻게 Thread-Safe한 코드를 작성할 수 있을까?


Java의 Thread-Safety

무상태(Stateless), 불변(Immutable)

객체를 무상태(Stateless)로 구현하면 항상 Thread-safe하다. 아래 MathUtil 클래스에는 변경 가능한 클래스 변수와 인스턴스 변수가 선언되어 있지 않고, factorial() 메서드는 내부 상태나 외부 상태에 의존하지 않기 때문에 항상 동일한 input에 대해 일관된 output을 얻을 수 있다.

public class MathUtils {
    
    public static BigInteger factorial(int number) {
        BigInteger f = new BigInteger("1");
        for (int i = 2; i <= number; i++) {
            f = f.multiply(BigInteger.valueOf(i));
        }
        return f;
    }
}

스레드 간 상태를 공유해야 한다면 객체를 불변(Immutable)으로 구현하면 된다. "불변"이란 객체가 생성된 후 내부 상태를 변경할 수 없는 것을 의미한다. 자바에서 불변 클래스를 구현하는 방법은 클래스 변수와 인스턴스 변수를 private, final로 선언한 뒤 setter를 제공하지 않는 것이다. 아래 MessageService 객체는 생성 시점 이후 내부 상태를 변경할 수 없기 때문에 Thread-safe하다.

public class MessageService {
    
    private final String message;

    public MessageService(String message) {
        this.message = message;
    }
    
    // getter
    
}

Synchronized

자바의 모든 객체는 하나의 Monitor를 갖고 있다. Monitor는 상호 배제(mutual exclusion)협력(cooperation) 두 가지 종류의 스레드 동기화를 지원한다. 상호 배제는 Lock을 통해 다수의 스레드가 공유 자원에 대해 독립적으로 연산을 수행할 수 있도록 하는 것이며, 협력은 wait와 notify 메서드를 통해 스레드가 상호 협력하도록 하는 것이다. 예를 들어 버퍼로부터 데이터를 읽어오는 스레드와 버퍼에 데이터를 쓰는 스레드가 있다고 할 때, 버퍼가 비어 있다면 버퍼에 데이터를 쓰는 스레드가 데이터를 채울 때까지 wait하고, 해당 작업이 종료되면 다시 읽기를 시작하는 것이다.

Monitor에 대한 더 자세한 내용은 여기에서 확인할 수 있다.

자바의 synchrnoized 키워드는 스레드 간 동기화를 위해 사용되는 대표적인 기법으로, synchronized 키워드는 객체 안에 존재하는 Monitor를 이용해 동기화를 수행한다. 하나의 스레드가 synchronized로 지정한 임계 영역에 들어가 있을 때 Lock이 걸리기 때문에 다른 스레드가 임계 영역에 접근할 수 없다. 이후 해당 스레드가 임계 영역의 코드를 모두 실행한 뒤 벗어나면 unlock 상태가 되어 대기하고 있던 다른 스레드가 이 임계영역에 접근해 다시 Lock을 걸어 사용할 수 있다.

synchronized 키워드는 메서드코드 블럭에 사용할 수 있다. 메서드에 synchronized 키워드를 붙이면 한 번에 하나의 스레드만 해당 메서드에 접근할 수 있고, 나머지 스레드로부터의 접근은 block된다.

앞서 예시로 들었던 Bus 클래스에 synchronized 키워드를 사용해 결과를 보자.

public class BusSynchronizedMethod {
    private final int minOccupancy = 10;
    private int reservation = 0;

    public synchronized void getBusTicket() {
        try {
            Thread.sleep(100);
            reservation++;
            if (reservation < minOccupancy) {
                Thread.sleep(10);
                System.out.println("인원 부족으로 버스 운행이 취소될 수 있습니다. 현재 예약 인원: " + reservation);
            }
        } catch (InterruptedException e) {
            System.out.println("ERROR!");
        }
    }
}

BusSynchronizedMethod 테스트 코드는 아래와 같다.

@Test
void 삼십명의_사람들이_동시에_버스티켓을_구매한다_synchronized_method() throws InterruptedException {

    int N = 30;
    CountDownLatch latch = new CountDownLatch(N);
    BusSynchronizedMethod bus = new BusSynchronizedMethod();

    for (int i = 0; i < N; i++) {
        THREAD_POOL.execute(() -> {
            bus.getBusTicket();
            latch.countDown();
        });
    }

    latch.await();

    int busReservation = bus.getReservation();
    System.out.println("======= Total reservation: " + busReservation + " =======");
    assertThat(busReservation).isEqualTo(N);
}

결과는 이전과 다르게 최종 예약 인원이 30명이고, 예약 인원이 10명 미만일 때만 "운행 취소" 문구가 출력되는 걸 볼 수 있다.

reservation을 업데이트하고 minOccupancy를 체크하는 로직을 synchronized 블럭에 넣어도 위와 동일한 결과를 볼 수 있다. 해당 테스트도 만들어 놨으니 포스팅 상단의 링크에서 코드를 다운받아 실행시켜 볼 수 있다.

Atomic Objects

자바는 AtomicInteger, AtomicLong, AtomicBoolean 등의 atomic 클래스를 제공한다. atomic 클래스는 멀티 스레드 환경에서 원자성을 보장해준다. 또한, 앞에서 살펴본 synchronized와는 다르게 blocking이 아닌 non-blocking 방식으로 동작하며 핵심 동작 원리는 CAS(Compare And Swap) 알고리즘이다.

CAS 알고리즘에 대한 더 자세한 내용은 여기에서 확인할 수 있다.

Bus의 reservation 변수를 AtomicInteger 타입으로 변경해보자.


public class BusAtomic {
    private final int minOccupancy = 10;
    private final AtomicInteger reservation = new AtomicInteger();

    public void getBusTicket() {
        try {
            Thread.sleep(100);
            int newReservation = reservation.incrementAndGet();
            if (newReservation < minOccupancy) {
                Thread.sleep(1);
                System.out.println("인원 부족으로 버스 운행이 취소될 수 있습니다. 현재 예약 인원: " + newReservation);
            }
        } catch (InterruptedException e) {
            System.out.println("ERROR!");
        }
    }
}

이전과 동일한 테스트를 돌리면 어떻게 될까?

@Test
void 삼십명의_사람들이_동시에_버스티켓을_구매한다_atomic() throws InterruptedException {

    int N = 30;
    CountDownLatch latch = new CountDownLatch(N);
    BusAtomic bus = new BusAtomic();

    for (int i = 0; i < N; i++) {
        THREAD_POOL.execute(() -> {
            bus.getBusTicket();
            latch.countDown();
        });
    }

    latch.await();

    int busReservation = bus.getReservation();
    System.out.println("======= Total reservation: " + busReservation + " =======");
    assertThat(busReservation).isEqualTo(N);
}

reservation 변수가 int 타입으로 선언됐을 때와 다르게 10명 미만일 때 "운행 취소" 문구가 출력되고 최종 예약 인원이 30명임을 알 수 있다.

왜 이런 결과가 나온 걸까? 자바로 작성된 한 줄의 연산인 reservation++은 변수의 값을 읽고, 연산한 뒤 저장하는 세 줄의 기계어로 쪼개질 수 있고, 멀티 스레드 환경에서 기계어 연산들이 한 번에 모두 실행될거라는 보장이 없기 때문이다. 값을 읽고 연산하는 기계어까지 실행했는데 다른 스레드로 swap 되는 상황을 생각하면 된다. atomic 클래스는 원자성을 보장하기 때문에 멀티 스레드 환경에서 안전하게 사용할 수 있다.

Volatile

변수에 volatile 키워드를 붙이면 CPU Cache를 사용하지 않고 Main Memory에 변수를 저장해 읽기와 쓰기 연산을 수행한다고 명시하는 것이다.

volatile 키워드는 오직 하나의 쓰레드에서 읽기와 쓰기 작업을 하고 나머지 쓰레드에서는 읽기 작업만 보장되는 경우에 사용한다.

Synchronized Collections

자바 Collection Framework의 대부분 Collection 구현체들은 Thread-Safe하지 않다. 아래 테스트를 보자. 어떤 결과가 나올 것 같은가?

@Test
void list에_원소를_추가한다() throws InterruptedException {

    int N = 30;
    List<Integer> list = new ArrayList<>();
    List<Integer> addElements = Arrays.asList(1, 2, 3);

    CountDownLatch latch = new CountDownLatch(N);

    for (int i = 0; i < N; i++) {
        THREAD_POOL.execute(() -> {
            list.addAll(addElements);
            latch.countDown();
        });
    }

    latch.await();

    System.out.println(list.size());
}

분명 30개의 스레드에서 원소를 3개씩 추가했으니 90이 출력되어야 하지만 출력되는 값은 90에 못 미치는 값임을 알 수 있다.

그렇다면 어떻게 해야 할까? java.util.Collections 클래스가 제공하는 static 팩토리 메소드인 synchronizedCollection() 메서드를 이용하면 Thread-Safe한 Collection 객체를 생성할 수 있다.

@Test
void syncrhonized_collection에_원소를_추가한다() throws InterruptedException {

    int N = 30;
    Collection<Integer> syncCollection = Collections.synchronizedCollection(new ArrayList<>());
    List<Integer> addElements = Arrays.asList(1, 2, 3);

    CountDownLatch latch = new CountDownLatch(N);

    for (int i = 0; i < N; i++) {
        THREAD_POOL.execute(() -> {
            syncCollection.addAll(addElements);
            latch.countDown();
        });
    }

    latch.await();

    System.out.println(syncCollection.size());
    assertThat(syncCollection.size()).isEqualTo(N * 3);
}

추가로, SynchronizedList 클래스의 메서드를 살펴보면 모두 synchronized 키워드가 붙어 있는 것을 볼 수 있다. 클래스 내의 모든 메소드가 mutex 객체를 공유하므로 하나의 스레드가 synchronized 블록에 들어간 순간 다른 메소드의 synchronized 블록이 다 Lock이 걸린다. 따라서, 여러 연산을 묶어 단일 연산처럼 사용하는 경우나 성능 저하 문제가 발생할 수 있다.

Concurrent Collections

Synchronized Collection 대신 Concurrent Collection을 사용해도 Thread-Safe한 Collection 객체를 생성할 수 있다. java.util.concurrent 패키지에서 CopyWriteArrayList, Concurrentmap, ConcurrentHashMap 등의 클래스를 찾아볼 수 있다. 그렇다면 아래의 테스트를 실행하면 결과가 어떻게 나올까?

@Test
void concurrent_collection에_원소를_추가한다() throws InterruptedException {

    int N = 30;
    Collection<Integer> concurrentCollection = new CopyOnWriteArrayList<>();
    List<Integer> addElements = Arrays.asList(1, 2, 3);

    CountDownLatch latch = new CountDownLatch(N);

    for (int i = 0; i < N; i++) {
        THREAD_POOL.execute(() -> {
            concurrentCollection.addAll(addElements);
            latch.countDown();
        });
    }

    latch.await();

    System.out.println(concurrentCollection.size());
    assertThat(concurrentCollection.size()).isEqualTo(N * 3);
}

CopyOnWriteArrayList는 모든 쓰기 동작 시 원본 배열에 있는 요소를 복사하여 새로운 임시 배열을 만들고, 이 임시 배열에 쓰기 동작을 수행한 후 원본 배열을 갱신한다. 따라서 읽기 동작에서는 Lock이 걸리지 않기 때문에 앞에서 살펴본 SynchronizedList보다 성능이 좋다. 그러나 쓰기 동작에서는 동일하게 Lock이 걸리고, 내부적으로 원본 배열을 복사하는 연산이 포함되기 때문에 쓰기 작업 수행 비율에 따라 성능 이슈가 발생할 수 있다.

// CopyOnWrtieArrayList 클래스의 메서드

public E get(int index) {
    return elementAt(getArray(), index);
}

public boolean add(E e) {
    synchronized (lock) {
        Object[] es = getArray();
        int len = es.length;
        es = Arrays.copyOf(es, len + 1);
        es[len] = e;
        setArray(es);
        return true;
    }
}

Concurrent Collection에 대한 더 자세한 내용은 여기에서 확인할 수 있다.


Reference

0개의 댓글