C++ Server - 멀티스레드 프로그래밍 (4)

창고지기·2025년 8월 25일
0

Cpp Server

목록 보기
4/6
post-thumbnail

RW Lock (Readers-Writer Lock)

개념

이름에서 유추를 해보자.
readers, writer 독자는 여러명인데 쓰는 사람은1명이다.
멀티스레드에서 읽는건 여러명인데 쓰는건 1명이라는 의미는 값의 변경이 거의 없다는 뜻이다. 대부분이 값을 읽어오기만 하고 아주 가끔 값을 수정한다.

기존 방식처럼 모든 행동에 lock을 건다면 이 상황에서 read에 lock을 거는건 불필요하다. 진짜 값을 수정할 때만 lock을 걸고 단순히 읽는 상황에서는 lock 없이 여러 쓰레드가 사용하게 한다. 이것이 RW Lock이다.

사용

  • Read를 잡은 상태에서 Write는 불가능.
  • W -> W, W -> R는 가능
  • bit mask를 통해서 현재 write lock을 잡을사람, read lock을 잡은 스레드의 개수를 하나의 변수로 관리
//lock.h
#pragma once
#include <atomic>
/*---------------------
	  RW SpinLock
---------------------*/

#defin OUT

#define CRASH(cause)						\
{											\
	uint32* crash = nullptr;				\
	__analysis_assume(crash != nullptr);	\
	*crash = 0xDEADBEEF;					\
}						

//상위 16: WriteFlag (lock을 획득한 스레드의 Id) (Exclusive Lock Owner ThreadId)
//하위 16: ReadFlag (공유해서 사용되는 횟수)

// Write -> read , Write -> write (0)
// read -> write (x)

class Lock
{
public:
	enum : unsigned int
	{
		// 획득하는데 기다리는 상한
		ACQUIRE_TIMEOUT_TICK = 10'000,
		MAX_SPIN_COUNT = 5000,
		WRITE_THREAD_MASK = 0xFFFF'0000,
		READ_COUNT_MASK = 0x0000'FFFF,
		EMPTY_FLAG = 0x0000'0000,
	};

public:
	void WriteLock();
	void WriteUnLock();
	void ReadLock();
	void ReadUnLock();

private:
	std::atomic<unsigned int> _lockFlag = EMPTY_FLAG;
	// write lock을 잡은 상태에서 write lock을 또 잡을 수 있도록
	// Count를 분리
	unsigned int _writeCount = 0;

};



/*--------------------
      Lock guard
---------------------*/

//RAII
class ReadLockGuard
{
public:
	ReadLockGuard(Lock& lock) : _lock(lock) { _lock.ReadLock(); }
	~ReadLockGuard() { _lock.ReadUnLock(); }

private:
	Lock& _lock;
};

class WriteLockGuard
{
public:
	WriteLockGuard(Lock& lock) : _lock(lock) { _lock.WriteLock(); }
	~WriteLockGuard() { _lock.WriteUnLock(); }

private:
	Lock& _lock;
};

//lock.cpp
#include <atomic>
#include <Windows.h>
#include <thread>
#include "Lock.h"
#include "CoreTLS.h"

void Lock::WriteLock()
{
	// 동일한 쓰레드가 write lock을 잡고 있으면 또 잡게 해줌
	const unsigned int lockThreadID = (_lockFlag.load() & WRITE_THREAD_MASK) >> 16;
	if (LThreadId == lockThreadID)
	{
		_writeCount++;
		return;
	}

	// read, write 아무도 안 쓰고 있을때 소유권을 얻는다
	// EMPTY_FLAG 일때
	const __int64 beginTick = ::GetTickCount64();
	const unsigned int desired = ((LThreadId << 16) & WRITE_THREAD_MASK);
	while (true)
	{
		for (unsigned int spinCount = 0; spinCount < MAX_SPIN_COUNT; spinCount++)
		{
			unsigned int expected = EMPTY_FLAG;
			if (_lockFlag.compare_exchange_strong(OUT expected, desired))
			{
				_writeCount++;
				return;
			}
		}

		if (::GetTickCount64() - beginTick > ACQUIRE_TIMEOUT_TICK)
			CRASH("WRITE_LOCK_ACQ_TIMEOUT");

		std::this_thread::yield();
	}
}

void Lock::WriteUnLock()
{
	//ReadLock 다 풀기 전에는 WriteUnlock 불가능
	if ((_lockFlag.load() & READ_COUNT_MASK) != 0)
		CRASH("INVALID_WRITE_UNLOCK_ORDER");

	const int lockCount = --_writeCount;
	if (lockCount == 0)
		_lockFlag.store(EMPTY_FLAG);
}

