Lock-Free-Queue

Jaemyeong Lee·2025년 3월 11일

💡 Lock-Free란?

✅ 정의

Lock-FreeMutexSpinLock전통적인 동기화 기법을 사용하지 않고, 공유 데이터에 대한 안전한 접근을 보장하는 알고리즘입니다.

✅ 핵심 원리 - Compare And Swap (CAS)

CAS(Compare And Swap)는 다음 3가지 값을 기반으로 작동합니다:

  • 대상 값 (Target)
  • 예상 값 (Expected)
  • 새로운 값 (New Value)

예상 값과 대상 값이 같을 때만 대상 값을 새로운 값으로 바꿉니다.

이 연산은 원자적으로 수행되며, 실패하면 다시 시도(loop)하는 방식으로 Lock-Free를 구현합니다.


📦 Lock-Free Queue의 기본 설계

🔗 기본 구조

  • Head: 큐의 맨 앞을 가리키는 포인터
  • Tail: 큐의 맨 뒤를 가리키는 포인터
  • 큐 생성 시, Head와 Tail은 모두 더미 노드를 가리킵니다.
[Head, Tail] → [Dummy Node] → (처음에는 데이터 없음)

📥 Push(T elem) 동작 요약

  1. 새로운 노드를 할당한다.
  2. Tail이 가리키는 노드의 Next에 새 노드를 연결한다.
  3. Tail을 새 노드로 이동시킨다.

📤 Pop() 동작 요약

  1. Head가 가리키는 노드의 Next를 복사한다.
  2. Head를 Next로 이동시키고, 기존 노드를 제거한다.
  3. 데이터를 반환한다.

👨‍🔬 SPSC (Single Producer Single Consumer) Lock-Free Queue

template<typename T>
class LockFreeQueueSPSC
{
private:
    struct Node {
        std::shared_ptr<T> Data;
        Node* Next;
        Node() : Next(nullptr) {}
    };

    std::atomic<Node*> Head;
    std::atomic<Node*> Tail;

    Node* PopHead() {
        Node* const OldHead = Head.load();
        if (OldHead == Tail.load()) return nullptr;
        Head.store(OldHead->Next);
        return OldHead;
    }

public:
    LockFreeQueueSPSC() : Head(new Node), Tail(Head.load()) {}
    ~LockFreeQueueSPSC() {
        while (Node* const OldHead = Head.load()) {
            Head = OldHead->Next;
            delete OldHead;
        }
    }

    std::shared_ptr<T> Pop() {
        Node* OldHead = PopHead();
        if (!OldHead) return std::shared_ptr<T>();
        std::shared_ptr<T> const Res(OldHead->Data);
        delete OldHead;
        return Res;
    }

    void Push(const T& NewValue) {
        std::shared_ptr<T> NewData(std::make_shared<T>(NewValue));
        Node* p = new Node;
        Node* const OldTail = Tail.load();
        OldTail->Data.swap(NewData);
        OldTail->Next = p;
        Tail.store(p);
    }
};

🔍 분석

  • Head, Tail 모두 std::atomic<Node*>로 정의.
  • 단일 생산자(Single Producer)와 단일 소비자(Single Consumer) 환경에서는 잘 작동.
  • 하지만 멀티스레드 환경에서는 Push, Pop에서 경쟁 조건(race condition)이 발생함.

⚠️ 문제점: 경쟁 조건

🛠 문제 예시 (멀티 쓰레드)

  • 두 개의 쓰레드가 동시에 Push()를 호출하면 OldTail->Data.swap() 부분에서 동일한 Tail에 값을 삽입하게 됨.
  • Pop 시에도 OldHead를 두 번 해제하려는 상황이 발생할 수 있음. → 미정의 동작 발생

🛡 CAS와 atomic으로 동기화 해결하기

멀티 스레드 지원을 위해 다음과 같은 방식이 필요함:

  1. Dataatomic<T*>로 관리
  2. std::shared_ptrlock-free가 아니므로 → unique_ptr<T>로 메모리를 관리
  3. compare_exchange_strong로 CAS 연산 수행

✅ 데이터 삽입 예시 (Push_Broken → 개선 방향 제시)

unique_ptr<T> NewData(new T(NewValue));
CountedNodePtr NewNext;
NewNext.Ptr = new Node;
NewNext.ExternalCount = 1;

for (;;)
{
    Node* const OldTail = Tail.load();
    T* OldData = nullptr;
    if (OldTail->Data.compare_exchange_strong(OldData, NewData.get())) {
        OldTail->Next = NewNext;
        Tail.store(NewNext.Ptr);
        NewData.release(); // ownership 넘김
        break;
    }
}

문제: OldTail이 댕글링 포인터일 수 있어 위험함


🧮 참조 카운팅과 구조 개선

🔷 구조 개선 포인트

  • 내부 참조(InternalCount): 해당 노드를 사용하는 쓰레드 수
  • 외부 참조(ExternalCounters): Tail, Next 등 외부 포인터에서 참조된 횟수

