[Java] 동시성 문제

sion·2024년 11월 21일
0

자바 스터디

목록 보기
5/7

동시성과 병렬성

동시성(Concurrency)과 병렬성(Parallelism)은 멀티스레드의 핵심 개념입니다.

동시성(=Concurrent하다는 것)은 여러 작업을 동시에 처리하는 것처럼 보이는 것이고, 병렬성(=Parallel하다는 것)은 여러 작업을 실제로 동시에 처리하는 것을 의미합니다.

동시성 (Concurrency)

동시성은 하나의 코어에서 여러 task들을 빠르게 번갈아가면서 수행하는 기술입니다. "동시에" 실행되는 것처럼 보이는 것은 스케쥴러가 아주 빠르게 스레드(혹은 프로세스) 간 컨텍스트 스위치를 하기 때문입니다. (어떤 task를 CPU에 할당할 지 결정)
특히, I/O wait이 많은 웹 서버나 DB 서버에서 많이 사용됩니다.

  • 싱글 코어에서 멀티스레드(멀티프로세스)를 동작시키는 방식
  • 참고: 프로세스간 발생하는 context switching 보다는 스레드 간 context switching이 오버헤드가 적다.

병렬성 (Parallelism)

병렬성은 여러 task들을 실제로 동시에 실행하는 기술입니다. 따라서, core가 하나라면 parallel한 방법은 불가능합니다. 대규모 데이터 연산/분석, 이미지 처리 등에서 단순 계산 작업들을 빠르게 처리할 수 있습니다.

  • 멀티 코어에서 멀티스레드(멀티프로세스)를 동작시키는 방식

동시성 문제는 Concurrent한 Task 에서 발생한다.

동기화 문제는 concurrent하게 동작하는 여러 task가 동시에 공유된 자원(resource)에 변경하려 할 때 발생합니다.

  • 멀티 스레드 환경에서는 여러 스레드가 concurrent하게 실행되고, 각 task는 하나의 thread가 실행하는 작업을 의미합니다.

예를 들어, 10개의 스레드가 동일한 변수 counter를 1씩 증가시키는 코드가 있다고 가정하겠습니다.

public class Counter {
    private int counter = 0;

    public void increment() {
        counter++;
    }

    public int getCounter() {
        return counter;
    }
}

10개의 스레드(t1, t2, ..., t10)가 각각 increment() 메서드를 동시에 호출하면, 다음과 같은 과정을 겪습니다.

  1. t1이 counter (ex: 3)를 읽고, counter <- 4 하려고 합니다.
  2. t2이 동시에 counter (ex: 3)를 읽고, counter <- 4 하려고 합니다.
  3. 두 t1, t2가 동일한 값을 읽었기 때문에, counter가 두 번 호출되더라도 실제 counter는 1만 증가됩니다.

이를 Race Condition(경쟁 상태)이 발생했다고 하고, thread-unsafe 한 코드에서 발생합니다.

thread-safe란?

thread-safe란, 멀티스레드 환경에서 여러 스레드가 동시에 공유 자원을 write 하더라도 자원의 상태를 일관되게 유지할 수 있도록 보장된 상태를 의미합니다.

자바의 동시성 이슈를 해결하자.

그러면 counter라는 공유 자원을 스레드 간에 race condition이 발생하지 않도록 thread-safe하게 처리하는 방법들에 대해 알아보겠습니다.

  1. 상태를 두지 않거나, 불변하도록 객체 설계 (final 활용)
    • 파라미터로 값을 받아 처리 후 반환만 해주도록 설계한다. 공유 자원을 없앨 수 있다면 없애자..
  2. synchronized 키워드 사용
    • 한 스레드가 synchronized 블록에 들어가면, 실행이 끝날 때까지 나머지 스레드는 대기 (Blocking)
  3. Lock 인터페이스 사용 (java.util.concurrent.locks.ReentrantLock)
    • 명시적으로 Lock을 관리할 수 있음
  4. AtomicInteger 사용
    • CAS(Compare-And-Swap) 기법으로 Lock을 사용하는 것보다 성능상 우위 (Non-Blocking)
  5. LongAdder 사용
    • CAS 연산에 의한 경합을 최소화하기 위한 기법
동시성 이슈 테스트 코드
@ExtendWith(SpringExtension.class)
class CounterTest {

    @Autowired
    private Counter counter;

    @Autowired
    private CounterWithSync counterWithSync;

    @Autowired
    private CounterWithLock counterWithLock;

    @Autowired
    private CounterWithAtomic counterWithAtomic;

    @Autowired
    private CounterWithAdder counterWithAdder;

    // 스레드 풀 생성
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    // task 수
    private final int TASK_COUNT = 1_000_000;