void Lock::ReadLock()
{
	// 동일한 스레드가 lock 잡고 있으면 성공
	// 즉, 동일한 스레드가 write lock을 잡고 있는 상태
	const unsigned int lockThreadID = (_lockFlag.load() & WRITE_THREAD_MASK) >> 16;
	if (LThreadId == lockThreadID)
	{
		_lockFlag.fetch_add(1);
		return;
	}

	// 아무도 lock을 안잡으면 공유 카운트를 올린다.
	while (true)
	{
		const __int64 beginTick = ::GetTickCount64();
		for (unsigned int spinCount = 0; spinCount < MAX_SPIN_COUNT; spinCount++)
		{
			unsigned int expected = _lockFlag.load() & READ_COUNT_MASK;
			if (_lockFlag.compare_exchange_strong(OUT expected, expected + 1))
				return;
		}

		if (::GetTickCount64() - beginTick > ACQUIRE_TIMEOUT_TICK)
			CRASH("RAED_LOCK_ACQ_TIMEOUT");

		std::this_thread::yield();
	}


}

void Lock::ReadUnLock()
{
	if ((_lockFlag.fetch_sub(1) & READ_COUNT_MASK) == 0)
		CRASH("INVALID_READ_MULTIPLE_UNLOCK");
}

// GameServer.cpp
#include "pch.h"
#include <iostream>
#include "CorePch.h"

#include <thread>
#include <atomic>
#include <chrono>
#include "CoreGlobal.h"
#include "ThreadManager.h"

thread_local unsigned int LThreadId = 0;

class TestLock
{
	Lock _locks[1];

public:
	unsigned int TestRead()
	{
		ReadLockGuard readLockGuard_0(_locks[0]);
		
		if (_queue.empty())
			return -1;

		return _queue.front();
	}

	void TestPush()
	{
		WriteLockGuard writeLockGuard_0(_locks[0]);
		_queue.push(rand() % 100);
	}

	void Testpop()
	{
		WriteLockGuard writeLockGuard_0(_locks[0]);
		
		if (_queue.empty() == false)
			_queue.pop();
	}

private:
	queue<unsigned int> _queue;
};

TestLock testLock;

void ThreadWrite(unsigned int threadId)
{
	LThreadId = threadId;
	while (true)
	{
		testLock.TestPush();
		this_thread::sleep_for(1ms);
		testLock.Testpop();
	}
}

void ThreadRead(unsigned int threadId)
{
	LThreadId = threadId;
	while (true)
	{
		unsigned int value = testLock.TestRead();
		cout << value << endl;
		this_thread::sleep_for(1ms);
	}
}

int main()
{
	/*for (unsigned int i = 0; i < 2; i++)
	{
		GThreadManager->Launch(ThreadWrite);
	}

	for (unsigned int i = 0; i < 5; i++)
	{
		GThreadManager->Launch(ThreadRead);
	}
	
	GThreadManager->Join();*/

	vector<thread> threads;
	Atomic<unsigned int> SThreadId = 1;
	
	for (unsigned int i = 0; i < 2; i++)
	{
		unsigned int next = SThreadId.fetch_add(1);
		threads.push_back(thread(ThreadRead, next));
	}
		

	for (unsigned int i = 0; i < 5; i++)
	{
		unsigned int next = SThreadId.fetch_add(1);
		threads.push_back(thread(ThreadWrite, next));
	}
		


	for (auto& t : threads)
		t.join();
}

DeadLock 탐지

개념

DeadLock의 가장 대표적인 경우를 생각해보자. A, B 행동이 있다. A의 내부에서는 A의 Lock을 잡고 B에서 lock을 잡는 함수를 호출한다. 그 반대도 동일하다면
A->B 의 lock 순서와 B->A lock 순서가 생긴다. 이럴 경우 dead lock 발생하기 매우 쉽다.

해결법

사이클 판별

락을 그래프의 node라고 생각해보자
락은 잡는 순서대로 번호가 주어진다고 했을 때
0번 락을 잡은 상태에서 새로운 락을 잡으면 1번을 부여하고 0->1 간선을 만든다. 즉 먼저 잡은 쪽에서 나중에 잡은 쪽으로 향하는 방향 그래프가 만들어 진다.
0->1->2 같은 경우는 문제가 없다. 그런데 여기서 1번 lock을 잡고 0번 lock을 잡으려고 하면 문제가 생간다. 아까 A->B B->A 상황과 같다.
따라서 그래프에서 사이클의 여부가 DeadLock의 여부이다.

