자바 멀티쓰레드와 동시성 종류, 간단한 응용 코드 (기초)

DongHyun Kim·2024년 8월 30일
0
post-custom-banner

이 글의 목적

📚
자바 멀티쓰레드의 동시성을 해결하는 각 기능의 심화과정을 다루기 전에 가볍게 이용해보고 테스트를 통해 사용 방식을 익혀봅니다

자바 멀티쓰레드를 사용하면 반드시 겪는 동시성 문제가 있다.
동시성 문제는 쓰레드마다 객체나 변수를 조작할 때 서로가 최신값을 모르고 접근 & 변경하기 때문에 발생하는 문제고, 주로 쓰레드마다 가지는 로컬 변수가 아닌 전역 변수나 객체의 필드에서 자주 발생하기 때문에 주의하자

Volatile

  • volatile 를 사용하는 이유 - Thread의 가시성 문제
    • 자바 메모리 모델에서 쓰레드는 각자의 캐시를 사용한다
    • 한 쓰레드가 자신의 캐시에 있는 전역 변수의 값을 변경해도 다른 쓰레드는 그 값을 모른다
    • “volatile 키워드”는 변수의 값을 메인 메모리에 직접 쓰고 읽도록 강제하여, 모든 쓰레드가 항상 최신 값을 보도록 보장
public class NonVolatileExample {

    private boolean stopRequested = false;
    private volatile boolean stopVolatileRequested = false;
    Logger logger = Logger.getLogger(NonVolatileExample.class.getName());

    @Test
    void testNonVolatile() throws InterruptedException {
        Thread backgroundThread = new Thread(() -> {
            logger.log(INFO, "Background Thread: " + Thread.currentThread().getName() + ", ID: " + Thread.currentThread().getId());
            while (!stopRequested) {
                // 무한 루프 - stopRequested가 true가 될 때까지 반복
            }
            logger.log(INFO, "Thread stopped.");
        });

        backgroundThread.start();

        // 메인 스레드가 1초 동안 대기 후 stopRequested를 true로 설정
        Thread.sleep(1000);
        logger.log(INFO, "Main thread set stopRequested to true.");
        stopRequested = true;

        logger.log(INFO, "Main Thread: " + Thread.currentThread().getName() + ", ID: " + Thread.currentThread().getId());
        Thread.sleep(1000);
    }

    @Test
    void testVolatile() throws InterruptedException {
        Thread backgroundThread = new Thread(() -> {
            while (!stopVolatileRequested) {
                // 무한 루프 - stopRequested가 true가 될 때까지 반복
            }
            logger.log(INFO, "Thread stopped.");
        });

        backgroundThread.start();

        // 메인 스레드가 1초 동안 대기 후 stopRequested를 true로 설정
        Thread.sleep(1000);
        stopVolatileRequested = true;

        Thread.sleep(1000);
    }
}
  • 위 코드에서 testNonVolatile() 의 경우 다른 쓰레드에서 stopRequested 플래그를 true로 업데이트했지만, 쓰레드의 캐시 이슈로 인해 무한 루프에서 빠져나오지 못하는 경우가 발생한다

동시성 V1 - Race Condition 방지

쓰레드가 서로 하나의 값에 접근해서 바꾸는 상태를 경합 상태 (Race Condition)라 한다. Race Condition은 어떻게 해결할 수 있을까?

  • volatile 로 가능할까?

  • synchronized 은 무엇인가?

    • 모니터 락 (Monitor Lock)
      • 여러 스레드가 동일한 자원 (메서드 또는 코드 블록)에 접근할 때 한 번에 하나의 스레드만 접근할 수 있도록 모니터 락을 사용하여 보장
    • 가시성 보장
      • synchronized 블록을 탈출할 때, 쓰레드의 모든 변경 사항이 메모리에 커밋된다
  • Atomic 은 무엇인가?

    • 원자성 보장

      • Atomic 클래스는 원자성 보장하는 메서드 제공

      • Atomic 클래스의 변수는 volatile로 선언

Race Condition 테스트 코드

각 방식을 ExecutorService를 통해 동시에 실행했을 때 Counter의 필드가 예상값이 나오는지 테스트해보자

public class ExecutorTest {

    private ExecutorService executorService;
    private final Counter counter = new Counter();

    @BeforeEach
    void init() {
        executorService = Executors.newFixedThreadPool(5);
        counter.init();
    }

