C++ Server - 멀티스레드 프로그래밍 (3)

창고지기·2025년 8월 23일
0

Cpp Server

목록 보기
3/6
post-thumbnail

lock_based Stack, Queue

Stack

#pragma once
#include <stack>
#include <mutex>

template<typename T>
class LockStack
{
public:
	LockStack() { }

	LockStack(const LockStack&) = delete;
	LockStack& operator=(const LockStack&) = delete;

	void Push(T value)
	{
		lock_guard<mutex> lock(_mutex);
		_stack.push(std::move(value));
		_condVar.notify_one();
	}

	bool TryPop(T& value)
	{
		lock_guard<mutex> lock(_mutex);
		if (_stack.empty())
			return false;

		value = std::move(_stack.top());
		_stack.pop();
		return true;
	}

	void WaitPop(T& value)
	{
		unique_lock<mutex> lock(_mutex);
		_condVar.wait(lock, [this]() {return _stack.empty() == false; });
		value = std::move(_stack.top());
		_stack.pop();
	}

	bool Empty()
	{
		lock_guard<mutex> lock(_mutex);
		return _stack.empty();
	}

private:
	stack<T> _stack;
	mutex _mutex;
	condition_variable _condVar;
};

Queue

#pragma once
#include <queue>
#include <mutex>

template<typename T>
class LockQueue
{
public:
	LockQueue() { }

	LockQueue(const LockQueue&) = delete;
	LockQueue& operator=(const LockQueue&) = delete;

	void Push(T value)
	{
		lock_guard<mutex> lock(_mutex);
		_queue.push(std::move(value));
		_condVar.notify_one();
	}

	bool TryPop(T& value)
	{
		lock_guard<mutex> lock(_mutex);
		if (_queue.empty())
			return false;

		value = std::move(_queue.front());
		_queue.pop();
		return true;
	}

	void WaitPop(T& value)
	{
		unique_lock<mutex> lock(_mutex);
		_condVar.wait(lock, [this]() {return _queue.empty() == false; });

		value = std::move(_queue.front());
		_queue.pop();
	}

	bool Empty()
	{
		lock_guard<mutex> lock(_mutex);
		return _queue.empty();
	}

private:
	queue<T> _queue;
	mutex _mutex;
	condition_variable _condVar;
};

lock free Stack, Queue

이 항목은 그냥 한 번 보기만 하자

Stack

/*----------------
	PopCnt ver	
-----------------*/
template<typename T>
class LockFreeStack
{
	struct Node
	{
		Node(const T& value) : data(value), next(nullptr)
		{

		}

		T data;
		Node* next;
	};

public:

	void Push(const T& value)
	{
		//1. 새로운 노드 만들기
		Node* node = new Node(value);
		//2. 새로운 노드의 next는 원래 head가 가리키던 노드
		node->next = _head;
		//3. head를 노드로 바꾸기
		//_head = node;

		// x86/x64 아키텍쳐에서는 weak도 사실상 strong 처럼 동작
		// ARM CPU에서 spurious failure 발생할 수 있어서 만들었다고 함.
		while (_head.compare_exchange_weak(node->next, node) == false)
		{

		}
	}

	bool TryPop(T& value)
	{
		++_popCount;

		//1. head 읽기
		Node* oldHead = _head;
		//2. head 의 next 읽기
		//3. head의 next를 head로 만든다
		while (oldHead && _head.compare_exchange_weak(oldHead, oldHead->next) == false)
		{

		}

		if (oldHead == nullptr)
		{
			--_popCount;
			return false;
		}


		//4. data 추출및 반환
		value = oldHead->data;
		//5. 추출한 노드 삭제
		TryDelete(oldHead);
		return true;
	}

	void TryDelete(Node* oldHead)
	{
		// 나만  pop 하고 있는지 확인
		if (_popCount == 1)
		{
			// 혼자임
			// 삭제 예약된 데이터도 삭제시도
			Node* node = _pendingList.exchange(nullptr);

			if (--_popCount == 0)
			{
				// 중간에 끼어든 애가 없음
				// 삭제 진행
				DeleteNodes(node);
			}
			else if (node)
			{
				// 누가 중간에 끼어들었다
				// 취소!
				ChainPendingNodeList(node);

			}

			// 데이터 체크, 카운트 체크, 혼자면 삭제의 순서라서 가능한 로직
			delete oldHead;
		}
		else
		{
			// 누가 삭제중이니까 그냥 예약만 하자
			ChainpendingNode(oldHead);
			--_popCount;
		}
	}

	void ChainPendingNodeList(Node* first, Node* last)
	{
		last->next = _pendingList;

		while (_pendingList.compare_exchange_weak(last->next, first) == false)
		{

		}
	}

