이름에서 유추를 해보자.
readers, writer 독자는 여러명인데 쓰는 사람은1명이다.
멀티스레드에서 읽는건 여러명인데 쓰는건 1명이라는 의미는 값의 변경이 거의 없다는 뜻이다. 대부분이 값을 읽어오기만 하고 아주 가끔 값을 수정한다.
기존 방식처럼 모든 행동에 lock을 건다면 이 상황에서 read에 lock을 거는건 불필요하다. 진짜 값을 수정할 때만 lock을 걸고 단순히 읽는 상황에서는 lock 없이 여러 쓰레드가 사용하게 한다. 이것이 RW Lock이다.
//lock.h
#pragma once
#include <atomic>
/*---------------------
RW SpinLock
---------------------*/
#defin OUT
#define CRASH(cause) \
{ \
uint32* crash = nullptr; \
__analysis_assume(crash != nullptr); \
*crash = 0xDEADBEEF; \
}
//상위 16: WriteFlag (lock을 획득한 스레드의 Id) (Exclusive Lock Owner ThreadId)
//하위 16: ReadFlag (공유해서 사용되는 횟수)
// Write -> read , Write -> write (0)
// read -> write (x)
class Lock
{
public:
enum : unsigned int
{
// 획득하는데 기다리는 상한
ACQUIRE_TIMEOUT_TICK = 10'000,
MAX_SPIN_COUNT = 5000,
WRITE_THREAD_MASK = 0xFFFF'0000,
READ_COUNT_MASK = 0x0000'FFFF,
EMPTY_FLAG = 0x0000'0000,
};
public:
void WriteLock();
void WriteUnLock();
void ReadLock();
void ReadUnLock();
private:
std::atomic<unsigned int> _lockFlag = EMPTY_FLAG;
// write lock을 잡은 상태에서 write lock을 또 잡을 수 있도록
// Count를 분리
unsigned int _writeCount = 0;
};
/*--------------------
Lock guard
---------------------*/
//RAII
class ReadLockGuard
{
public:
ReadLockGuard(Lock& lock) : _lock(lock) { _lock.ReadLock(); }
~ReadLockGuard() { _lock.ReadUnLock(); }
private:
Lock& _lock;
};
class WriteLockGuard
{
public:
WriteLockGuard(Lock& lock) : _lock(lock) { _lock.WriteLock(); }
~WriteLockGuard() { _lock.WriteUnLock(); }
private:
Lock& _lock;
};
//lock.cpp
#include <atomic>
#include <Windows.h>
#include <thread>
#include "Lock.h"
#include "CoreTLS.h"
void Lock::WriteLock()
{
// 동일한 쓰레드가 write lock을 잡고 있으면 또 잡게 해줌
const unsigned int lockThreadID = (_lockFlag.load() & WRITE_THREAD_MASK) >> 16;
if (LThreadId == lockThreadID)
{
_writeCount++;
return;
}
// read, write 아무도 안 쓰고 있을때 소유권을 얻는다
// EMPTY_FLAG 일때
const __int64 beginTick = ::GetTickCount64();
const unsigned int desired = ((LThreadId << 16) & WRITE_THREAD_MASK);
while (true)
{
for (unsigned int spinCount = 0; spinCount < MAX_SPIN_COUNT; spinCount++)
{
unsigned int expected = EMPTY_FLAG;
if (_lockFlag.compare_exchange_strong(OUT expected, desired))
{
_writeCount++;
return;
}
}
if (::GetTickCount64() - beginTick > ACQUIRE_TIMEOUT_TICK)
CRASH("WRITE_LOCK_ACQ_TIMEOUT");
std::this_thread::yield();
}
}
void Lock::WriteUnLock()
{
//ReadLock 다 풀기 전에는 WriteUnlock 불가능
if ((_lockFlag.load() & READ_COUNT_MASK) != 0)
CRASH("INVALID_WRITE_UNLOCK_ORDER");
const int lockCount = --_writeCount;
if (lockCount == 0)
_lockFlag.store(EMPTY_FLAG);
}
void Lock::ReadLock()
{
// 동일한 스레드가 lock 잡고 있으면 성공
// 즉, 동일한 스레드가 write lock을 잡고 있는 상태
const unsigned int lockThreadID = (_lockFlag.load() & WRITE_THREAD_MASK) >> 16;
if (LThreadId == lockThreadID)
{
_lockFlag.fetch_add(1);
return;
}
// 아무도 lock을 안잡으면 공유 카운트를 올린다.
while (true)
{
const __int64 beginTick = ::GetTickCount64();
for (unsigned int spinCount = 0; spinCount < MAX_SPIN_COUNT; spinCount++)
{
unsigned int expected = _lockFlag.load() & READ_COUNT_MASK;
if (_lockFlag.compare_exchange_strong(OUT expected, expected + 1))
return;
}
if (::GetTickCount64() - beginTick > ACQUIRE_TIMEOUT_TICK)
CRASH("RAED_LOCK_ACQ_TIMEOUT");
std::this_thread::yield();
}
}
void Lock::ReadUnLock()
{
if ((_lockFlag.fetch_sub(1) & READ_COUNT_MASK) == 0)
CRASH("INVALID_READ_MULTIPLE_UNLOCK");
}
// GameServer.cpp
#include "pch.h"
#include <iostream>
#include "CorePch.h"
#include <thread>
#include <atomic>
#include <chrono>
#include "CoreGlobal.h"
#include "ThreadManager.h"
thread_local unsigned int LThreadId = 0;
class TestLock
{
Lock _locks[1];
public:
unsigned int TestRead()
{
ReadLockGuard readLockGuard_0(_locks[0]);
if (_queue.empty())
return -1;
return _queue.front();
}
void TestPush()
{
WriteLockGuard writeLockGuard_0(_locks[0]);
_queue.push(rand() % 100);
}
void Testpop()
{
WriteLockGuard writeLockGuard_0(_locks[0]);
if (_queue.empty() == false)
_queue.pop();
}
private:
queue<unsigned int> _queue;
};
TestLock testLock;
void ThreadWrite(unsigned int threadId)
{
LThreadId = threadId;
while (true)
{
testLock.TestPush();
this_thread::sleep_for(1ms);
testLock.Testpop();
}
}
void ThreadRead(unsigned int threadId)
{
LThreadId = threadId;
while (true)
{
unsigned int value = testLock.TestRead();
cout << value << endl;
this_thread::sleep_for(1ms);
}
}
int main()
{
/*for (unsigned int i = 0; i < 2; i++)
{
GThreadManager->Launch(ThreadWrite);
}
for (unsigned int i = 0; i < 5; i++)
{
GThreadManager->Launch(ThreadRead);
}
GThreadManager->Join();*/
vector<thread> threads;
Atomic<unsigned int> SThreadId = 1;
for (unsigned int i = 0; i < 2; i++)
{
unsigned int next = SThreadId.fetch_add(1);
threads.push_back(thread(ThreadRead, next));
}
for (unsigned int i = 0; i < 5; i++)
{
unsigned int next = SThreadId.fetch_add(1);
threads.push_back(thread(ThreadWrite, next));
}
for (auto& t : threads)
t.join();
}
DeadLock의 가장 대표적인 경우를 생각해보자. A, B 행동이 있다. A의 내부에서는 A의 Lock을 잡고 B에서 lock을 잡는 함수를 호출한다. 그 반대도 동일하다면
A->B 의 lock 순서와 B->A lock 순서가 생긴다. 이럴 경우 dead lock 발생하기 매우 쉽다.
락을 그래프의 node라고 생각해보자
락은 잡는 순서대로 번호가 주어진다고 했을 때
0번 락을 잡은 상태에서 새로운 락을 잡으면 1번을 부여하고 0->1 간선을 만든다. 즉 먼저 잡은 쪽에서 나중에 잡은 쪽으로 향하는 방향 그래프가 만들어 진다.
0->1->2 같은 경우는 문제가 없다. 그런데 여기서 1번 lock을 잡고 0번 lock을 잡으려고 하면 문제가 생간다. 아까 A->B B->A 상황과 같다.
따라서 그래프에서 사이클의 여부가 DeadLock의 여부이다.
//DeadLockProfiler.h
#pragma once
#include <stack>
#include <map>
#include <vector>
/*---------------------
DeadLockProfiler
---------------------*/
class DeadLockProfiler
{
public:
void PushLock(const char* name);
void PopLock(const char* name);
void CheckCycle();
private:
void DFS(int here);
private :
Mutex _lock;
// 그래프를 위한 변수
unordered_map<const char*, int> _nameToId;
unordered_map<int, const char*> _idToName;
// TLS에 들어있음
//stack<int> _lockStack;
map<nt, set<int>> _lockHistory;
private:
vector<int> _discoveredOrder; // 노드가 발견된 순서를 기록
int _discoveredCount = 0; // 노드가 발견된 순서
vector<bool> _finished;
vector<int> _parent;
};
//DeadLockProfiler.cpp
#include "pch.h"
#include "DeadLockProfiler.h"
/*---------------------
DeadLockProfiler
---------------------*/
void DeadLockProfiler::PushLock(const char* name)
{
LockGuard guard(_lock);
// 아이디를 찾거나 발급
int lockId = 0;
auto it = _nameToId.find(name);
if (it == _nameToId.end())
{
lockId = static_cast<int>(_nameToId.size());
_nameToId[name] = lockId;
_idToName[lockId] = name;
}
else
{
lockId = it->second;
}
// 잡고 있는 락이 있었다면
if (LLockStack.empty() == false)
{
// 기존에 발견되지 않은 케이스는 데드락(사이클) 판별
const int prevId = LLockStack.top();
// 같은 스레드가 연속해서 잡은게 아니면
if (lockId != prevId)
{
// 가장 최근에 잡은 lock의 history를 살펴봄
set<int>& history = _lockHistory[prevId];
// 기록에 없으면 기록에 삽입
if (history.find(lockId) == history.end())
{
history.insert(lockId);
CheckCycle();
}
}
}
LLockStack.push(lockId);
}
void DeadLockProfiler::PopLock(const char* name)
{
LockGuard guard(_lock);
if (LLockStack.empty())
CRASH("MULTIPLE_UNLOCK");
int id = _nameToId[name];
if (LLockStack.top() != id)
CRASH("INVALID_UNLOCK");
LLockStack.pop();
}
void DeadLockProfiler::CheckCycle()
{
//DFS 초기화
const int lockCount = static_cast<int>(_nameToId.size());
_discoveredOrder = vector<int>(lockCount, -1);
_discoveredCount = 0;
_finished = vector<bool>(lockCount, false);
_parent = vector<int>(lockCount, -1);
for (int lockId = 0; lockId < lockCount; lockId++)
{
DFS(lockId);
}
_discoveredOrder.clear();
_finished.clear();
_parent.clear();
}
void DeadLockProfiler::DFS(int here)
{
// 이미 방문이 된 상태
if (_discoveredOrder[here] != -1)
return;
_discoveredOrder[here] = _discoveredCount++;
auto it = _lockHistory.find(here);
if (it == _lockHistory.end())
{
_finished[here] = true;
return;
}
set<int> nSet = it->second;
for (int v : nSet)
{
if (_discoveredOrder[v] == -1)
{
_parent[v] = here;
DFS(v);
continue;
}
// 순방향
if (_discoveredOrder[here] < _discoveredOrder[v])
continue;
// 순방향도 아니고, v점에서의 dfs가 끝나지 않았으면
if (_finished[v] == false)
{
printf("%s -> %s\n", _idToName[here], _idToName[v]);
int now = here;
while (true)
{
printf("%s -> %s\n", _idToName[_parent[now]], _idToName[now]);
now = _parent[now];
if (now == v) break;
}
CRASH("DEAD_LOCK_DETECTED");
}
}
_finished[here] = true;
}