Thread는 프로세스에서 여러개 존재할 수 있으며, 같은 메모리 영역을 가진다. 반면 자신만의 스택과 레지스터를 가진다.
Thread가 메모리 영역에 자원을 동시에 수정하려 할 때, 동시성 문제가 발생할 수 있다.
이를 안전하게 수정하기 위해 Thread-safe한 설계가 필요하다.
Thread-safe란 여러 스레드가 동시에 공유 자원에 접근하더라도 데이터의 일관성을 유지할 수 있는 상태를 의미한다. 즉, 여러 스레드가 동일한 자원을 수정하거나 업데이트 할 때, 데이터의 무결성이 손상되지 않도록 보장한다. 이는 멀티스레드 환경에서 프로그램이 안정적으로 작동할 수 있도록 만드는 중요한 개념이다.
그렇다면 Thread-safe한 설계에는 어떤 방법이 있을까?
- 동기화:
a. Lock 사용: 가장 일반적인 방법으로, 자원에 접근할 때 Lock을 사용하여 한 스레드가 작업을 수행하는 동안 다른 스레드는 대기하게 만든다. 예를 들어, 자바에서는 synchronized 키워드를 사용하여 메서드나 블록을 동기화 할 수 있다.
b. ReentrantLock: 자바의 java.util.concurrent 패키지에는 더 유연한 동기화를 제공하는 ReentrantLock 클래스가 있다. 이를 사용하면 Lock의 획득 및 해제가 더 세밀하게 제어할 수 있다.
일반적으로 흔히 아는 Lock방식이 존재한다. 공유자원에 접근하려 할 때 Lock을 획득해야만 자원에 접근이 가능하도록 한다. ReentrantLock은 뭘까?
ReentrantLock의 Reentrant는 재진입이란 뜻이다. 동일한 스레드가 이미 잠금을 획득한 상태에서 다시 lock을 요청할 경우, deadlock에 빠지지 않고 lock을 획득할 수 있다. lock 획득 카운터가 증가하여 unlock() 호출 수와 일치할 때까지 lock이 해제되지 않는다.
이외에도 trylock() 메서드와 Condition객체가 존재한다. 기존의 lock보다 더 확장성있고 최적화된 기능들을 제공한다.
- 세마포어:
세마포어는 동시에 접근할 수 있는 스레드의 수를 제한하여 공유자원의 동시성을 제어하는 데 사용된다. 이를 통해 다수의 스레드가 자원에 접근하더라도 충돌이 발생하지 않도록 한다.
세마포어는 자원에 접근하는 스레드의 수를 관리하는 카운터이다. P 연산과 V연산이 존재하며 P연산은 세마포어의 카운터를 감소시키고, V 연산은 세마포어의 카운터를 증가시킨다.
특정적인 자원에 독점적인 접근인 lock과 달리 여러 스레드가 접근할 수 있다는 차이점이 있다.
- 불변 객체:
객체의 상태를 변경할 수 없는 불변 객체를 설계하면, 여러 스레드가 객체에 접근하더라도 안전하게 사용할 수 있다. 객체의 상태가 변경되지 않기 때문에 데이터의 일관성을 유지할 수 있다.
아주 근본적인 해결책이며 자원을 read-only로 만든다. 이러면 여러 스레드가 자원의 일관성이 보장된다.
- Concurrent Collections:
프로그래밍 언어에는 스레드 안정성을 보장하는 컬렉션 클래스가 존재한다. 예를 들어, Java의 ConcurrentHashMap과 같은 클래스를 사용하면, 별도의 동기화 없이도 안전하게 여러 스레드가 데이터를 읽고 쓸 수 있다.
동시성이 지켜지는 다양한 자료구조 라이브러리가 존재한다.
- CAS(Compare And Swap):
원자적인 연산을 통해 자우너의 상태를 안전하게 업데이트 하는 방법이다. CAS를 활용하면 Lock을 사용하지 않고도 동시성 문제를 해결할 수 있다.
각 설계방법에 따라 간단한 실습을 해보자.
class SafeCounter {
private int count = 0;
public synchronized void increment() {
count++;
}
public int getCount() {
return count;
}
}
public class Main {
public static void main(String[] args) {
SafeCounter counter = new SafeCounter();
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread( () -> {
for(int j = 0; j < 1000; j++) {
counter.increment();
}
});
}
for(Thread thread: threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.pritnln("Final count: " + counter.getCount());
}
}
increment() 메소드는 synchronized 키워드가 적용되어 한 스레드만 사용하도록 제한된다.
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class LockCounter {
private int count = 0;
private final Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
return count;
}
}
public class Main {
public static void main(String[] args) {
LockCounter counter = new LockCounter();
//...위와 동일
}
}
lock과 unlock의 수가 맞춰질때까지 자원에 독점적인 접근을 한다.
import java.util.concurrent.Semaphore;
class SharedResource {
private final Semaphore semaphore;
public SharedResource(int permits) {
semaphore = new Semaphore(permits);
}
public void accessResource() {
try {
semaphore.acquire(); //P연산
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
}
}
public class Main {
public static void main(String[] args) {
SharedResource sharedResource = new SharedResource(2);
Runnable task = () -> {
sharedResource.accessResource();
};
for (int i =0; i < 5; i++) {
new Thread(task, "스레드 " + (i + 1)).start();
}
}
}
final class User {
private final String firstName;
private final String lastName;
public User(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public String getFirstName() {
return this.firstName;
}
public String getLastName() {
return lastName;
}
public class Main {
public static void main(String[] args) {
User user = new User("John", "Doe");
Thread[] threads = new Thread[10];
for(int i = 0; i<10; i++) {
threads[i] = new Thread( ()-> {
System.out.println(user.getFirstName() + " " + user.getLastName());
});
}
for(Thread thread : threads) {
thread.start();
}
}
}
스레드에서는 단순히 getFirstName(), getLastName() 메소드만 호출해 자원을 읽고만 있다.
import java.util.concurrent.ConcurrentHashMap;
public class Main {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
Thread[] threads = new Thread[10];
for(int i = 0; i < 10; i++) {
threads[i] = new Thread( () -> {
for (int j = 0; j < 1000; j++) {
map.merge("key", 1, Integer::sum);
}
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Final count for key: " + map.get("key"));
}
}
map.merge 메서드에서 "key"라는 키에 1을 더한다. 최종 값은 0에 1을 1000번 더한 경우가 10번 있으므로 10,000이 된다.
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(0);
Thread thread1 = new Thread(() -> {
int currentValue;
int newValue;
do {
currentValue = atomicInteger.get();
newValue = currentValue + 1;
} while(!atomicInteger.compareAndSet(currentValue, newValue))
System.out.println("Thread 1 updated to: " + newValue);
});
Thread thread2 = new Thread(() -> {
int currentValue;
int newValue;
do {
currentValue = atomicInteger.get(); // 현재 값 가져오기
newValue = currentValue + 1; // 새 값 계산하기
} while (!atomicInteger.compareAndSet(currentValue, newValue));
System.out.println("Thread 2 updated to: " + newValue);
});
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final value: " + atomicInteger.get());
}
}
compareAndSet은 현재 값이 예상 값인 currentValue와 일치하면 newValue로 설정하고 true를 반환한다.
공유 자원에 대한 안전한 접근 방식을 구현하는 방법에 대해 알아보았지만 환경과 조건에 맞는 동기화 방법을 선택해야 할것이다.
"해당 포스팅은 OpenAI의 chatgpt를 참고하여 작성되었습니다."