ConcurrentHashMap

고동현·2024년 10월 2일
0

"더 깊이, 더 넓게"

목록 보기
7/12

ConcurrentHashMap은 Java에서 동시성을 지원하는 멀티스레드 환경에서 안전하게 사용할 수 있는 Map의 종류 입니다.

동시성이 뭔지, 또 Java는 ConcurrentHashMap을 어떻게 구현하고 있는지 배워봅시다.

우선, 동시성에 대해서 알아야 합니다.

동시성

싱글코어에서 멀티스레드를 동작시키기위해서, 마치 동시에 스레드들이 한꺼번에 실행되는것 처럼 보이지만 번갈아가면서 실행되는 성질을 동시성이라고 말합니다.

멀티코에서 멀티스레드를 이용하여 동시성을 만족시키면 실제 물리적 시간에 동시에 실행됩니다.

그런데 공유데이터에 접근을 할때(Critical Section에 접근)는 공유데이터에 Lock을 치고 접근을 하거나, 세마포어나 뮤텍스를 활용하게 됩니다.

세마포어란? 추상자료형으로

  • P: 공유데이터를 획득하는 과정
  • V: 공유데이터를 다쓰고 반납하는 과정
  • S: 자원의 갯수


만약 Critical Section에 들어갈 수 있는 프로세스를 1개로 mutex를 설정하면
P연산을 통해 0으로 만들고 Critical Section에 접근,
Critical Section에서 나올때 V연산을 통해서 1로 마들어서 증가 시키는 과정을 거치게 됩니다.

동기화란, 프로세스들 사이의 수행 시기를 맞추는 것을 의미하는데
크게 3가지를 일컫습니다.

  1. Mutual Exclusion:상호배재
    한 프로세스가 critical section에 있으면 다른 프로세스는 그 critical section에 들어가면 안됨
  2. Process
    아무도 critical section에 없으면 들어가고자하는 프로세스는 들여보내줘야함
  3. Bounded waiting: 유한대기
    critical Section에 들어가려고 너무 오래기다리면안된다. starvation이 없어야함

고로 이 동기화를 만족시키기 위해서 아까 위에서 말했던, Lock을 걸고 세마포어를 사용하는 것입니다.

ConcurrentHashMap의 구성

그렇다면 ConcurrentHashMap은 어떻게 동기화를 만족하고 있을까요?

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    public V get(Object key) {}

    public boolean containsKey(Object key) { }

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
}

위 코드는 ConcurrentHashMap의 일부 API입니다.
Synchronized 키워드가 get메서드에는 아에 없고, put 메서드 중간에 키워드가 존재합니다.

즉, ConcurrentHashMap에서는 읽기 작업에는 여러 스레드가 동시에 읽을 수 있지만,
쓰기 작업에는 특정 세그먼트 또는 버킷에 대한 Lock을 칩니다.

그림을 보면, DEFAULT_CAPACITY, DEFAULT_CONCURRENCY_LEVEL이 16으로 설정되어있습니다.

DEFAULT_CAPACITY=16은 ConcurrentHashMap의 기본 버킷 수를 의미합니다.

그렇다면, 여기서 버킷이란,
버킷은 해시 충돌을 줄이기 위해서 존재하며, 데이터는 해시값에 따라 각 버킷에 분배가 됩니다.

해시 테이블은 특정 크기만큼 가질 수 있기 때문에, 해시 테이블이 가득 차면 해시 충돌이 발생 할 수 있습니다.

이를 해결하기위해 데이터를 여러개의 버킷으로 나누어 저장하는 방식을 사용하여 충돌을 줄아게 됩니다.(분리연결법을 사용하는데 이 내용은 뒤에 나온다.)

예를 들어 키 apple이 있다면 hash함수를 적용해 123456를 만듭니다. 그리고 버킷의 수가 16이므로 해시값을 16으로 나누어 나머지값을 구해 버킷 인덱스를 찾습니다.

123456 % 16 = 8
즉 8번 버킷에다가 apple이라는 키와 그에 대한 value를 저장하게 됩니다.

이렇게 버킷을 나누면, 여러 스레드가 동시에 데이터를 다룰 때, 서로 다른 버킷에 데이터를 저장하므로써 락을 효율적으로 관리 할 수 있습니다.

각 버킷에 대해 락을 걸기 때문에, 버킷 간에 동시 접근이 가능해지고 성능이 개선됩니다.

