Lock-Free란 Mutex나 SpinLock 등 전통적인 동기화 기법을 사용하지 않고, 공유 데이터에 대한 안전한 접근을 보장하는 알고리즘입니다.
CAS(Compare And Swap)는 다음 3가지 값을 기반으로 작동합니다:
예상 값과 대상 값이 같을 때만 대상 값을 새로운 값으로 바꿉니다.
이 연산은 원자적으로 수행되며, 실패하면 다시 시도(loop)하는 방식으로 Lock-Free를 구현합니다.
[Head, Tail] → [Dummy Node] → (처음에는 데이터 없음)
Next에 새 노드를 연결한다.Next를 복사한다.template<typename T>
class LockFreeQueueSPSC
{
private:
struct Node {
std::shared_ptr<T> Data;
Node* Next;
Node() : Next(nullptr) {}
};
std::atomic<Node*> Head;
std::atomic<Node*> Tail;
Node* PopHead() {
Node* const OldHead = Head.load();
if (OldHead == Tail.load()) return nullptr;
Head.store(OldHead->Next);
return OldHead;
}
public:
LockFreeQueueSPSC() : Head(new Node), Tail(Head.load()) {}
~LockFreeQueueSPSC() {
while (Node* const OldHead = Head.load()) {
Head = OldHead->Next;
delete OldHead;
}
}
std::shared_ptr<T> Pop() {
Node* OldHead = PopHead();
if (!OldHead) return std::shared_ptr<T>();
std::shared_ptr<T> const Res(OldHead->Data);
delete OldHead;
return Res;
}
void Push(const T& NewValue) {
std::shared_ptr<T> NewData(std::make_shared<T>(NewValue));
Node* p = new Node;
Node* const OldTail = Tail.load();
OldTail->Data.swap(NewData);
OldTail->Next = p;
Tail.store(p);
}
};
std::atomic<Node*>로 정의.Push, Pop에서 경쟁 조건(race condition)이 발생함.Push()를 호출하면 OldTail->Data.swap() 부분에서 동일한 Tail에 값을 삽입하게 됨.OldHead를 두 번 해제하려는 상황이 발생할 수 있음. → 미정의 동작 발생멀티 스레드 지원을 위해 다음과 같은 방식이 필요함:
Data는 atomic<T*>로 관리 std::shared_ptr은 lock-free가 아니므로 → unique_ptr<T>로 메모리를 관리 compare_exchange_strong로 CAS 연산 수행unique_ptr<T> NewData(new T(NewValue));
CountedNodePtr NewNext;
NewNext.Ptr = new Node;
NewNext.ExternalCount = 1;
for (;;)
{
Node* const OldTail = Tail.load();
T* OldData = nullptr;
if (OldTail->Data.compare_exchange_strong(OldData, NewData.get())) {
OldTail->Next = NewNext;
Tail.store(NewNext.Ptr);
NewData.release(); // ownership 넘김
break;
}
}
❗ 문제:
OldTail이 댕글링 포인터일 수 있어 위험함
Tail, Next 등 외부 포인터에서 참조된 횟수struct NodeCounter {
unsigned InternalCount : 30;
unsigned ExternalCounters : 2;
};
ExternalCounters는 최대 2개만 필요하므로 2비트면 충분함struct CountedNodePtr {
int ExternalCount;
Node* Ptr;
};
atomic<CountedNodePtr>로 Head와 Tail 선언void Push(const T& NewValue)
{
unique_ptr<T> NewData(new T(NewValue));
CountedNodePtr NewNext;
NewNext.Ptr = new Node;
NewNext.ExternalCount = 1;
CountedNodePtr OldTail = Tail.load();
for (;;)
{
IncreaseExternalCount(Tail, OldTail);
T* OldData = nullptr;
// (6) 데이터 삽입 시도
if (OldTail.Ptr->Data.compare_exchange_strong(OldData, NewData.get()))
{
// (7) 다음 포인터 연결 시도
CountedNodePtr OldNext = {};
if (!OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
{
// (8) 다른 쓰레드가 먼저 연결했을 경우
delete NewNext.Ptr;
NewNext = OldNext;
}
// (9) Tail 업데이트
SetNewTail(OldTail, NewNext);
NewData.release(); // 소유권 포기
break;
}
else
{
// (10) 데이터는 이미 들어감. 다음 노드 연결 도와주기
CountedNodePtr OldNext = {};
if (OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
{
// (11) 내가 연결 완료 → 새 노드 하나 더 생성
OldNext = NewNext;
NewNext.Ptr = new Node;
}
// (12) Tail 업데이트 도와주기
SetNewTail(OldTail, OldNext);
}
}
}
if (OldTail.Ptr->Data.compare_exchange_strong(OldData, NewData.get()))
OldTail이 아직 데이터가 없을 경우, 새로운 데이터를 집어넣음.if (!OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
delete.SetNewTail(OldTail, NewNext);
Tail 값을 업데이트 (다음 파트에서 설명)SetNewTail()을 통해 Tail을 도와서 업데이트.CountedNodePtr: Node* + ExternalCount👉 이 방식은 ABA 문제를 근본적으로 해결
Next 포인터를 연결하기 전에 스케줄러에 의해 멈춤if (OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
{
// 연결 성공했으면 내가 연결한 것이 아님 → 노드 새로 생성
OldNext = NewNext;
NewNext.Ptr = new Node;
}
Next 연결void SetNewTail(CountedNodePtr& OldTail, CountedNodePtr const& NewTail)
{
Node* const CurrentTailPtr = OldTail.Ptr;
while (!Tail.compare_exchange_weak(OldTail, NewTail)
&& OldTail.Ptr == CurrentTailPtr)
{
// retry if Tail was updated by other thread
}
if (OldTail.Ptr == CurrentTailPtr)
{
FreeExternalCounter(OldTail);
}
else
{
CurrentTailPtr->ReleaseRef();
}
}
Pop() 함수 전체 흐름std::unique_ptr<T> Pop()
{
CountedNodePtr OldHead = Head.load(std::memory_order_relaxed);
for (;;)
{
IncreaseExternalCount(Head, OldHead);
Node* const Ptr = OldHead.Ptr;
// (1) 큐가 비었는지 체크
if (Ptr == Tail.load().Ptr)
{
Ptr->ReleaseRef();
return std::unique_ptr<T>();
}
// (2) 다음 노드 정보 가져오기
CountedNodePtr Next = Ptr->Next.load();
// (3) Head를 다음 노드로 옮기기
if (Head.compare_exchange_strong(OldHead, Next))
{
// (4) 데이터 꺼내기
T* const Res = Ptr->Data.exchange(nullptr);
// (5) 외부 참조 카운터 해제
FreeExternalCounter(OldHead);
// (6) 결과 반환
return std::unique_ptr<T>(Res);
}
// (7) 실패 시 내부 참조 감소
Ptr->ReleaseRef();
}
}
Pop() 단계별 해설if (Ptr == Tail.load().Ptr)
nullptr 반환CountedNodePtr Next = Ptr->Next.load();
if (Head.compare_exchange_strong(OldHead, Next))
T* const Res = Ptr->Data.exchange(nullptr);
nullptr로 설정FreeExternalCounter(OldHead);
OldHead의 참조 권한이 끝났으므로 외부 참조 → 내부 참조로 반영return std::unique_ptr<T>(Res);
std::unique_ptr<T>로 메모리 안전하게 반환Ptr->ReleaseRef();
ReleaseRef() 함수 구조void ReleaseRef()
{
NodeCounter OldCounter = Count.load(std::memory_order_relaxed);
NodeCounter NewCounter;
do
{
NewCounter = OldCounter;
--NewCounter.InternalCount;
} while (!Count.compare_exchange_strong(
OldCounter, NewCounter,
std::memory_order_acquire,
std::memory_order_relaxed));
if (NewCounter.InternalCount == 0 && NewCounter.ExternalCounters == 0)
delete this;
}
FreeExternalCounter() 분석static void FreeExternalCounter(CountedNodePtr& OldNodePtr)
{
Node* const Ptr = OldNodePtr.Ptr;
const int CountIncrease = OldNodePtr.ExternalCount - 2;
NodeCounter OldCounter = Ptr->Count.load(std::memory_order_relaxed);
NodeCounter NewCounter;
do
{
NewCounter = OldCounter;
--NewCounter.ExternalCounters;
NewCounter.InternalCount += CountIncrease;
} while (!Ptr->Count.compare_exchange_strong(
OldCounter, NewCounter,
std::memory_order_acquire,
std::memory_order_relaxed));
if (NewCounter.InternalCount == 0 && NewCounter.ExternalCounters == 0)
delete Ptr;
}
ExternalCount - 2 만큼 내부 참조에 반영