    @Test
    void testCounterWithMultipleThreads() throws InterruptedException {
        for (int i = 0; i < TASK_COUNT; i++) {
            executorService.submit(() -> counter.increment());
        }

        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);

        assertThat(counter.getCounter()).isLessThan(TASK_COUNT); // thread safe 하지 않음
    }

    @Test
    void testCounterWithSyncWithMultipleThreads() throws InterruptedException {
        for (int i = 0; i < TASK_COUNT; i++) {
            executorService.submit(() -> counterWithSync.increment());
        }
        
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);

        assertThat(counterWithSync.getCounter()).isEqualTo(TASK_COUNT);
    }

    @Test
    void testCounterWithLockWithMultipleThreads() throws InterruptedException {
        for (int i = 0; i < TASK_COUNT; i++) {
            executorService.submit(() -> counterWithLock.increment());
        }
        
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);

        assertThat(counterWithLock.getCounter()).isEqualTo(TASK_COUNT);
    }

    @Test
    void testCounterWithAtomicWithMultipleThreads() throws InterruptedException {
        for (int i = 0; i < TASK_COUNT; i++) {
            executorService.submit(() -> counterWithAtomic.increment());
        }
        
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);

        assertThat(counterWithAtomic.getCounter()).isEqualTo(TASK_COUNT);
    }

    @Test
    void testCounterWithAdderWithMultipleThreads() throws InterruptedException {
        for (int i = 0; i < TASK_COUNT; i++) {
            executorService.submit(() -> counterWithAdder.increment());
        }

        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);

        assertThat(counterWithAdder.getCounter()).isEqualTo(TASK_COUNT);
    }

    @Configuration
    public static class TestConfig {

        @Bean
        public Counter counter() {
            return new Counter();
        }

        @Bean
        public CounterWithSync counterWithSync() {
            return new CounterWithSync();
        }

        @Bean
        public CounterWithLock counterWithLock() {
            return new CounterWithLock();
        }

        @Bean
        public CounterWithAtomic counterWithAtomic() {
            return new CounterWithAtomic();
        }

        @Bean
        public CounterWithAdder counterWithAdder() {
            return new CounterWithAdder();
        }
    }

    public static class Counter {
        private int counter = 0;
        
        public void increment() {
            counter++;
        }

        public int getCounter() {
            return counter;
        }
    }

    public static class CounterWithSync {
        private int counter = 0;
        
        public synchronized void increment() {
            counter++;
        }

        public int getCounter() {
            return counter;
        }
    }

    public static class CounterWithLock {
        private int counter = 0;

        private final Lock lock = new ReentrantLock();

        public void increment() {
            lock.lock();
            try {
                counter++;
            } finally {
                lock.unlock();
            }
        }

        public int getCounter() {
            return counter;
        }
    }

    public static class CounterWithAtomic {
        private final AtomicInteger counter = new AtomicInteger(0);

        public void increment() {
            counter.incrementAndGet();
        }

        public int getCounter() {
            return counter.get();
        }
    }

    public static class CounterWithAdder {
        private final LongAdder counter = new LongAdder();

        public void increment() {
            counter.increment();
        }

        public long getCounter() {
            return counter.sum();
        }
    }

    public static class CounterWithParam {

        public int increment(int counter) {
            return counter + 1;
        }

    }
}

synchronized 문제점과 내부 동작

synchronized 방법

synchronized는 블록, 메소드, 클래스 단위로 동기화를 적용할 수 있습니다. 특정 코드 블록 -> 인스턴스 메소드 -> 정적 메소드 수준으로 점차 범위가 증가합니다.

// 1. 코드 블록 동기화
public void increment() {
  	// logic 1...
	synchronized (this) {
  		counter++;
  	}
  	// logic 2...
}
  
// 2. 메서드 동기화
private int counter = 0;
public synchronized void increment() {
	counter++;
}
  
// 3. 메서드 동기화
privte static int counter = 0;
public static synchronized void increment() {
	counter++;
}

synchronized 문제점

synchronized는 블록 전체에 락을 걸고 다른 스레드가 접근하면 blocking되는 방식이므로 스레드가 많을수록 성능 저하가 크게 발생합니다.

1) 성능 저하

  • 동시에 여러 스레드가 synchronized 블록에 접근하려고 하면, 대기 시간이 길어집니다.
  • lock 획득과 해제에 대한 비용이 큽니다.

2) 데드락

  • 실수로 인해, 상호 의존적인 Lock을 기다리게 되면 무한 대기에 빠져 시스템이 멈출 수 있습니다.

3) 과도한 블록 범위

  • synchronized를 넓은 범위(단순히 메소드)로 짜게 되면 동기화가 필요하지 않은 부분까지도 포함되는 문제가 발생합니다. 예를 들어, 블록 안에서 오랜 시간이 걸리는 I/O 등의 작업이 같이 이루어지게 되면 많은 스레드들이 대기 상태에 놓여 성능을 크게 저하시킬 수 있습니다.