DEFAULT_CONCURRENCY_LEVEL은 동시에 업데이트를 처리할 수 있는 스레드 수를 뜻합니다.

아까 말했듯이 ConcurrentHashMap은 Map 전체에 락을 걸지 않고, 버킷단위로 락을 걸기 때문에, 한 버킷에 접근하는 스레드가 있더라도, 다른 버킷에 접근하는 스레드는 별도의 락을 얻어 작업 할 수 있습니다.

버킷이 16개가있으므로 스레드 16개 최대 동시에 다른 버킷을 각각 접근할 수 있습니다.

그렇다면 ConcurrentHashMap은 어떻게 동기화를 할까요?

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else if (onlyIfAbsent // check first node without acquiring lock
                     && fh == hash
                     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                     && (fv = f.val) != null)
                return fv;
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

put메서드 입니다.
자세한것은 살펴보지 않아도 else부분에서 synchronized키워드를 사용하고, 새로운 노드로 교체하고 트리에 추가하는 것을 볼 수 있습니다.

synchronized는 모니터 락이라고 불리는 락을 사용하여 동작합니다.
자바의 모든 객체는 고유한 락을 가지고 있으며, synchronized 키워드를 통해 스레드가 해당 객체의 락을 획득 한 후에 작업을 수행합니다.
다른 스레드는 당연히 스레드가 락을 풀고, 락을 획득할 때까지 대기하게 됩니다.

락을 획득한 스레드는 동기화된 코드 블록, 메서드 내에서 안전하게 작업을 수행 할 수 있습니다.

고로, 여기 까지 정리해 보자면,
ConcurrentHashMap에는 기본적으로 16개의 버킷이 있으므로, 16개의 쓰레드가 동시에 접근이 가능하다.
데이터의 각 영역이 서로 영향을 주지 않는 작업(버킷이 다른 경우)에 대해서는 경쟁이 발생하지 않기 때문에 여러 쓰레드에서 빈번하게 접근하더라도 락 획득을 위한 대기 시간을 많이 줄일 수 있다.

읽을 때는 synchronized가 없으므로, 그냥 읽으면 되고,
put과 같이 write를 해야할때는 synchronized를 통해서 락을 걸고, 값을 바꾸거나 추가한다 라고 생각하면 됩니다.

JAVA 8버전 이후의 동기화

그런데 해당 코드를 자세히 보면

 /**
     * The default concurrency level for this table. Unused but
     * defined for compatibility with previous versions of this class.
     */
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

해석해보면
이 테이블의 기본 동시성 수준. 사용되지 않지만 이전 버전과의 호환성을 위해 정의됨.

Java 7 버전에서는 전체테이블을 여러개의 segment로 나누고, 하나마다 독립된 락을 소유합니다.
각 segment마다 서로 다른 데이터를 저장하게 됩니다.

하나의 스레드가 한 Segment에 락을 걸고 작업을 수행하는 동안, 다른 스레드는 다른 Segment에 접근할 수 있습니다.

이렇게 들으면? 어? 버킷이랑 Segment가 똑같은거 아닌가? 할 수 있습니다.

그림을 봅시다.

위와 같이 해시 테이블이 있고, 버킷이 있다고 칩시다.
버킷이 바로 key-value로 저장된 한쌍의 자료형이라고 보면 됩니다.
그러면 1000개의 버킷을 가진 해시맵을 10개의 세그먼트로 분할하면, 한 세그먼트가 100개의 버킷을 관리하게 됩니다.

여러 스레드가 동시에 접근하더라도, 서로 다른 세그먼트를 작업하면 충돌없이 안전하게 데이터를 처리 할 수 있습니다.

첫번째 스레드가 첫번째 세그먼트에 접근하여 작업을 수행하는동안, 동시에 두번째 스레드는 두번째 세그먼트에 접근하여 별도의 작업을 수행할 수 있습니다.

bucket에는 key가 같아도 value를 넣을 수 있습니다.
그림처럼 만약 "과일"이라는 키를 hash를 돌렸는데 00이나오고, "과일1"이라는 키를 hash를 돌렸는데 00이 나오면
bucket[00]d에 Key: 과일 -> value:apple과 Key:과일 1 -> vlaue:banana가 들어가게 됩니다다.
하나의 버킷에 여러개의 key-value가 들어갈 수 있습니다.
이걸 분리연결법이라고하는데 Hash테이블은 Self-Balancing Binary Search Tree 자료구조를 사용해 Chaining 방식을 구현하였습니다.