    @Test
    void multiFail() {
        for(int i = 0; i < 1000; i++){
            executorService.execute(counter::increment);
        }

        assertThat(counter.count).isNotEqualTo(1000);
    }

    @Test
    void syncTest(){
        for(int i = 0; i < 1000; i++){
            executorService.execute(counter::syncIncrement);
        }
        shutdownAndAwaitTermination();
        assertThat(counter.count).isEqualTo(1000);
    }

    @Test
    void volatileTest() {
        for(int i = 0; i < 1000; i++){
            executorService.execute(counter::volatileIncrement);
        }
        shutdownAndAwaitTermination();
        assertThat(counter.volatileCount).isEqualTo(1000);
    }

    @Test
    void atomicTest() {
        for(int i = 0; i < 1000; i++){
            executorService.execute(counter::atomicIncrement);
        }
        shutdownAndAwaitTermination();
        assertThat(counter.atomicInteger.get()).isEqualTo(1000);
    }

	// ⚠️ executorService를 shutDown() 으로 종료해도 
    // 이미 할당받은 task를 마무리를 해야 정상적으로 예상한 결과가 나온다
    // shutdownAndAwaitTermination() 으로 soft landing 구현
    private void shutdownAndAwaitTermination() {
        executorService.shutdown();
        try {
            if(!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }

    private static class Counter {
        private int count = 0;
        private volatile int volatileCount = 0;
        private final AtomicInteger atomicInteger = new AtomicInteger(0);

        public void init() {
            count = 0;
            volatileCount = 0;
            atomicInteger.set(0);
        }
        public void increment(){
            count++;
        }

        public void volatileIncrement(){
            volatileCount++;
        }

        public synchronized void syncIncrement(){
            count++;
        }

        public void atomicIncrement(){
            atomicInteger.incrementAndGet();
        }
    }
}
  • synchronized 방식과 Atomic 방식은 모두 예상한 결과가 나온다.
  • 하지만 volatile 방식은 문제가 발생했다
    • volatile 키워드가 쓰레드간 값을 접근할 때 메모리에서 읽는 것은 보장해주지만 원자성을 보장하진 않는다.

      • intellij 에선 친절히 경고를 날려준다
  • 언제 volatile을 써야하나?
    • volatile은 간단히 가시성을 보장하기 때문에 다른 고비용의 동기화보다 저렴하다.
    • 간단한 플래그 변수, 싱글톤 패턴, 동시에 쓰기 작업이 발생하는 Race Condition 이 적을 때
  • AtomicInteger나 synchronized를 사용할 때 volatile 키워드를 사용하지 않아도 변수의 최신 값을 보장받는 이유
    • JVM의 동기화 보장 덕분
    • synchronized를 사용하면 캐시 값을 사용하지 않고 메모리의 최신 값 참조
    • AtomicInteger를 사용하면 내부적으로 volatile 변수와 CAS (Compare-And-Swap) 연산을 사용하여 원자적 연산을 수행
  • Volatile 증가 수정 & 테스트
    • 아래와 같이 수정하면 예상대로 count가 출력된다
@Test
void volatileSyncTest() {
	  for(int i = 0; i < 10000; i++){
	      executorService.execute(counter::volatileSyncIncrement);
	  }
	  shutdownAndAwaitTermination();
	  assertThat(counter.volatileCount).isEqualTo(expectedValue);
}

// Counter.class
public void volatileSyncIncrement(){
    synchronized (this) {
        volatileCount++;
    }
}

동시성 V2 - Lock을 이용

  • ReentrantLock
    • 명시적 Lock을 이용
  • lock() 기능 테스트
  • tryLock() 기능 테스트
  • Condition 기능 테스트
public class LockTest {

    private ExecutorService executorService;
    private final Counter counter = new Counter();
    private final int expectedValue = 10000;
    private final int forLoopCount = 10000;

    @BeforeEach
    void init() {
        executorService = Executors.newFixedThreadPool(5);
        counter.init();
    }

    @Test
    @DisplayName("일반 Reentrant Lock")
    void lockTest() {
        for (int i = 0; i < forLoopCount; i++) {
            executorService.execute(counter::incrementWithLock);
        }
        shutdownAndAwaitTermination();

        assertThat(counter.getCount()).isEqualTo(expectedValue);
    }

    @Test
    @DisplayName("tryLock() 테스트")
    void tryLockTest() {
        for (int i = 0; i < forLoopCount; i++) {
            executorService.execute(() -> {
                boolean success = counter.tryIncrementWithLock();
                if (!success) {
                    // 락을 획득하지 못했을 때의 처리
                    System.out.println("Failed to increment due to lock unavailability.");
                }
            });
        }
        shutdownAndAwaitTermination();

        System.out.println("Final count: " + counter.getCount());
        assertThat(counter.getCount()).isLessThanOrEqualTo(expectedValue);
    }

    @Test
    @DisplayName("멀티쓰레드에서 각 쓰레드가 짝수일 때와 홀수일 때 count를 증가시키도록 Condition 설정")
    void conditionLockTest() {
        for (int i = 0; i < 1000; i++) {
            executorService.execute(counter::incrementIfOdd);
            executorService.execute(counter::incrementIfEven);
        }

        shutdownAndAwaitTermination();

        assertThat(counter.getCount()).isEqualTo(2000);
    }

    private void shutdownAndAwaitTermination() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static class Counter {
        private int count = 0;
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition lockAvailable = lock.newCondition();

        public void init() {
            count = 0;
        }

        public void incrementWithLock() {
            lock.lock();
            try {
                count++;
            } finally {
                lock.unlock();
            }
        }

        public boolean tryIncrementWithLock() {
            boolean isLocked = false;
            try {
                isLocked = lock.tryLock();
                if (isLocked) {
                    count++;
                    return true;  // 작업 성공
                } else {
                    // 다른 작업 수행 가능
                    System.out.println("락 획득 실패. 다른 작업 진행");
                    return false;  // 작업 실패
                }
            } finally {
                if (isLocked) {
                    lock.unlock();  // 락이 성공적으로 획득된 경우에만 해제
                }
            }
        }

        public void incrementIfEven() {
            String threadName = Thread.currentThread().getName();
            try {
                lock.lock();
                if (count % 2 != 0) {
                    System.out.println(threadName+": count가 짝수이길 기다림");
                    // 잠금 상태가 아닌데 await method를 호출하면 IllegalMonitorStateException 발생
                    boolean timeOut = lockAvailable.await(1, TimeUnit.SECONDS);

                    if(timeOut) {
                        System.out.println(threadName+": 짝수 대기 시간 만료");
                    }
                }

                count++;
                lockAvailable.signal();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                if(lock.isHeldByCurrentThread()){
                    lock.unlock();
                }
            }
        }

        public void incrementIfOdd() {
            String threadName = Thread.currentThread().getName();
            try {
                lock.lock();
                if (count % 2 == 0) {  // count가 짝수일 때만 홀수로 만듦
                    System.out.println(threadName+": count가 홀수이길 기다림");
                    boolean timeOut = lockAvailable.await(1, TimeUnit.SECONDS);

                    if(timeOut) {
                        System.out.println(threadName+": 홀수 대기 시간 만료");
                    }
                }

                count++;
                lockAvailable.signal();  // 다른 대기 중인 스레드를 깨웁니다.
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();  // 락을 해제합니다.
            }
        }

        public int getCount() {
            return count;
        }
    }
}
  • ReentrantLock의 장점

    • 재진입이 가능한 Lock

    • 명시적 Lock을 이용해 직관적인 코드 구조

    • tryLock() 을 이용가능

      • 쓰레드가 락 획득에 성공하지 못해도 대기하지 않고 다른 작업을 수행할 수 있다 (비동기로 개선 가능)

      • 1000번 반복 중 락 4번 획득 실패해서 최종적으로 996 출력 확인

    • 조건 변수 (Condition) 지원

      • Lock의 newCondition() 으로 Condition 획득 가능
      • Condition 객체를 이용해 Object 클래스의 wait(), notify(), notifyAll() 메서드보다 더 유연한 동기화 매커니즘 지원
      • 특정 조건을 await() 으로 기다릴 수 있다.
        • await()을 호출하면 획득했던 lock 을 반환하고 ready state (대기) 상태로 돌아간다. (Blocking)
        • 주의⚠️ lock을 획득하지 못한 Thread가 await() 을 호출하면 IllegalMonitorStateException 가 발생한다
      • singal() 으로 대기 중인 다른 스레드 하나를 깨워서 이어서 처리할 수 있도록 한다
      • singalAll() 으로 대기 중인 다른 스레드 모두를 깨워서 이어서 처리할 수 있도록 한다
profile
do programming yourself
post-custom-banner

0개의 댓글