📚
자바 멀티쓰레드의 동시성을 해결하는 각 기능의 심화과정을 다루기 전에 가볍게 이용해보고 테스트를 통해 사용 방식을 익혀봅니다
자바 멀티쓰레드를 사용하면 반드시 겪는 동시성 문제가 있다.
동시성 문제는 쓰레드마다 객체나 변수를 조작할 때 서로가 최신값을 모르고 접근 & 변경하기 때문에 발생하는 문제고, 주로 쓰레드마다 가지는 로컬 변수가 아닌 전역 변수나 객체의 필드에서 자주 발생하기 때문에 주의하자
public class NonVolatileExample {
private boolean stopRequested = false;
private volatile boolean stopVolatileRequested = false;
Logger logger = Logger.getLogger(NonVolatileExample.class.getName());
@Test
void testNonVolatile() throws InterruptedException {
Thread backgroundThread = new Thread(() -> {
logger.log(INFO, "Background Thread: " + Thread.currentThread().getName() + ", ID: " + Thread.currentThread().getId());
while (!stopRequested) {
// 무한 루프 - stopRequested가 true가 될 때까지 반복
}
logger.log(INFO, "Thread stopped.");
});
backgroundThread.start();
// 메인 스레드가 1초 동안 대기 후 stopRequested를 true로 설정
Thread.sleep(1000);
logger.log(INFO, "Main thread set stopRequested to true.");
stopRequested = true;
logger.log(INFO, "Main Thread: " + Thread.currentThread().getName() + ", ID: " + Thread.currentThread().getId());
Thread.sleep(1000);
}
@Test
void testVolatile() throws InterruptedException {
Thread backgroundThread = new Thread(() -> {
while (!stopVolatileRequested) {
// 무한 루프 - stopRequested가 true가 될 때까지 반복
}
logger.log(INFO, "Thread stopped.");
});
backgroundThread.start();
// 메인 스레드가 1초 동안 대기 후 stopRequested를 true로 설정
Thread.sleep(1000);
stopVolatileRequested = true;
Thread.sleep(1000);
}
}
쓰레드가 서로 하나의 값에 접근해서 바꾸는 상태를 경합 상태 (Race Condition)라 한다. Race Condition은 어떻게 해결할 수 있을까?
volatile 로 가능할까?
synchronized 은 무엇인가?
Atomic 은 무엇인가?
원자성 보장
Atomic 클래스는 원자성 보장하는 메서드 제공
Atomic 클래스의 변수는 volatile로 선언
각 방식을 ExecutorService를 통해 동시에 실행했을 때 Counter의 필드가 예상값이 나오는지 테스트해보자
public class ExecutorTest {
private ExecutorService executorService;
private final Counter counter = new Counter();
@BeforeEach
void init() {
executorService = Executors.newFixedThreadPool(5);
counter.init();
}
@Test
void multiFail() {
for(int i = 0; i < 1000; i++){
executorService.execute(counter::increment);
}
assertThat(counter.count).isNotEqualTo(1000);
}
@Test
void syncTest(){
for(int i = 0; i < 1000; i++){
executorService.execute(counter::syncIncrement);
}
shutdownAndAwaitTermination();
assertThat(counter.count).isEqualTo(1000);
}
@Test
void volatileTest() {
for(int i = 0; i < 1000; i++){
executorService.execute(counter::volatileIncrement);
}
shutdownAndAwaitTermination();
assertThat(counter.volatileCount).isEqualTo(1000);
}
@Test
void atomicTest() {
for(int i = 0; i < 1000; i++){
executorService.execute(counter::atomicIncrement);
}
shutdownAndAwaitTermination();
assertThat(counter.atomicInteger.get()).isEqualTo(1000);
}
// ⚠️ executorService를 shutDown() 으로 종료해도
// 이미 할당받은 task를 마무리를 해야 정상적으로 예상한 결과가 나온다
// shutdownAndAwaitTermination() 으로 soft landing 구현
private void shutdownAndAwaitTermination() {
executorService.shutdown();
try {
if(!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (Exception e){
e.printStackTrace();
}
}
private static class Counter {
private int count = 0;
private volatile int volatileCount = 0;
private final AtomicInteger atomicInteger = new AtomicInteger(0);
public void init() {
count = 0;
volatileCount = 0;
atomicInteger.set(0);
}
public void increment(){
count++;
}
public void volatileIncrement(){
volatileCount++;
}
public synchronized void syncIncrement(){
count++;
}
public void atomicIncrement(){
atomicInteger.incrementAndGet();
}
}
}
volatile 키워드가 쓰레드간 값을 접근할 때 메모리에서 읽는 것은 보장해주지만 원자성을 보장하진 않는다.
volatile
키워드를 사용하지 않아도 변수의 최신 값을 보장받는 이유@Test
void volatileSyncTest() {
for(int i = 0; i < 10000; i++){
executorService.execute(counter::volatileSyncIncrement);
}
shutdownAndAwaitTermination();
assertThat(counter.volatileCount).isEqualTo(expectedValue);
}
// Counter.class
public void volatileSyncIncrement(){
synchronized (this) {
volatileCount++;
}
}
public class LockTest {
private ExecutorService executorService;
private final Counter counter = new Counter();
private final int expectedValue = 10000;
private final int forLoopCount = 10000;
@BeforeEach
void init() {
executorService = Executors.newFixedThreadPool(5);
counter.init();
}
@Test
@DisplayName("일반 Reentrant Lock")
void lockTest() {
for (int i = 0; i < forLoopCount; i++) {
executorService.execute(counter::incrementWithLock);
}
shutdownAndAwaitTermination();
assertThat(counter.getCount()).isEqualTo(expectedValue);
}
@Test
@DisplayName("tryLock() 테스트")
void tryLockTest() {
for (int i = 0; i < forLoopCount; i++) {
executorService.execute(() -> {
boolean success = counter.tryIncrementWithLock();
if (!success) {
// 락을 획득하지 못했을 때의 처리
System.out.println("Failed to increment due to lock unavailability.");
}
});
}
shutdownAndAwaitTermination();
System.out.println("Final count: " + counter.getCount());
assertThat(counter.getCount()).isLessThanOrEqualTo(expectedValue);
}
@Test
@DisplayName("멀티쓰레드에서 각 쓰레드가 짝수일 때와 홀수일 때 count를 증가시키도록 Condition 설정")
void conditionLockTest() {
for (int i = 0; i < 1000; i++) {
executorService.execute(counter::incrementIfOdd);
executorService.execute(counter::incrementIfEven);
}
shutdownAndAwaitTermination();
assertThat(counter.getCount()).isEqualTo(2000);
}
private void shutdownAndAwaitTermination() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static class Counter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition lockAvailable = lock.newCondition();
public void init() {
count = 0;
}
public void incrementWithLock() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public boolean tryIncrementWithLock() {
boolean isLocked = false;
try {
isLocked = lock.tryLock();
if (isLocked) {
count++;
return true; // 작업 성공
} else {
// 다른 작업 수행 가능
System.out.println("락 획득 실패. 다른 작업 진행");
return false; // 작업 실패
}
} finally {
if (isLocked) {
lock.unlock(); // 락이 성공적으로 획득된 경우에만 해제
}
}
}
public void incrementIfEven() {
String threadName = Thread.currentThread().getName();
try {
lock.lock();
if (count % 2 != 0) {
System.out.println(threadName+": count가 짝수이길 기다림");
// 잠금 상태가 아닌데 await method를 호출하면 IllegalMonitorStateException 발생
boolean timeOut = lockAvailable.await(1, TimeUnit.SECONDS);
if(timeOut) {
System.out.println(threadName+": 짝수 대기 시간 만료");
}
}
count++;
lockAvailable.signal();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
public void incrementIfOdd() {
String threadName = Thread.currentThread().getName();
try {
lock.lock();
if (count % 2 == 0) { // count가 짝수일 때만 홀수로 만듦
System.out.println(threadName+": count가 홀수이길 기다림");
boolean timeOut = lockAvailable.await(1, TimeUnit.SECONDS);
if(timeOut) {
System.out.println(threadName+": 홀수 대기 시간 만료");
}
}
count++;
lockAvailable.signal(); // 다른 대기 중인 스레드를 깨웁니다.
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock(); // 락을 해제합니다.
}
}
public int getCount() {
return count;
}
}
}
ReentrantLock의 장점
재진입이 가능한 Lock
명시적 Lock을 이용해 직관적인 코드 구조
tryLock() 을 이용가능
쓰레드가 락 획득에 성공하지 못해도 대기하지 않고 다른 작업을 수행할 수 있다 (비동기로 개선 가능)
1000번 반복 중 락 4번 획득 실패해서 최종적으로 996 출력 확인
조건 변수 (Condition) 지원