현대 CPU는 8-16개 코어를 제공하지만 싱글스레드 프로그램은 하나만 사용한다. 멀티스레딩은 이 병렬성을 활용해 성능을 N배 향상시키지만, 경쟁 조건(Race Condition), 데드락(Deadlock), 메모리 일관성 문제가 발생한다. 멀티스레딩이 어려운 이유는 공유 상태와 가시성 때문이다. 여러 스레드가 같은 메모리를 수정할 때(공유 상태), 한 스레드의 수정이 다른 스레드에게 언제 보이는가(가시성)를 보장해야 한다.
가시성 문제는 세 계층에서 발생한다. 하드웨어 계층에서 CPU 캐시는 각 코어마다 다른 값을 가질 수 있다(MESI 프로토콜). 언어 계층에서 컴파일러는 성능을 위해 명령어 순서를 재배치한다(Memory Reordering). 동기화 계층에서 Mutex, Atomic, Memory Order가 가시성을 보장한다. 각 계층은 성능과 안전성의 트레이드오프를 갖는다.
프로세스 내 메모리 구조
| 메모리 영역 | 공유 여부 | 설명 |
|---|---|---|
| 코드 (Text Segment) | ✅ 공유 | 프로그램 실행 코드 |
| 데이터 (Data Segment) | ✅ 공유 | 전역 변수, 정적 변수 |
| 힙 (Heap) | ✅ 공유 | 동적 할당 메모리 |
| 스레드 1 스택 | ❌ 독립 | 스레드 1의 지역 변수, 함수 호출 정보 |
| 스레드 2 스택 | ❌ 독립 | 스레드 2의 지역 변수, 함수 호출 정보 |
| 스레드 3 스택 | ❌ 독립 | 스레드 3의 지역 변수, 함수 호출 정보 |
스레드 간 공유 vs 독립
| 항목 | 공유 여부 | 범위 | 설명 |
|---|---|---|---|
| 코드 영역 | ✅ 공유 | 프로세스 | 모든 스레드가 같은 코드 실행 |
| 전역 변수 | ✅ 공유 | 프로세스 | Data Segment에 저장 |
| 힙 메모리 | ✅ 공유 | 프로세스 | malloc(), new로 할당한 메모리 |
| 파일 디스크립터 | ✅ 공유 | 프로세스 | 열린 파일, 소켓 등 |
| 프로세스 ID (PID) | ✅ 공유 | 프로세스 | 모든 스레드가 같은 PID |
| 스택 | ❌ 독립 | 스레드 | 각 스레드마다 독립적인 스택 |
| 레지스터 | ❌ 독립 | 스레드 | CPU 레지스터 상태 |
| 프로그램 카운터 (PC) | ❌ 독립 | 스레드 | 현재 실행 중인 명령어 주소 |
| 스레드 ID (TID) | ❌ 독립 | 스레드 | 각 스레드 고유 ID |
| 스레드 로컬 저장소 (TLS) | ❌ 독립 | 스레드 | 스레드별 전역 변수 |
프로세스 vs 스레드 비교
| 구분 | 프로세스 | 스레드 |
|---|---|---|
| 정의 | 독립적인 실행 단위 | 프로세스 내 실행 단위 |
| 메모리 공간 | 완전히 독립 | 코드/데이터/힙 공유, 스택만 독립 |
| 생성 비용 | 높음 (fork) | 낮음 (pthread_create) |
| 통신 방법 | IPC 필요 (파이프, 소켓, 공유 메모리) | 직접 메모리 접근 가능 |
| 문맥 교환 비용 | 높음 | 낮음 |
| 안정성 | 높음 (한 프로세스 죽어도 다른 프로세스 영향 없음) | 낮음 (한 스레드 죽으면 전체 프로세스 종료) |
#include <thread>
#include <iostream>
void WorkerFunction(int id) {
std::cout << "Thread " << id << " is running" << std::endl;
}
int main() {
std::thread t1(WorkerFunction, 1);
std::thread t2(WorkerFunction, 2);
std::cout << "Main thread is running" << std::endl;
t1.join(); // t1이 끝날 때까지 대기
t2.join(); // t2가 끝날 때까지 대기
return 0;
}
join vs detach:
void Example1() {
std::thread t([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Thread finished" << std::endl;
});
t.join(); // 스레드가 끝날 때까지 대기
std::cout << "After join" << std::endl;
}
void Example2() {
std::thread t([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Thread finished" << std::endl;
});
t.detach(); // 스레드를 백그라운드로 보냄
std::cout << "After detach" << std::endl;
}
중요한 규칙:
// ❌ 위험
void Dangerous() {
std::thread t([]() {
// 작업...
});
// t.join()도 t.detach()도 안 함!
} // std::terminate() 호출! (프로그램 강제 종료)
// ✅ 안전
void Safe() {
std::thread t([]() {
// 작업...
});
t.join(); // 또는 t.detach()
}
// 1. 값으로 전달
void Func1(int value) {
std::cout << value << std::endl;
}
std::thread t1(Func1, 42);
// 2. 레퍼런스로 전달
void Func2(int& value) {
value++;
}
int x = 10;
std::thread t2(Func2, std::ref(x)); // std::ref 필요!
t2.join();
// x는 이제 11
// 3. 람다로 캡처
int z = 5;
std::thread t4([&z]() { // 참조 캡처
z *= 2;
});
t4.join();
// z는 이제 10
주의사항:
// ❌ 위험
void Dangerous() {
int localVar = 42;
std::thread t([&localVar]() {
std::cout << localVar << std::endl;
});
t.detach(); // 위험!
} // localVar가 파괴됨, 스레드는 댕글링 레퍼런스!
// ✅ 안전
void Safe() {
int localVar = 42;
std::thread t([localVar]() { // 값 캡처
std::cout << localVar << std::endl;
});
t.detach();
}
int globalCounter = 0;
void IncrementCounter() {
for (int i = 0; i < 100000; i++) {
globalCounter++; // 위험!
}
}
int main() {
std::thread t1(IncrementCounter);
std::thread t2(IncrementCounter);
t1.join();
t2.join();
std::cout << "Counter: " << globalCounter << std::endl;
// 예상: 200000
// 실제: 112453 (매번 다름!)
}
왜 문제인가?
globalCounter++는 실제로 3단계:
1. LOAD: 레지스터 = globalCounter (메모리 → CPU)
2. ADD: 레지스터 = 레지스터 + 1
3. STORE: globalCounter = 레지스터 (CPU → 메모리)
스레드 1과 스레드 2가 동시에 실행되면:
시간 스레드 1 스레드 2 globalCounter
────────────────────────────────────────────────────────────────────
t0 LOAD (값: 0) 0
t1 LOAD (값: 0) 0
t2 ADD (1) 0
t3 ADD (1) 0
t4 STORE (1) 1
t5 STORE (1) 1
결과: 2가 되어야 하는데 1이 됨!
어셈블리로 보기:
; globalCounter++의 어셈블리
mov eax, DWORD PTR [globalCounter] ; LOAD
add eax, 1 ; ADD
mov DWORD PTR [globalCounter], eax ; STORE
; 3개 명령어 사이에 컨텍스트 스위치 가능!
동시에 실행되면 안 되는 코드 영역이다.
std::mutex mtx;
void SafeIncrementCounter() {
for (int i = 0; i < 100000; i++) {
mtx.lock(); // 임계 영역 시작
globalCounter++; // 안전!
mtx.unlock(); // 임계 영역 끝
}
}
// RAII 스타일 (권장)
void SafeIncrementCounter2() {
for (int i = 0; i < 100000; i++) {
std::lock_guard<std::mutex> lock(mtx);
globalCounter++;
} // 자동으로 unlock
}
class BankAccount {
int balance = 1000;
std::mutex mtx;
public:
// ❌ 데이터 경쟁
int GetBalance() {
return balance; // lock 없음!
}
void Withdraw(int amount) {
std::lock_guard<std::mutex> lock(mtx);
balance -= amount;
}
// ✅ 안전
int GetBalanceSafe() {
std::lock_guard<std::mutex> lock(mtx);
return balance;
}
};
원칙:
#include <mutex>
std::mutex mtx;
int sharedData = 0;
// RAII 스타일 (권장)
void CriticalSectionRAII() {
std::lock_guard<std::mutex> lock(mtx);
sharedData++;
// 스코프 끝나면 자동 unlock
}
// unique_lock (더 유연함)
void CriticalSectionUnique() {
std::unique_lock<std::mutex> lock(mtx);
sharedData++;
lock.unlock(); // 중간에 unlock 가능
// 다른 작업...
lock.lock(); // 다시 lock 가능
}
Mutex의 내부 동작:
// 간단한 Mutex 구현 (개념적)
class SimpleMutex {
std::atomic<bool> locked{false};
public:
void lock() {
while (locked.exchange(true, std::memory_order_acquire)) {
std::this_thread::yield(); // CPU 양보
}
}
void unlock() {
locked.store(false, std::memory_order_release);
}
};
// 실제 구현은 더 복잡:
// - Futex (Fast Userspace Mutex) 사용
// - 잠금 실패 시 커널에 등록
// - 대기 큐 관리
비용:
성공적인 lock (경쟁 없음): 10-50 사이클
실패한 lock (경쟁): 1000-10000 사이클 (컨텍스트 스위치)
class Spinlock {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
void lock() {
// busy-wait: CPU를 계속 사용
while (flag.test_and_set(std::memory_order_acquire)) {
// 계속 시도
}
}
void unlock() {
flag.clear(std::memory_order_release);
}
};
Mutex vs Spinlock:
| 특성 | Mutex | Spinlock |
|---|---|---|
| 대기 방식 | Sleep (블로킹) | Busy-wait |
| CPU 사용 | 낮음 | 높음 (100%) |
| 컨텍스트 스위치 | 있음 | 없음 |
| 적합한 경우 | 긴 임계 영역 | 매우 짧은 임계 영역 |
| 비용 (잠금 성공) | 50 사이클 | 10 사이클 |
| 비용 (잠금 실패) | 10000 사이클 | 무한 (대기) |
선택 가이드:
// Mutex: 임계 영역이 김 (> 1000 사이클)
std::mutex mtx;
mtx.lock();
ComplexCalculation(); // 10000 사이클
mtx.unlock();
// Spinlock: 임계 영역이 매우 짧음 (< 100 사이클)
Spinlock spinlock;
spinlock.lock();
counter++; // 1-2 사이클
spinlock.unlock();
#include <atomic>
std::atomic<int> atomicCounter{0};
void IncrementAtomic() {
for (int i = 0; i < 100000; i++) {
atomicCounter++; // 원자적! (lock 없이 안전)
// 또는
atomicCounter.fetch_add(1, std::memory_order_relaxed);
}
}
// Compare-And-Swap (CAS)
void CASExample() {
int expected = 10;
int desired = 20;
// atomicCounter가 expected면 desired로 변경
// 성공하면 true, 실패하면 false
if (atomicCounter.compare_exchange_strong(expected, desired)) {
std::cout << "Success! Changed 10 to 20" << std::endl;
} else {
std::cout << "Failed! Current value: " << expected << std::endl;
}
}
원자적 연산의 종류:
std::atomic<int> x{0};
// 읽기/쓰기
int value = x.load();
x.store(42);
// 수정
x++;
x--;
x += 5;
// Fetch 연산 (기존 값 반환)
int old = x.fetch_add(1); // x++, 기존 값 반환
int old = x.fetch_sub(1); // x--, 기존 값 반환
int old = x.exchange(100); // x = 100, 기존 값 반환
// Compare-And-Swap
int expected = 10;
bool success = x.compare_exchange_strong(expected, 20);
// weak vs strong
// weak: 가짜 실패 가능 (spurious failure), 더 빠름
// strong: 항상 정확, 느림
while (!x.compare_exchange_weak(expected, 20)) {
// 루프에서는 weak 사용
}
어셈블리로 보기:
; atomicCounter++ (x86-64)
lock inc DWORD PTR [atomicCounter]
; lock 접두사:
; - 다른 CPU가 메모리에 접근하지 못하도록 버스 잠금
; - 원자성 보장
; - 1-10 사이클 (캐시 히트 시)
스레드 간 신호를 주고받는 메커니즘이다.
#include <condition_variable>
#include <queue>
std::queue<int> taskQueue;
std::mutex mtx;
std::condition_variable cv;
bool done = false;
void Producer() {
for (int i = 0; i < 10; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
{
std::lock_guard<std::mutex> lock(mtx);
taskQueue.push(i);
std::cout << "Produced: " << i << std::endl;
}
cv.notify_one(); // Consumer 깨우기
}
{
std::lock_guard<std::mutex> lock(mtx);
done = true;
}
cv.notify_all();
}
void Consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 조건 대기
cv.wait(lock, []() {
return !taskQueue.empty() || done;
});
if (taskQueue.empty()) {
break; // done이고 큐가 비어있음
}
int task = taskQueue.front();
taskQueue.pop();
lock.unlock();
std::cout << "Consumer " << id << " processing: " << task << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
wait의 내부 동작:
// cv.wait(lock, predicate)는 다음과 같음:
while (!predicate()) {
// 1. mutex unlock
// 2. 대기 큐에 등록
// 3. sleep (블로킹)
// 4. notify 받으면 깨어남
// 5. mutex lock
}
// 주의: Spurious Wakeup (가짜 깨어남) 가능
// 따라서 항상 predicate 체크!
데드락의 4가지 조건:
1. 상호 배제 (Mutual Exclusion)
2. 점유 대기 (Hold and Wait)
3. 비선점 (No Preemption)
4. 순환 대기 (Circular Wait)
데드락 예시:
std::mutex mtx1, mtx2;
void Thread1() {
mtx1.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
mtx2.lock(); // 대기 (Thread2가 mtx2를 잠금)
// 작업...
mtx2.unlock();
mtx1.unlock();
}
void Thread2() {
mtx2.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
mtx1.lock(); // 대기 (Thread1이 mtx1을 잠금)
// 작업...
mtx1.unlock();
mtx2.unlock();
}
// 💀 데드락!
해결 1: 락 순서 지정
// ✅ 항상 같은 순서로 락 획득
void Thread1() {
mtx1.lock();
mtx2.lock();
// 작업...
mtx2.unlock();
mtx1.unlock();
}
void Thread2() {
mtx1.lock(); // 순서 통일!
mtx2.lock();
// 작업...
mtx2.unlock();
mtx1.unlock();
}
해결 2: std::lock (동시 잠금)
void Thread1() {
std::lock(mtx1, mtx2); // 둘 다 획득하거나 둘 다 실패
std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
// 작업...
}
해결 3: std::scoped_lock (C++17)
void Thread1() {
std::scoped_lock lock(mtx1, mtx2); // RAII + 동시 잠금
// 작업...
}
void Thread2() {
std::scoped_lock lock(mtx2, mtx1); // 순서 상관없음!
// 작업...
}
CPU는 성능을 위해 명령어 순서를 바꿀 수 있다.
// 싱글스레드에서는 문제없음
int a = 0;
int b = 0;
void Thread1() {
a = 1; // (1)
b = 1; // (2)
}
void Thread2() {
while (b == 0); // (3) b가 1이 될 때까지 대기
assert(a == 1); // (4) a도 1일 것으로 기대
}
// 문제: CPU가 (1)과 (2)의 순서를 바꿀 수 있음!
// → (2), (1) 순서로 실행
// → Thread2에서 assert 실패!
메모리 순서 종류:
enum memory_order {
memory_order_relaxed, // 순서 보장 없음 (가장 빠름)
memory_order_consume, // 의존성만 보장
memory_order_acquire, // 이후 읽기/쓰기 재정렬 금지
memory_order_release, // 이전 읽기/쓰기 재정렬 금지
memory_order_acq_rel, // acquire + release
memory_order_seq_cst // Sequential Consistency (가장 느림, 기본값)
};
std::atomic<int> x{0};
std::atomic<int> y{0};
void Thread1() {
x.store(1, std::memory_order_seq_cst); // (1)
y.store(1, std::memory_order_seq_cst); // (2)
}
void Thread2() {
while (y.load(std::memory_order_seq_cst) == 0); // (3)
assert(x.load(std::memory_order_seq_cst) == 1); // (4) 보장됨!
}
// Sequential Consistency:
// - 모든 스레드가 같은 순서로 연산을 봄
// - (1) → (2) → (3) → (4) 순서 보장
std::atomic<int> data{0};
std::atomic<bool> ready{false};
void Producer() {
data.store(42, std::memory_order_relaxed); // (1)
ready.store(true, std::memory_order_release); // (2) Release
// Release: (2) 이전의 모든 읽기/쓰기는 (2) 전에 완료
}
void Consumer() {
while (!ready.load(std::memory_order_acquire)); // (3) Acquire
// Acquire: (3) 이후의 모든 읽기/쓰기는 (3) 후에 시작
assert(data.load(std::memory_order_relaxed) == 42); // (4) 보장됨!
}
// Acquire-Release 동기화:
// Producer의 Release와 Consumer의 Acquire가 만나는 지점에서
// 메모리 가시성 보장
std::atomic<int> counter{0};
void IncrementRelaxed() {
counter.fetch_add(1, std::memory_order_relaxed);
// 순서 보장 없음, 가장 빠름
// 단, 원자성은 보장
}
// 사용 예: 통계 카운터
std::atomic<int> bytesProcessed{0};
std::atomic<int> packetsProcessed{0};
void ProcessPacket(const Packet& packet) {
// 처리...
bytesProcessed.fetch_add(packet.size, std::memory_order_relaxed);
packetsProcessed.fetch_add(1, std::memory_order_relaxed);
// 순서는 상관없음, 빠르게 증가만 하면 됨
}
멀티코어 CPU에서 각 코어의 캐시가 일관성을 유지하는 프로토콜이다. False Sharing이 왜 느린지 이해하는 핵심이다.
캐시 구조:
CPU 0 CPU 1
┌────────────┐ ┌────────────┐
│ Core 0 │ │ Core 1 │
├────────────┤ ├────────────┤
│ L1 Cache │ │ L1 Cache │
│ (32 KB) │ │ (32 KB) │
└─────┬──────┘ └─────┬──────┘
│ │
└──────────┬─────────────────┘
│
┌─────┴──────┐
│ L3 Cache │ ← 공유 캐시
│ (8 MB) │
└─────┬──────┘
│
┌─────┴──────┐
│ RAM │
│ (16 GB) │
└────────────┘
캐시 라인: 64 바이트
- 하나의 변수만 수정해도 64바이트 전체가 무효화!
MESI 상태:
M (Modified): 이 캐시만 가지고 있고, RAM과 다름 (수정됨)
E (Exclusive): 이 캐시만 가지고 있고, RAM과 같음
S (Shared): 여러 캐시가 가지고 있고, RAM과 같음
I (Invalid): 무효 (다른 코어가 수정함)
같은 캐시 라인 (False Sharing):
초기 상태:
CPU 0 L1: I (Invalid)
CPU 1 L1: I (Invalid)
RAM: counter1=0, counter2=0
1. CPU 0이 counter1 읽기:
CPU 0 L1: E (Exclusive) [counter1=0, counter2=0]
CPU 1 L1: I (Invalid)
2. CPU 1이 counter2 읽기:
CPU 0 L1: S (Shared) [counter1=0, counter2=0]
CPU 1 L1: S (Shared) [counter1=0, counter2=0]
3. CPU 0이 counter1 = 1 쓰기:
→ CPU 0 L1: M (Modified) [counter1=1, counter2=0]
→ CPU 1 L1: I (Invalid) ← 캐시 무효화!
4. CPU 1이 counter2 접근 시도:
→ 캐시 미스! RAM에서 다시 로드
→ 약 100 사이클 소요! (L1 접근은 4 사이클)
→ Ping-Pong 효과: 캐시 라인이 두 코어 사이를 왔다갔다함!
False Sharing 방지:
// ❌ False Sharing
struct BadCounter {
std::atomic<int> counter1; // 캐시 라인 0
std::atomic<int> counter2; // 캐시 라인 0 (같은 라인!)
};
// ✅ 캐시 라인 분리
struct GoodCounter {
alignas(64) std::atomic<int> counter1; // 캐시 라인 0
char padding[60];
alignas(64) std::atomic<int> counter2; // 캐시 라인 1
};
// 또는 C++17
struct alignas(std::hardware_destructive_interference_size) AlignedCounter {
std::atomic<int> counter;
};
실제 측정:
constexpr int ITERATIONS = 100'000'000;
void TestBad() {
BadData data{0, 0};
std::thread t1([&]() {
for (int i = 0; i < ITERATIONS; i++) {
data.counter1++;
}
});
std::thread t2([&]() {
for (int i = 0; i < ITERATIONS; i++) {
data.counter2++;
}
});
t1.join();
t2.join();
// 시간: 약 5000ms
}
void TestGood() {
GoodData data{0, {}, 0};
// 시간: 약 800ms (6.25배 빠름!)
}
// Fence (Barrier)
void Producer() {
data = 42;
std::atomic_thread_fence(std::memory_order_release);
ready.store(true, std::memory_order_relaxed);
// fence가 Release barrier 역할
}
void Consumer() {
while (!ready.load(std::memory_order_relaxed));
std::atomic_thread_fence(std::memory_order_acquire);
// fence가 Acquire barrier 역할
assert(data == 42);
}
CPU별 메모리 모델:
| CPU | 메모리 모델 | 특징 |
|---|---|---|
| x86/x64 | TSO (Total Store Order) | 강한 모델, 배리어 적음 |
| ARM | Weak | 약한 모델, 명시적 배리어 필요 |
| PowerPC | Weak | 약한 모델 |
Lock-Free:
- 최소 한 스레드는 항상 진행 (system-wide progress)
- 데드락 불가능
- 하지만 개별 스레드는 starvation 가능
Wait-Free:
- 모든 스레드가 유한 시간 내에 완료 (per-thread progress)
- 더 강한 보장, 구현 어려움
template<typename T>
class LockFreeStack {
struct Node {
T data;
Node* next;
};
std::atomic<Node*> head{nullptr};
public:
void Push(T value) {
Node* newNode = new Node{value, nullptr};
// CAS 루프
Node* oldHead = head.load(std::memory_order_relaxed);
do {
newNode->next = oldHead;
} while (!head.compare_exchange_weak(
oldHead,
newNode,
std::memory_order_release,
std::memory_order_relaxed
));
}
bool Pop(T& result) {
Node* oldHead = head.load(std::memory_order_acquire);
do {
if (oldHead == nullptr) {
return false;
}
} while (!head.compare_exchange_weak(
oldHead,
oldHead->next,
std::memory_order_release,
std::memory_order_acquire
));
result = oldHead->data;
delete oldHead; // ⚠️ ABA 문제 가능!
return true;
}
};
// 문제 시나리오:
스레드 1 스레드 2 스레드 3
head = [A]
oldHead = [A]
Pop() → [A] 제거
head = [B]
Pop() → [B] 제거
head = null
Push([A']) (재사용된 주소!)
head = [A']
CAS(head, [A], [C])
→ 성공! (주소가 같으니까)
→ 하지만 다른 [A]임!
해결 방법 1: Tagged Pointer
template<typename T>
class LockFreeStackTagged {
struct Node {
T data;
Node* next;
};
struct TaggedPointer {
Node* ptr;
uintptr_t tag; // 버전 번호
};
std::atomic<TaggedPointer> head{{nullptr, 0}};
public:
void Push(T value) {
Node* newNode = new Node{value, nullptr};
TaggedPointer oldHead = head.load();
TaggedPointer newHead;
do {
newNode->next = oldHead.ptr;
newHead.ptr = newNode;
newHead.tag = oldHead.tag + 1; // 태그 증가
} while (!head.compare_exchange_weak(oldHead, newHead));
}
};
해결 방법 2: Hazard Pointer
template<typename T>
class LockFreeStackHazard {
struct Node {
T data;
std::atomic<Node*> next;
};
std::atomic<Node*> head{nullptr};
static thread_local Node* hazardPointer;
public:
bool Pop(T& result) {
Node* oldHead;
do {
oldHead = head.load();
if (oldHead == nullptr) {
return false;
}
// 이 포인터 사용 중이라고 표시
hazardPointer = oldHead;
// 다시 확인 (head가 바뀌었나?)
if (head.load() != oldHead) {
continue;
}
} while (!head.compare_exchange_weak(oldHead, oldHead->next));
result = oldHead->data;
// 안전하게 삭제
RetireNode(oldHead);
hazardPointer = nullptr;
return true;
}
};
template<typename T>
class LockFreeQueue {
struct Node {
T data;
std::atomic<Node*> next{nullptr};
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
public:
LockFreeQueue() {
Node* dummy = new Node{};
head.store(dummy);
tail.store(dummy);
}
void Enqueue(T value) {
Node* newNode = new Node{value, nullptr};
while (true) {
Node* oldTail = tail.load();
Node* next = oldTail->next.load();
if (oldTail == tail.load()) {
if (next == nullptr) {
if (oldTail->next.compare_exchange_weak(next, newNode)) {
tail.compare_exchange_weak(oldTail, newNode);
return;
}
} else {
tail.compare_exchange_weak(oldTail, next);
}
}
}
}
bool Dequeue(T& result) {
while (true) {
Node* oldHead = head.load();
Node* oldTail = tail.load();
Node* next = oldHead->next.load();
if (oldHead == head.load()) {
if (oldHead == oldTail) {
if (next == nullptr) {
return false;
}
tail.compare_exchange_weak(oldTail, next);
} else {
result = next->data;
if (head.compare_exchange_weak(oldHead, next)) {
delete oldHead;
return true;
}
}
}
}
}
};
벤치마크 (Stack Pop 1M번):
방법 시간 메모리
Mutex 850ms 안정적
shared_ptr (atomic) 450ms 높음
Hazard Pointers 180ms 예측 가능
Epoch-Based 150ms 불안정
class ThreadPool {
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex mtx;
std::condition_variable cv;
bool stop = false;
public:
ThreadPool(size_t numThreads) {
for (size_t i = 0; i < numThreads; i++) {
workers.emplace_back([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this]() {
return stop || !tasks.empty();
});
if (stop && tasks.empty()) {
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::lock_guard<std::mutex> lock(mtx);
stop = true;
}
cv.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
template<typename F>
void Enqueue(F&& f) {
{
std::lock_guard<std::mutex> lock(mtx);
tasks.emplace(std::forward<F>(f));
}
cv.notify_one();
}
};
#include <future>
class ThreadPoolWithFuture {
public:
template<typename F, typename... Args>
auto Enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> result = task->get_future();
{
std::lock_guard<std::mutex> lock(mtx);
tasks.emplace([task]() { (*task)(); });
}
cv.notify_one();
return result;
}
};
// 사용
ThreadPoolWithFuture pool(4);
auto future = pool.Enqueue([](int x) {
return x * x;
}, 42);
int result = future.get(); // 결과 대기 및 획득
std::cout << "Result: " << result << std::endl; // 1764
Work Stealing은 유휴 스레드가 바쁜 스레드의 작업을 가져가는 기법이다.
문제: 작업 불균형
일반 스레드 풀:
스레드 1: [████████████████████████] 바쁨!
스레드 2: [█] 유휴...
스레드 3: [█] 유휴...
스레드 4: [████████████████████████] 바쁨!
해결: Work Stealing
Work Stealing:
스레드 1: [████████] ← 스레드 2가 작업 가져감
스레드 2: [████████] (스레드 1에서 steal)
스레드 3: [████████] (스레드 4에서 steal)
스레드 4: [████████] ← 스레드 3이 작업 가져감
Deque 기반 구현:
template<typename T>
class WorkStealingQueue {
std::deque<T> queue;
mutable std::mutex mtx;
public:
// Push: 로컬 스레드만 호출 (LIFO - 캐시 친화적)
void Push(T item) {
std::lock_guard<std::mutex> lock(mtx);
queue.push_front(item);
}
// Pop: 로컬 스레드만 호출 (LIFO)
bool Pop(T& item) {
std::lock_guard<std::mutex> lock(mtx);
if (queue.empty()) {
return false;
}
item = queue.front();
queue.pop_front();
return true;
}
// Steal: 다른 스레드가 호출 (FIFO - 오래된 작업 우선)
bool Steal(T& item) {
std::lock_guard<std::mutex> lock(mtx);
if (queue.empty()) {
return false;
}
item = queue.back();
queue.pop_back();
return true;
}
};
Work Stealing 스레드 풀:
class WorkStealingThreadPool {
using Task = std::function<void()>;
std::vector<std::unique_ptr<WorkStealingQueue<Task>>> queues;
std::vector<std::thread> threads;
std::atomic<bool> running{true};
thread_local static int localQueueIndex;
public:
WorkStealingThreadPool(size_t numThreads) {
queues.resize(numThreads);
threads.reserve(numThreads);
for (size_t i = 0; i < numThreads; i++) {
queues[i] = std::make_unique<WorkStealingQueue<Task>>();
}
for (size_t i = 0; i < numThreads; i++) {
threads.emplace_back([this, i]() {
WorkerThread(i);
});
}
}
void Submit(Task task) {
static std::atomic<size_t> next{0};
size_t index = next.fetch_add(1) % queues.size();
queues[index]->Push(std::move(task));
}
void SubmitLocal(Task task) {
if (localQueueIndex >= 0) {
queues[localQueueIndex]->Push(std::move(task));
} else {
Submit(std::move(task));
}
}
private:
void WorkerThread(int index) {
localQueueIndex = index;
std::mt19937 rng(std::random_device{}());
while (running) {
Task task;
// 1. 로컬 큐에서 Pop
if (queues[index]->Pop(task)) {
task();
continue;
}
// 2. 다른 큐에서 Steal
if (TrySteal(index, task, rng)) {
task();
continue;
}
// 3. 작업 없음
std::this_thread::yield();
}
}
bool TrySteal(int myIndex, Task& task, std::mt19937& rng) {
std::uniform_int_distribution<int> dist(0, queues.size() - 1);
for (size_t attempt = 0; attempt < queues.size(); attempt++) {
int victimIndex = dist(rng);
if (victimIndex == myIndex) {
continue;
}
if (queues[victimIndex]->Steal(task)) {
return true;
}
}
return false;
}
};
thread_local int WorkStealingThreadPool::localQueueIndex = -1;
성능 비교:
벤치마크: 파티클 시뮬레이션 (10만 개, 4코어)
일반 스레드 풀:
- 작업 분할: 수동
- 시간: 45ms
Work Stealing:
- 작업 분할: 자동
- 시간: 32ms (29% 빠름)
Cache 효과:
- LIFO (Pop): L1 캐시 히트율 90%
- FIFO (Steal): L1 캐시 히트율 60%
struct Task {
std::function<void()> function;
std::atomic<int> dependencies{0};
std::vector<Task*> dependents;
};
class TaskScheduler {
ThreadPool pool;
std::vector<std::unique_ptr<Task>> allTasks;
public:
TaskScheduler(size_t numThreads) : pool(numThreads) {}
Task* CreateTask(std::function<void()> func) {
auto task = std::make_unique<Task>();
task->function = std::move(func);
Task* ptr = task.get();
allTasks.push_back(std::move(task));
return ptr;
}
void AddDependency(Task* task, Task* dependency) {
task->dependencies++;
dependency->dependents.push_back(task);
}
void Submit(Task* task) {
if (task->dependencies == 0) {
pool.Enqueue([this, task]() {
task->function();
OnTaskComplete(task);
});
}
}
void OnTaskComplete(Task* task) {
for (Task* dependent : task->dependents) {
int oldDeps = dependent->dependencies.fetch_sub(1);
if (oldDeps == 1) {
Submit(dependent);
}
}
}
};
// 사용 예시
TaskScheduler scheduler(4);
Task* loadMesh = scheduler.CreateTask([]() {
// 메시 로드...
});
Task* loadTexture = scheduler.CreateTask([]() {
// 텍스처 로드...
});
Task* createMaterial = scheduler.CreateTask([]() {
// 머티리얼 생성...
});
scheduler.AddDependency(createMaterial, loadMesh);
scheduler.AddDependency(createMaterial, loadTexture);
scheduler.Submit(loadMesh);
scheduler.Submit(loadTexture);
메인 스레드 (60fps, 16ms):
├─ 입력 처리
├─ 게임 로직
├─ 렌더 커맨드 생성
└─ 프레임 제출
렌더 스레드:
├─ 렌더 커맨드 실행
├─ GPU 드라이버 호출
└─ 스왑 체인 관리
워커 스레드들:
├─ 물리 시뮬레이션
├─ AI 계산
├─ 사운드 믹싱
└─ 리소스 로딩
class FramePipeline {
struct FrameData {
std::vector<RenderCommand> commands;
std::mutex mtx;
};
std::array<FrameData, 3> frames; // 트리플 버퍼링
std::atomic<int> currentFrame{0};
std::thread renderThread;
std::atomic<bool> running{true};
public:
FramePipeline() {
renderThread = std::thread([this]() {
RenderThreadLoop();
});
}
void GameThread() {
while (running) {
int frameIndex = currentFrame.load();
FrameData& frame = frames[frameIndex];
UpdateGame();
{
std::lock_guard<std::mutex> lock(frame.mtx);
frame.commands.clear();
GenerateRenderCommands(frame.commands);
}
currentFrame = (frameIndex + 1) % 3;
std::this_thread::sleep_for(std::chrono::milliseconds(16));
}
}
void RenderThreadLoop() {
int lastRenderedFrame = 0;
while (running) {
int currentFrameIndex = currentFrame.load();
int renderFrame = (currentFrameIndex + 2) % 3;
if (renderFrame != lastRenderedFrame) {
FrameData& frame = frames[renderFrame];
std::lock_guard<std::mutex> lock(frame.mtx);
ExecuteRenderCommands(frame.commands);
lastRenderedFrame = renderFrame;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
};
class GameTaskGraph {
WorkStealingThreadPool pool;
public:
GameTaskGraph() : pool(std::thread::hardware_concurrency()) {}
// 병렬 AI 업데이트
void UpdateAI(std::vector<AIController*>& controllers) {
ParallelFor(0, controllers.size(), [&](int i) {
controllers[i]->Tick();
}, pool);
}
// 병렬 물리 시뮬레이션
void SimulatePhysics(std::vector<RigidBody*>& bodies) {
pool.Submit([&]() {
ParallelFor(0, bodies.size(), [&](int i) {
bodies[i]->DetectCollisions();
}, pool);
});
for (auto* body : bodies) {
body->ResolveCollisions();
}
pool.Submit([&]() {
ParallelFor(0, bodies.size(), [&](int i) {
bodies[i]->UpdatePosition();
}, pool);
});
}
};
# 컴파일
g++ -fsanitize=thread -g program.cpp -o program
# 실행
./program
# 출력 예시:
# WARNING: ThreadSanitizer: data race
# Write of size 4 at 0x7b0400000000 by thread T2:
# #0 IncrementCounter() program.cpp:10
#
# Previous write of size 4 at 0x7b0400000000 by thread T1:
# #0 IncrementCounter() program.cpp:10
class DeadlockDetector {
std::map<std::thread::id, std::set<std::mutex*>> heldLocks;
std::mutex detectorMtx;
public:
void AcquireLock(std::mutex* mtx) {
std::lock_guard<std::mutex> lock(detectorMtx);
auto threadId = std::this_thread::get_id();
auto& locks = heldLocks[threadId];
for (auto* heldLock : locks) {
if (WouldCauseDeadlock(heldLock, mtx)) {
std::cerr << "Potential deadlock detected!" << std::endl;
std::abort();
}
}
locks.insert(mtx);
}
void ReleaseLock(std::mutex* mtx) {
std::lock_guard<std::mutex> lock(detectorMtx);
auto threadId = std::this_thread::get_id();
heldLocks[threadId].erase(mtx);
}
};
class ThreadProfiler {
struct ThreadStats {
std::atomic<uint64_t> tasksCompleted{0};
std::atomic<uint64_t> totalTime{0};
std::atomic<uint64_t> idleTime{0};
};
std::map<std::thread::id, ThreadStats> stats;
public:
void RecordTask(uint64_t duration) {
auto threadId = std::this_thread::get_id();
stats[threadId].tasksCompleted++;
stats[threadId].totalTime += duration;
}
void PrintStats() {
std::cout << "Thread Performance:" << std::endl;
for (const auto& [id, stat] : stats) {
uint64_t avgTime = stat.totalTime / stat.tasksCompleted;
std::cout << "Thread " << id << ":" << std::endl;
std::cout << " Tasks: " << stat.tasksCompleted << std::endl;
std::cout << " Avg time: " << avgTime << " ns" << std::endl;
std::cout << " Utilization: "
<< (100.0 * stat.totalTime / (stat.totalTime + stat.idleTime))
<< "%" << std::endl;
}
}
};
Lock-Free vs Mutex 벤치마크:
#include <benchmark/benchmark.h>
std::atomic<int> atomicCounter{0};
int mutexCounter = 0;
std::mutex mtx;
static void BM_Atomic(benchmark::State& state) {
for (auto _ : state) {
atomicCounter.fetch_add(1, std::memory_order_relaxed);
}
}
BENCHMARK(BM_Atomic)->Threads(1)->Threads(2)->Threads(4)->Threads(8);
static void BM_Mutex(benchmark::State& state) {
for (auto _ : state) {
std::lock_guard<std::mutex> lock(mtx);
mutexCounter++;
}
}
BENCHMARK(BM_Mutex)->Threads(1)->Threads(2)->Threads(4)->Threads(8);
// 결과:
// Threads Atomic (ns/op) Mutex (ns/op) Speedup
// 1 5 50 10x
// 2 10 1000 100x
// 4 15 2000 133x
// 8 20 4000 200x
스레드 친화도 (Thread Affinity):
#include <pthread.h>
void SetThreadAffinity(std::thread& thread, int coreId) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(coreId, &cpuset);
pthread_t nativeHandle = thread.native_handle();
pthread_setaffinity_np(nativeHandle, sizeof(cpu_set_t), &cpuset);
}
// 사용
std::thread physicsThread(PhysicsSimulation);
SetThreadAffinity(physicsThread, 2); // 코어 2에 고정
// 장점:
// - 캐시 히트율 증가
// - 컨텍스트 스위치 감소
// - 예측 가능한 성능
멀티스레딩의 핵심 원칙:
공유 최소화: 가능한 한 데이터를 공유하지 않는다
동기화 명확화: 공유가 필요하면 명확히 보호한다
데드락 방지: 락 순서를 지킨다
성능 측정: 추측하지 말고 측정한다
디버깅 도구: TSan, Helgrind 활용