Chaining 예시
joinSmith와 Sandra Dee의 해시값이 152로 충돌이 난것을 확인 할 수 있습니다.
이때, LinkedList를 활용하여서 버킷하나에 여러개의 key-value쌍을 저장하게 됩니다.

public void put(K key, V value) {
        int index = hash(key);
        Node<K, V> newNode = new Node<>(key, value, null);
 
        if (table[index] == null) {
            table[index] = newNode;
        } else {
            Node<K, V> currentNode = table[index];
            while (currentNode.next != null) {
                if (currentNode.key.equals(key)) {
                    currentNode.value = value;
                    return;
                }
                currentNode = currentNode.next;
            }
 
            if (currentNode.key.equals(key)) {
                currentNode.value = value;
            } else {
                currentNode.next = newNode;
            }
        }
    }
 
    public V get(K key) {
        int index = hash(key);
        Node<K, V> currentNode = table[index];
 
        while (currentNode != null) {
            if (currentNode.key.equals(key)) {
                return currentNode.value;
            }
            currentNode = currentNode.next;
        }
 
        return null;
    }

예시 코드를 보면 put 메서드에서 table[index]가 null이 아니면 else구문을 실행시키고, 여기서 만약 키가 동일하면 while문을 돌면서 버킷의 마지막 노드를 찾은다음에, currentNode.value = value로 value값을 넣어주고, currentNode = currenNode.next로 연결을 해줍니다.

get메서드에서는 key를 주면, while문을 돌면서 선형 탐색을 하게 됩니다.

다시 돌아가서,
default가 16개의 버킷이므로, 당연히 16개가 넘어가면 버킷의 숫자가 늘어나는 것입니다.

그런데 왜 세그먼트당 이제 더이상 락을 걸지 않을까요?

전체 맵에 락을 거는대신 각 세그먼트에 락을 걸어서 동시성을 향상 시켰지만, 여전히 세그먼트당 락 경합이 발생할 수 있습니다.

Java 8부터는 이제 버킷당 하나의 락을 적용하여서 해당 문제를 해결했습니다.

즉, 큰 세그먼트에서 버킷을 하나로 size를 줄였다고 보면 됩니다다.

그런데 이제는 내부적으로 CAS를 이용한 락 없는 업데이트를 수행하게 됩니다.

putVal메서드를 확인볼까요?

// 이전 설명 코드
for (Node<K,V>[] tab = table;;) {
    if (tab == null || (n = tab.length) == 0)
        tab = initTable();
        
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
            break; // 락 없이 빈 버킷에 노드를 추가한 경우 반복을 종료합니다.
    }
    ...
    
}

for문에서 조건을 생략하여 무한 루프를 만듭니다.
이는 특정 조건을 만족할때까지 반복하는데 반복되는 조건은 break가 걸릴때까지입니다.

initTable()메서드로 초기화 하고,
else if문을 통해서 인덱스 i에 있는 버킷이 비어있는지 확인합니다. tabAt메서드를 통해 해당 위치의 값을 가져옵니다.

tabAt(tab, i)는 ConcurrentHashMap 내부에서 특정 인덱스 i에 있는 버킷(bucket)을 확인하는 메서드압니다.
여기서 tab은 ConcurrentHashMap의 내부 배열을 나타내며, i는 해당 배열의 특정 위치(인덱스)를 의미합니다.
tabAt(tab, i)의 결과가 null이면, 해당 위치에 아직 노드가 없다는 의미입니다. 즉, 해당 인덱스에 데이터가 삽입되지 않은 상태입니다.

내부 if문은 노드가 없어서 데이터가 없는경우 casTabAt을 통해서 원자적으로 노드를 추가합니다.
이게 바로 CAS 연산을 통한 노드를 추가하는것입니다.
casTabAt은 CAS연산을 사용하여 해당 위치가 여전히 null일때만 새로운 노드를 원자적으로 삽입하려고 시도합니다.

여전히 null일떄만? 원자적?
CAS에대해서 알아보자.