흐름

  • 스레드는 락을 잡을때 현재 잡고있는 락이 있는지 확인
    • 있다면 같은 락을 연속으로 잡은 것인지 확인
      • 아니라면 최근에 잡았던 락을 잡은 상태로 현재 잡고자 하는 락을 잡았는지 확인
        • 없다면 기록에 추가하고 사이클 판별
  • 해당 락을 스택에 저장
  • 사이클 확인은 여태 잡았던 모든 락들의 기록에 대해서 DFS
    • 이미 방문한 락이면 종료
    • 현재 락의 방문 순서 업데이트
    • 현재 락을 잡은 적이 있는지 확인 (현재 락을 제일 먼저 잡은적이있는지)
      • 없다면 완료 처리 후 종료
    • 현재 락의 인접 노드들 순회
    • 인접노드가 방문되지 않은 경우
      • 인접노드의 부모노드를 현재 노드로 하고, DFS
      • 다음 노드로 넘어감
    • 인접 노드가 이미 방문된 경우
      • 순방향인지 (현재 노드가 인접노드보다 먼저 발견 되었는지)
        • 맞다면 다음 노드로
          * 아니라면 인접노드를 시작으로하는 DFS 종료여부 확인
          • 아니라면 사이클
    • 현재 노드를 시작으로 하는 DFS완료
//DeadLockProfiler.h
#pragma once
#include <stack>
#include <map>
#include <vector>

/*---------------------
	DeadLockProfiler
---------------------*/

class DeadLockProfiler
{
public:
	void PushLock(const char* name);
	void PopLock(const char* name);
	void CheckCycle();

private:
	void DFS(int here);

private :
	Mutex _lock;

	// 그래프를 위한 변수
	unordered_map<const char*, int>	_nameToId;
	unordered_map<int, const char*>	_idToName;
	// TLS에 들어있음
	//stack<int>						_lockStack;
	map<nt, set<int>>		_lockHistory;

private:
	vector<int>						_discoveredOrder; // 노드가 발견된 순서를 기록
	int								_discoveredCount = 0; // 노드가 발견된 순서
	vector<bool>					_finished;
	vector<int>						_parent;
};

//DeadLockProfiler.cpp
#include "pch.h"
#include "DeadLockProfiler.h"

/*---------------------
	DeadLockProfiler
---------------------*/

void DeadLockProfiler::PushLock(const char* name)
{
	LockGuard guard(_lock);

	// 아이디를 찾거나 발급
	int lockId = 0;

	auto it = _nameToId.find(name);
	if (it == _nameToId.end())
	{
		lockId = static_cast<int>(_nameToId.size());
		_nameToId[name] = lockId;
		_idToName[lockId] = name;
	}
	else
	{
		lockId = it->second;
	}

	// 잡고 있는 락이 있었다면
	if (LLockStack.empty() == false)
	{
		// 기존에 발견되지 않은 케이스는 데드락(사이클) 판별
		const int prevId = LLockStack.top();
		// 같은 스레드가 연속해서 잡은게 아니면
		if (lockId != prevId)
		{
			// 가장 최근에 잡은 lock의 history를 살펴봄
			set<int>& history = _lockHistory[prevId];
			// 기록에 없으면 기록에 삽입
			if (history.find(lockId) == history.end())
			{
				history.insert(lockId);
				CheckCycle();
			}
		}
	}

	LLockStack.push(lockId);
}

void DeadLockProfiler::PopLock(const char* name)
{
	LockGuard guard(_lock);

	if (LLockStack.empty())
		CRASH("MULTIPLE_UNLOCK");

	int id = _nameToId[name];
	if (LLockStack.top() != id)
		CRASH("INVALID_UNLOCK");

	LLockStack.pop();
}

void DeadLockProfiler::CheckCycle()
{
	//DFS 초기화
	const int lockCount = static_cast<int>(_nameToId.size());
	_discoveredOrder = vector<int>(lockCount, -1);
	_discoveredCount = 0;
	_finished = vector<bool>(lockCount, false);
	_parent = vector<int>(lockCount, -1);

	for (int lockId = 0; lockId < lockCount; lockId++)
	{
		DFS(lockId);
	}

	_discoveredOrder.clear();
	_finished.clear();
	_parent.clear();
}

void DeadLockProfiler::DFS(int here)
{
	// 이미 방문이 된 상태
	if (_discoveredOrder[here] != -1)
		return;

	_discoveredOrder[here] = _discoveredCount++;

	auto it = _lockHistory.find(here);
	if (it == _lockHistory.end())
	{
		_finished[here] = true;
		return;
	}

	set<int> nSet = it->second;
	for (int v : nSet)
	{
		if (_discoveredOrder[v] == -1)
		{
			_parent[v] = here;
			DFS(v);
			continue;
		}

		// 순방향
		if (_discoveredOrder[here] < _discoveredOrder[v])
			continue;

		// 순방향도 아니고, v점에서의 dfs가 끝나지 않았으면
		if (_finished[v] == false)
		{
			printf("%s -> %s\n", _idToName[here], _idToName[v]);

			int now = here;

			while (true)
			{
				printf("%s -> %s\n", _idToName[_parent[now]], _idToName[now]);
				now = _parent[now];
				if (now == v) break;
			}
			CRASH("DEAD_LOCK_DETECTED");
		}

	}

	_finished[here] = true;
}
profile
일단 창고에 넣어놓으면 언젠가는 쓰겠지

0개의 댓글