🧱 Node 구조 (비트 필드 사용)

struct NodeCounter {
    unsigned InternalCount : 30;
    unsigned ExternalCounters : 2;
};
  • 32비트 내에 두 카운터를 bit field로 저장
  • ExternalCounters는 최대 2개만 필요하므로 2비트면 충분함

🪢 CountedNodePtr 구조

struct CountedNodePtr {
    int ExternalCount;
    Node* Ptr;
};
  • 외부에서 노드를 참조할 때 사용하는 포인터 구조
  • atomic<CountedNodePtr>HeadTail 선언

🔁 Push() 상세 구현 및 분석

📌 기본 구조

void Push(const T& NewValue)
{
    unique_ptr<T> NewData(new T(NewValue));
    CountedNodePtr NewNext;
    NewNext.Ptr = new Node;
    NewNext.ExternalCount = 1;

    CountedNodePtr OldTail = Tail.load();

    for (;;)
    {
        IncreaseExternalCount(Tail, OldTail);

        T* OldData = nullptr;

        // (6) 데이터 삽입 시도
        if (OldTail.Ptr->Data.compare_exchange_strong(OldData, NewData.get()))
        {
            // (7) 다음 포인터 연결 시도
            CountedNodePtr OldNext = {};
            if (!OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
            {
                // (8) 다른 쓰레드가 먼저 연결했을 경우
                delete NewNext.Ptr;
                NewNext = OldNext;
            }

            // (9) Tail 업데이트
            SetNewTail(OldTail, NewNext);
            NewData.release(); // 소유권 포기
            break;
        }
        else
        {
            // (10) 데이터는 이미 들어감. 다음 노드 연결 도와주기
            CountedNodePtr OldNext = {};
            if (OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
            {
                // (11) 내가 연결 완료 → 새 노드 하나 더 생성
                OldNext = NewNext;
                NewNext.Ptr = new Node;
            }

            // (12) Tail 업데이트 도와주기
            SetNewTail(OldTail, OldNext);
        }
    }
}

📘 상세 동작 해설

🧩 단계 (6) - 데이터 삽입 시도

if (OldTail.Ptr->Data.compare_exchange_strong(OldData, NewData.get()))
  • OldTail이 아직 데이터가 없을 경우, 새로운 데이터를 집어넣음.
  • 성공하면 내가 이 노드를 채운 주인이 됨.

🧩 단계 (7) - 다음 노드 연결 시도

if (!OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
  • Next 포인터가 비어있는 상태라면 내가 생성한 노드를 연결.
  • 실패하면 이미 다른 쓰레드가 연결했음 → 내가 생성한 노드는 필요 없어졌으니 delete.

🧩 단계 (9) - Tail 업데이트

SetNewTail(OldTail, NewNext);
  • 실제 Tail 값을 업데이트 (다음 파트에서 설명)

🧩 단계 (10~12) - 다른 쓰레드가 먼저 데이터 삽입한 경우

  • 이 경우, 내가 Push()를 실행하는 중간에 다른 쓰레드가 이미 데이터 삽입을 해버린 것.
  • 내가 생성한 노드로 Next를 연결하려 시도하고,
  • 연결 성공 시 새로운 노드를 하나 더 만들어 내 차례에 대비함.
  • 이후 SetNewTail()을 통해 Tail도와서 업데이트.

💥 ABA 문제 방지 전략

❗ ABA란?

  • A → B → A로 값이 변경되었을 때, CAS는 변화가 없다고 착각함
  • 실제로는 중간에 변경이 있었는데 이를 감지하지 못함

✅ 해결 전략

  1. 참조 카운터 사용
    • External/Internal Count 로 참조 상태를 추적
  2. 포인터와 카운트를 묶은 구조
    • CountedNodePtr: Node* + ExternalCount
  3. CAS로 포인터 + Count 동시 비교
    • 포인터가 같아도 Count가 다르면 "다른 노드"로 판단

👉 이 방식은 ABA 문제를 근본적으로 해결


🧵 쓰레드 간 협조 (Helping Push)

💬 예시 상황

  1. Thread A가 데이터 삽입에는 성공했지만
  2. Next 포인터를 연결하기 전에 스케줄러에 의해 멈춤
  3. Thread B가 Push()를 실행하면 다른 쓰레드를 도와서 연결함

🤝 Helping 전략

if (OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
{
    // 연결 성공했으면 내가 연결한 것이 아님 → 노드 새로 생성
    OldNext = NewNext;
    NewNext.Ptr = new Node;
}
  • Push 실패한 쓰레드를 대신해서 Next 연결
  • 이걸 도와주는 Push(Helping Push) 라고 부름

🧷 SetNewTail() 함수 상세 분석

void SetNewTail(CountedNodePtr& OldTail, CountedNodePtr const& NewTail)
{
    Node* const CurrentTailPtr = OldTail.Ptr;

    while (!Tail.compare_exchange_weak(OldTail, NewTail)
        && OldTail.Ptr == CurrentTailPtr)
    {
        // retry if Tail was updated by other thread
    }

    if (OldTail.Ptr == CurrentTailPtr)
    {
        FreeExternalCounter(OldTail);
    }
    else
    {
        CurrentTailPtr->ReleaseRef();
    }
}

🔄 Pop() 함수 전체 흐름

std::unique_ptr<T> Pop()
{
	CountedNodePtr OldHead = Head.load(std::memory_order_relaxed);

	for (;;)
	{
		IncreaseExternalCount(Head, OldHead);

		Node* const Ptr = OldHead.Ptr;

		// (1) 큐가 비었는지 체크
		if (Ptr == Tail.load().Ptr)
		{
			Ptr->ReleaseRef();
			return std::unique_ptr<T>();
		}

		// (2) 다음 노드 정보 가져오기
		CountedNodePtr Next = Ptr->Next.load();

		// (3) Head를 다음 노드로 옮기기
		if (Head.compare_exchange_strong(OldHead, Next))
		{
			// (4) 데이터 꺼내기
			T* const Res = Ptr->Data.exchange(nullptr);

			// (5) 외부 참조 카운터 해제
			FreeExternalCounter(OldHead);

			// (6) 결과 반환
			return std::unique_ptr<T>(Res);
		}

		// (7) 실패 시 내부 참조 감소
		Ptr->ReleaseRef();
	}
}

🧩 Pop() 단계별 해설

🔹 (1) 비어 있는 큐 판단

if (Ptr == Tail.load().Ptr)
  • Head와 Tail이 같으면 비어있음
  • 즉, 데이터를 Pop할 수 없음 → nullptr 반환

🔹 (2) 다음 노드 확인

CountedNodePtr Next = Ptr->Next.load();
  • 현재 Head 노드의 다음 노드 정보
  • Head를 이 다음 노드로 이동시키려 함

🔹 (3) Head 이동 시도 (CAS)

if (Head.compare_exchange_strong(OldHead, Next))
  • 성공 시 → 나 혼자 Head를 바꾼 유일한 쓰레드
  • 실패 시 → 다른 쓰레드가 먼저 바꿨음 → 다시 시도

🔹 (4) 데이터 추출

T* const Res = Ptr->Data.exchange(nullptr);
  • Pop된 노드의 데이터 추출
  • 이후 해당 노드는 사용되지 않으므로 nullptr로 설정

🔹 (5) 외부 참조 카운터 정리

FreeExternalCounter(OldHead);
  • OldHead의 참조 권한이 끝났으므로 외부 참조 → 내부 참조로 반영
  • 내부 참조 + 외부 참조 모두 0이면 노드 삭제

🔹 (6) 결과 반환

return std::unique_ptr<T>(Res);
  • Pop 성공 시 std::unique_ptr<T>로 메모리 안전하게 반환

🧩 실패 시 내부 참조 해제

Ptr->ReleaseRef();
  • CAS 실패 → 다른 쓰레드가 Head를 바꿈
  • 내가 참조하던 노드는 더 이상 사용하지 않으므로 내부 참조 해제

🧪 ReleaseRef() 함수 구조

void ReleaseRef()
{
	NodeCounter OldCounter = Count.load(std::memory_order_relaxed);
	NodeCounter NewCounter;

	do
	{
		NewCounter = OldCounter;
		--NewCounter.InternalCount;
	} while (!Count.compare_exchange_strong(
		OldCounter, NewCounter,
		std::memory_order_acquire,
		std::memory_order_relaxed));

	if (NewCounter.InternalCount == 0 && NewCounter.ExternalCounters == 0)
		delete this;
}

✅ 핵심 역할

  • 내부 참조를 1 감소
  • 내부 + 외부 카운터 모두 0이면 노드를 안전하게 삭제

🔐 FreeExternalCounter() 분석

static void FreeExternalCounter(CountedNodePtr& OldNodePtr)
{
	Node* const Ptr = OldNodePtr.Ptr;
	const int CountIncrease = OldNodePtr.ExternalCount - 2;

	NodeCounter OldCounter = Ptr->Count.load(std::memory_order_relaxed);
	NodeCounter NewCounter;

	do
	{
		NewCounter = OldCounter;
		--NewCounter.ExternalCounters;
		NewCounter.InternalCount += CountIncrease;
	} while (!Ptr->Count.compare_exchange_strong(
		OldCounter, NewCounter,
		std::memory_order_acquire,
		std::memory_order_relaxed));

	if (NewCounter.InternalCount == 0 && NewCounter.ExternalCounters == 0)
		delete Ptr;
}

✅ 핵심 역할

  • 외부 참조 카운터 제거
  • ExternalCount - 2 만큼 내부 참조에 반영
  • 참조 수 모두 0 → 노드 삭제

profile
李家네_공부방

0개의 댓글