synchronized 내부 동작

  1. 모니터 락(Monitor Lock)
    • 각 객체는 고유한 Monitor Lock 을 가지고 있습니다.
    • 스레드가 synchronized 블록이나 메서드에 진입하려면 Monitor Lock을 획득해야만 가능합니다.
  2. 획득 과정
    • 스레드가 synchronized 블록에 진입 시도
      - 사용 중이 아니면, Lock을 획득하고 블록 실행
      - 사용 중이면, Lock이 해제될 때까지 대기 상태
    • 블록 실행 완료되면 Lock 해제
    • 대기 중인 스레드가 Lock을 획득할 기회를 가짐

또한, JVM 수준에서 monitorenter, monitorexit 으로 변환되어 바이트코드 수준에서도 동기화를 보장합니다.

가시성 문제와 원자성 문제

가시성 문제: 스레드 A가 값을 바꿔도, 스레드 B가 바꾼 사실을 못 본다.
원자성 문제: 스레드 A와 B가 동시에 값을 바꾸려 할 때, 중간 과정의 데이터를 read하는 문제.

가시성 문제 (Visibility)

한 스레드에서 변경한 값을 다른 스레드가 즉시 관찰할 수 없는 현상을 의미합니다. CPU Cache Memory와 RAM의 데이터가 일치하지 않아 생기는 문제입니다.

class SampleTest {

    private static boolean running = true;

    @Test
    void test2() throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (running) {
                // running 값을 다른 스레드에서 변경하기 때문에 이를 감지하지 못함
  				// 단, 여기에 System.out.println()을 넣으면 내부적으로 synchronized가 실행되어 동기화가 이루어진다.
            }
            System.out.println("Thread stopped.");
        });
        thread.start();

        Thread.sleep(1000);
        running = false;    // 1초 후에 running 값을 false로 변경해도, thread는 이를 알지 못한다. (가시성 문제)
    }

}

즉, 한 스레드가 공유 자원(공유 변수)을 변경해도, 다른 스레드는 이를 인식하지 못하고 캐싱된 값을 사용합니다. 이를 가시성 문제라고 합니다.

volatile 키워드

volatile로 선언하면 항상 RAM에서 데이터를 읽고 쓰게 강제하여 가시성 문제를 해결할 수 있습니다.

원자성 문제 (Atomicity)

하나의 연산 과정이 여러 단계로 나뉘면서, 과정 중에 다른 스레드가 중간 상태의 데이터를 관찰하여 일관되지 않은 값을 일으키는 문제입니다.

  • 읽기(Read) -> 연산(Operate) -> 쓰기(Write)
  • 예를 들어, count++; 더하기 연산을 하게 되면,
  • 1) 변수 count(=0) 읽고, -> 2) CPU 연산 수행 (+1) -> 3) 다시 메모리에 count(=1) 쓴다.
  • 한 스레드가 count 연산하는 중에, 다른 스레드가 count를 읽어간다면 예상한 결과가 나오지 않습니다.
class SampleTest {

    private static int count = 0;

    @Test
    void atomicTest() throws InterruptedException {
        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                count++; // 읽기 -> 연산 -> 쓰기 (3단계)
            }
        };

        Thread thread1 = new Thread(task);
        Thread thread2 = new Thread(task);

        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();

        System.out.println("count = " + count); // 20_000이 나와야 하는데 그렇지 않음
        assertThat(count).isLessThan(20_000);
    }

}

결과를 보면 20000에 한참 미치지 못하는 것을 볼 수 있다.

AtomicIntegerAtomic 클래스 사용

java.util.concurrent.atomic 패키지 클래스들은 CAS 알고리즘을 통해 원자성을 보장합니다.

private static AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet();

CAS (Compared-And-Swap) 알고리즘

// CAS 간단 예시 (실행되는 코드는 아님)
int expected = x;  // 예상 값
int newValue = x + 1;  // 새로운 값

// 1. Compare: 공유변수 x가 현재 값(expected)과 같은지 비교
// 2. Swap: 같다면, 변수 x를 변경 값(newValue)으로 교체하고 true 반환
// 3. 다르다면, false를 반환하고 다시 시도
boolean success = compareAndSwap(x, expected, newValue);

if(success) {
    System.out.println("성공적으로 값을 업데이트 했습니다.");
} else {
    System.out.println("다른 스레드가 값을 변경했으므로 다시 시도합니다.");
}

Lock 동기화처럼 Blocking이 아닌 스레드가 계속 연산을 시도할 수 있는 Non-Blocking 방식이기 때문에 성능이 향상됩니다.

참고

0개의 댓글

관련 채용 정보