	void ChainPendingNodeList(Node* node)
	{
		Node* last = node;
		while (last->next)
			last = last->next;

		ChainPendingNodeList(node, last);
	}

	void ChainpendingNode(Node* node)
	{
		ChainPendingNodeList(node, node);
	}

	static void DeleteNodes(Node* node)
	{
		while (node)
		{
			Node* next = node->next;
			delete node;
			node = next;
		}
	}

private:
	// [ ][ ][ ][ ][ ][ ][ ][ ]
	// ↑
	// head
	atomic<Node*> _head;

	atomic<uint32> _popCount = 0; //Pop을 실행중인 쓰레드 개수
	atomic<Node*> _pendingList; //삭제 되어야 할 노드들의 처음
};
/*-------------------
	shared_ptr ver	
--------------------*/
template<typename T>
class LockFreeStack
{
	// bool value = atomic_is_lock_free(&ptr); 결과를 확인해 보면 대부분의 환경에서 락프리로 동작하지 않음
	// 락프리 기반으로 만들었는데 락프리로 동작을 안함...
	struct Node
	{
		Node(const T& value) : data(make_shared<T>(value)), next(nullptr)
		{

		}

		shared_ptr<T> data;
		shared_ptr< Node> next;
	};

public:

	void Push(const T& value)
	{
		shared_ptr<Node> node = make_shared<Node>(value);
		node->next = std::atomic_load(&_head);

		while (std::atomic_compare_exchange_weak(&_head, &node->next, node) == false)
		{

		}
	}

	shared_ptr<T> TryPop()
	{
		shared_ptr<Node> oldHead = std::atomic_load(&_head);

		while (oldHead && std::atomic_compare_exchange_weak(&_head, &oldHead, oldHead->next) == false)
		{

		}

		if (oldHead == nullptr)
			return shared_ptr<T>();

		return oldHead->data;
	}

private:
	// [ ][ ][ ][ ][ ][ ][ ][ ]
	// ↑
	// head
	shared_ptr<Node> _head;

};
/*---------------------------------
	 split reference counts ver	
----------------------------------*/
template<typename T>
class LockFreeStack
{
	struct Node;

	struct CountedNodePtr
	{
		// 참조 횟수
		int32 externalCount = 0;
		Node* ptr = nullptr;
	};

	struct Node
	{
		Node(const T& value) : data(make_shared<T>(value))
		{

		}

		shared_ptr<T> data;
		// 참조 횟수
		atomic<int32> internalCount=0;
		CountedNodePtr next;
	};

public:

	void Push(const T& value)
	{
		CountedNodePtr node;
		node.ptr = new Node(value);
		node.externalCount = 1;

		node.ptr->next = _head;
		while (_head.compare_exchange_weak(node.ptr->next, node) == false)
		{

		}
	}

	shared_ptr<T> TryPop()
	{
		CountedNodePtr oldHead = _head;
		while (true)
		{
			// 참조하기 위한 첫 단계
			// 은행에서 번호표 뽑는것과 비슷
			IncreaseHeadCount(oldHead);

			// externalCount > 1
			Node* ptr = oldHead.ptr;

			if (ptr == nullptr)
				return shared_ptr<T>();

			// 번호표 뽑은 사람중에 누굴 할지
			// 먼저 head = ptr->next 한 사람이 승자
			// 누군가 중간에 끼어 들었으면 oldHead의 extern cnt 값이 바뀌었을 것
			auto next = ptr->next;
			if (_head.compare_exchange_strong(oldHead, ptr->next))
			{
				shared_ptr<T> res;
				res.swap(ptr->data);

				// 나 말고 누가 있나?
				const int32 countIncrese = oldHead.externalCount - 2;
				if (ptr->internalCount.fetch_add(countIncrese) == -countIncrese)
					delete ptr;

				return res;
			}
			else if (ptr->internalCount.fetch_sub(1) == 1)
			{
				// 번호표는 뽑았는데 권리가 없는 경우
				// 뒷 수습은 내가한다 <- ???
				delete ptr;
			}

		}
	}

private:
	// 외부카운트의 값을 1 늘린다.
	// 먼저 externalCount의 값을 1증가 시키는 사람이 승자
	void IncreaseHeadCount(CountedNodePtr& oldCounter)
	{
		while (true)
		{
			CountedNodePtr newCounter = oldCounter;
			newCounter.externalCount++;

			if (_head.compare_exchange_strong(oldCounter, newCounter))
			{
				oldCounter.externalCount = newCounter.externalCount;
				break;
			}
		}
	}

private:
	// [ ][ ][ ][ ][ ][ ][ ][ ]
	// ↑
	// head
	atomic<CountedNodePtr> _head;

};

Queue

profile
일단 창고에 넣어놓으면 언젠가는 쓰겠지

0개의 댓글