CAS는 원자적 연산으로 세가지 값을 사용하여서 연산을 수행한다.

  1. 메모리주소: 변경하고자 하는 데이터의 위치를 가르킴

  2. 기존 예상 값: 현재 메모리 주소에 저장된 값으로 예상되는 값이다.

  3. 새로운 값: 메모리 주소에 저장하고자 하는 새로운 값이다.

    CAS 연산은 먼저 특정 메모리 위치의 현재 값을 읽는다. 이 값을 기대값이라고 한다.
    읽어들인 현재값이 기대한 값과 동일한지 비교한다.
    만약 현재값이 기대값과 일치한다면, 메모리 위치의 값을 새로운 값으로 원자적으로 교체한다.
    만약 현재 값이 기대값과 일치하지 않으면, 다른 스레드가 그 사이에 값을 변경했음을 의미하므로 교체를 실패하고, 다시 시도하거나 다른 적절한 처리를 한다.

    원자적이란?
    원자적 연산이란 완전히 실행되거나 전혀 실행되지 않는 연산을 의미한다.
    즉, 연산이 시작되면 중간에 다른 연산이 개입할 수 없고, 연산 도중에 중단되지 않고 완료된다.

    어떻게 원자적으로 처리할 수 있을까?
    현대 CPU는 CAS연산을 지원하는 CMPXCHG와 같은 명령어를 지원한다.
    CMPXCHG 명령어는 메모리의 특정 주소에서 값을 읽고, 기대값과 비교한 후에 두값이 일치하면 새값으로 교체하는 과정을 단일 명령어로 처리한다.
    이 과정이 단일 명령어로 처리 되기 때문에, CAS연산이 실행되는 동안에는 다른 어떤 연산도 해당 메모리 주소에 접근할 수 없다.
    이는 하드웨어적으로 보장된 원자성을 의미한다.

    어? CAS 연산이 실행되는 동안 다른 어떤 연산도 해당 메모리 주소에 접근할 수 없다?
    이 말이 락을 걸어서 접근을 방지하는 건가? 오해할 수 있다.

그러나 여기서 중요한점은 CAS 자체는 락을 사용하는 연산이 아니다. CAS는 기본적으로 낙관적 동시성 제어의 개념을 따른다.
즉, 여러 스레드가 동시에 메모리 값에 접근할 수 있음을 가정하고, 그 값이 내가 예상한 값일 때에만 변경하는 것이다.
그 과정에서 CAS는 그 값이 예상대로라면 변경하고, 그렇지 않으면 다시 시도하는 방식으로 동작한다.

따라서, 메모리 값이 CAS가 실행되는 순간 다른 스레드에 의해 바뀌지 않는다는 것은 하드웨어 차원에서 버스락킹, 캐시일관성 프로토콜등을 통해서 하드웨어 차원에서 일시적으로 보장되는 것이고, CAS 자체는 락을 사용한 보호 메커니즘이 아니다.

이때문에 스레드가 값을 바꾸지 못하게 막는게 아니라, 그 값을 여러 스레드가 동시에 변경하려 할때 발생 할 수 있는 경합 상황을 해결하기 위해서 계속 값을 확인하는 동작을 수행한다.

이때문에 CAS 연산을 수행하면, 다른 프로세스가 해당 메모리 값을 바꾸었는지 반복적으로 확인하면서 계속 시도하는 과정을 수행한다.
이때 CPU가 계속 사용되는데, 이시간이 길어질 수 있다.
CPU가 그동안 다른 작업을 하지 못하고 계속 확인만 하게 되어서 CPU가 낭비되는 Busy-waiting 이 발생할 수 있다.

고로, 노드를 새로 추가할때는 null인지 확인해서 노드를 추가하고, 노드를 수정할때는 CAS연산에서 현재값과 기댓값을 비교하여서 수정한다.

정리

  • ConcurrentHashmap은 동시성을 만족하는 HashMap이다.
  • 동시성을 만족하기 위해서 이전에는 버킷에다가 Lock을 거는 과정을 수행했다. (synchronized,DEFAULT_CAPACITY, DEFAULT_CONCURRENCY_LEVEL 사용)
  • 그러나 이제는 Lock을 거는것이 아닌 CAS연산을 통해서 삽입과 수정을 수행한다.

https://velog.io/@gkdbssla97/%EC%9E%90%EB%A3%8C%EA%B5%AC%EC%A1%B0-HashMap-%EB%82%B4%EB%B6%80-%EB%8F%99%EC%9E%912-feat.-Java
https://rebugs.tistory.com/323
https://beststar-1.tistory.com/24

profile
항상 Why?[왜썻는지] What?[이를 통해 무엇을 얻었는지 생각하겠습니다.]

0개의 댓글