데이터 구조에서 락을 사용하는 방법에 대해 알아보자
데이터 구조에 락을 추가하면 해당 구조가 thread safe 하게 된다.
가장 간단한 데이터 구조 중 하나는 카운터이다.
1 typedef struct __counter_t {
2 int value;
3 } counter_t;
4
5 void init(counter_t *c) {
6 c->value = 0;
7 }
8
9 void increment(counter_t *c) {
10 c->value++;
11 }
12
13 void decrement(counter_t *c) {
14 c->value--;
15 }
16
17 int get(counter_t *c) {
18 return c->value;
19 }
non-synchronized(비동기화된) 카운터는 단순하여 작은 양의 코드가 필요하다. 어떻게하면 이 코드를 thread-safe하게 만들 수 있을까?
1 typedef struct __counter_t {
2 int value;
3 pthread_mutex_t lock;
4 } counter_t;
5
6 void init(counter_t *c) {
7 c->value = 0;
8 Pthread_mutex_init(&c->lock, NULL);
9 }
10
11 void increment(counter_t *c) {
12 Pthread_mutex_lock(&c->lock);
13 c->value++;
14 Pthread_mutex_unlock(&c->lock);
15 }
16
17 void decrement(counter_t *c) {
18 Pthread_mutex_lock(&c->lock);
19 c->value--;
20 Pthread_mutex_unlock(&c->lock);
21 }
22
23 int get(counter_t *c) {
24 Pthread_mutex_lock(&c->lock);
25 int rc = c->value;
26 Pthread_mutex_unlock(&c->lock);
27 return rc;
28 }
concurrent(동시) 카운터는 데이터 구조를 조작하는 루틴을 호출할 때 락을 얻고, 반환할 때 락을 푼다. 이 시점에서 working concurrent(작동하는 동시) 데이터 구조를 갖게된다.
한가지 고려해야할 사항은 성능이다. 각 스레드가 일정횟수의 업데이트를 수행하는 벤치마크를 실행하고 공유 카운터를 업데이트 해볼 수 있다. 그래서 활성 스레드가 하나에서 네 개 까지 인 경우에 걸리는 총 시간을 볼 수 있다. CPU가 많아지면 더 많은 작업을 수행하기를 기대한다. 하지만 동기화된 카운터의 성능이 나쁘게 보인다.

이상적으로는,여러 프로세스의 스레드도 단일스레드가 하나의 프로세서에서 수행하는 것과 같은 속도로 완료되기를 원한다. 이 목표를 달성하는 것을 perfect scaling(완벽한 스케일링)이라고 한다.
확장 가능한 counting이 없으면 리눅스의 일부 workload는 멀티코어머신에서 확장성 문제를 겪게된다. 이 문제를 해결하는 한가지 방식인 approximate counter(근사 카운터)에 대해서 알아보자
근사카운터는 다음과 같은 방식으로 작동한다. 각 CPU는 로컬 카운터를 각각 갖는다. 스레드들은 경합없이 로컬카운터를 업데이트 가능하기에 카운터의 업데이트는 확장 가능하다. 특정 코어에서 실행되는 스레드가 카운터를 증가시키려면 그 곳의 로컬 카운터를 증가시킨다. 이 로컬 카운터는 로컬 락으로 얻을 수 있다.
전역 카운터를 최신 상태로 유지하기 위해 로컬 카운터를 읽을 때, 전역 락을 획득하고 로컬 카운터의 값으로 전역 카운터의 값을 증가시킨다. 이 작업은 임계값 S에 의해 결정된다. S가 작을 수록 이전의 확장 불가한 카운터와 유사하게 작동하고, S가 클 수록 카운터는 확장가능하지만, 실제 카운트에서 멀어질 수 있다.

아래 사진에서는 S가 커질 수록 성능이 좋아지는 것을 볼 수 있다. 네개의 프로세서에서 카운터를 400만번 업데이트 하는 시간이 하나의 프로세서에서 100만번 업데이트하는 시간과 비슷하다.

근사 카운터의 대략적인 코드는 다음과 같다.
1 typedef struct __counter_t {
2 int global; // global count
3 pthread_mutex_t glock; // global lock
4 int local[NUMCPUS]; // per-CPU count
5 pthread_mutex_t llock[NUMCPUS]; // ... and locks
6 int threshold; // update freq
7 } counter_t;
8
9 // init: record threshold, init locks, init values
10 // of all local counts and global count
11 void init(counter_t *c, int threshold) {
12 c->threshold = threshold;
13 c->global = 0;
14 pthread_mutex_init(&c->glock, NULL);
15 int i;
16 for (i = 0; i < NUMCPUS; i++) {
17 c->local[i] = 0;
18 pthread_mutex_init(&c->llock[i], NULL);
19 }
20 }
21
22 // update: usually, just grab local lock and update
23 // local amount; once it has risen ’threshold’,
24 // grab global lock and transfer local values to it
25 void update(counter_t *c, int threadID, int amt) {
26 int cpu = threadID % NUMCPUS;
27 pthread_mutex_lock(&c->llock[cpu]);
28 c->local[cpu] += amt;
29 if (c->local[cpu] >= c->threshold) {
30 // transfer to global (assumes amt>0)
31 pthread_mutex_lock(&c->glock);
32 c->global += c->local[cpu];
33 pthread_mutex_unlock(&c->glock);
34 c->local[cpu] = 0;
35 }
36 pthread_mutex_unlock(&c->llock[cpu]);
37 }
38
39 // get: just return global amount (approximate)
40 int get(counter_t *c) {
41 pthread_mutex_lock(&c->glock);
42 int val = c->global;
43 pthread_mutex_unlock(&c->glock);
44 return val; // only approximate!
45 }
더 복잡한 구조인 연결 리스트에 대해 알아보자 간단함을 위해 동시 삽입,조회에만 집중하자
이에 대한 코드는 다음과 같다.
1 // basic node structure
2 typedef struct __node_t {
3 int key;
4 struct __node_t *next;
5 } node_t;
6
7 // basic list structure (one used per list)
8 typedef struct __list_t {
9 node_t *head;
10 pthread_mutex_t lock;
11 } list_t;
12
13 void List_Init(list_t *L) {
14 L->head = NULL;
15 pthread_mutex_init(&L->lock, NULL);
16 }
17
18 int List_Insert(list_t *L, int key) {
19 pthread_mutex_lock(&L->lock);
20 node_t *new = malloc(sizeof(node_t));
21 if (new == NULL) {
22 perror("malloc");
23 pthread_mutex_unlock(&L->lock);
24 return -1; // fail
25 }
26 new->key = key;
27 new->next = L->head;
28 L->head = new;
29 pthread_mutex_unlock(&L->lock);
30 return 0; // success
31 }
32
33 int List_Lookup(list_t *L, int key) {
34 pthread_mutex_lock(&L->lock);
35 node_t *curr = L->head;
36 while (curr) {
37 if (curr->key == key) {
38 pthread_mutex_unlock(&L->lock);
39 return 0; // success
40 }
41 curr = curr->next;
42 }
43 pthread_mutex_unlock(&L->lock);
44 return -1; // failure
45 }
insert 루틴 진입시 락을 획들하고, 종료시 락을 해제한다. malloc()이 실패할 경우는 insert가 실패하기 전에 락을 해제해야한다. 이런 예외적인 흐름은 오류가 발생하기 쉽다는 것이 밝혀졌다. 따라서 삽입 실패 경로에서 락 해제를 추가하는 경우를 피하는 것이 중요하다.
수정사항은 다음과 같다.
1 void List_Init(list_t *L) {
2 L->head = NULL;
3 pthread_mutex_init(&L->lock, NULL);
4 }
5
6 int List_Insert(list_t *L, int key) {
7 // synchronization not needed
8 node_t *new = malloc(sizeof(node_t));
9 if (new == NULL) {
10 perror("malloc");
11 return -1;
12 }
13 new->key = key;
14 // just lock critical section
15 pthread_mutex_lock(&L->lock);
16 new->next = L->head;
17 L->head = new;
18 pthread_mutex_unlock(&L->lock);
19 return 0; // success
20 }
21
22 int List_Lookup(list_t *L, int key) {
23 int rv = -1;
24 pthread_mutex_lock(&L->lock);
25 node_t *curr = L->head;
26 while (curr) {
27 if (curr->key == key) {
28 rv = 0;
29 break;
30 }
31 curr = curr->next;
32 }
33 pthread_mutex_unlock(&L->lock);
34 return rv; // now both success and failure
35 }
malloc() 자체가 thread-safe하다면 연결 리스트를 업데이트 할 때만 락을 획득하면 된다.
연결 리스트 내에서 더 많은 동시성을 가능하게 하는 한 가지 방법은 hand-over-hand locking (또는 lock coupling)이라는 방법이다. 리스트 전체에 락을 하나 거는 대신에 각 노드마다 락을 추가한다. 리스트를 순회할 때 다음 노드의 락을 얻고 현재 노드의 락을 해제한다.
하지만 이 방법은 단순한 단일 락 접근 방식보다 더 빠르게 만드는 것이 어렵다. 노드를 순회할 때 마다 락을 획득하고 해제하는 것이 오버헤드가 크기 때문이다.
큐 또한 앞선 방법들 처럼 하나의 큰 락을 추가하면 동시성 데이터 구조를 만들 수 있을 것이다.
그러니 우리는 Michael과 Scott가 설계한 약간 더 동시성이 있는 큐를 살펴보자
1 typedef struct __node_t {
2 int value;
3 struct __node_t *next;
4 } node_t;
5
6 typedef struct __queue_t {
7 node_t *head;
8 node_t *tail;
9 pthread_mutex_t head_lock, tail_lock;
10 } queue_t;
11
12 void Queue_Init(queue_t *q) {
13 node_t *tmp = malloc(sizeof(node_t));
14 tmp->next = NULL;
15 q->head = q->tail = tmp;
16 pthread_mutex_init(&q->head_lock, NULL);
17 pthread_mutex_init(&q->tail_lock, NULL);
18 }
19
20 void Queue_Enqueue(queue_t *q, int value) {
21 node_t *tmp = malloc(sizeof(node_t));
22 assert(tmp != NULL);
23 tmp->value = value;
24 tmp->next = NULL;
25
26 pthread_mutex_lock(&q->tail_lock);
27 q->tail->next = tmp;
28 q->tail = tmp;
29 pthread_mutex_unlock(&q->tail_lock);
30 }
31
32 int Queue_Dequeue(queue_t *q, int *value) {
33 pthread_mutex_lock(&q->head_lock);
34 node_t *tmp = q->head;
35 node_t *new_head = tmp->next;
36 if (new_head == NULL) {
37 pthread_mutex_unlock(&q->head_lock);
38 return -1; // queue was empty
39 }
40 *value = new_head->value;
41 q->head = new_head;
42 pthread_mutex_unlock(&q->head_lock);
43 free(tmp);
44 return 0;
45 }
이 코드에는 두개의 락이 있다. 이 두 락의 목표는 enqueue와 dequeue를 동시에 가능하게 하는 것이다. 일반적으로 enqueue 루틴은 꼬리쪽 락, dequeue 루틴은 머리쪽 락에만 접근한다.
이전에 살펴보았던 동시 리스트를 사용하여 구현된다. 전체 구조에 대해 단일 락을 가지는 대신 각 해시 버킷마다 락을 사용해서 많은 동시성 작업이 이루어질 수 있다.
1 #define BUCKETS (101)
2
3 typedef struct __hash_t {
4 list_t lists[BUCKETS];
5 } hash_t;
6
7 void Hash_Init(hash_t *H) {
8 int i;
9 for (i = 0; i < BUCKETS; i++)
10 List_Init(&H->lists[i]);
11 }
12
13 int Hash_Insert(hash_t *H, int key) {
14 return List_Insert(&H->lists[key % BUCKETS], key);
15 }
16
17 int Hash_Lookup(hash_t *H, int key) {
18 return List_Lookup(&H->lists[key % BUCKETS], key);
19 }
아래 사진은 네 개의 스레드에서 동시 업데이트를 수행할 때 해시 테이블의 성능을 보여준다. 단일 락이 있는 연결 리스트에 비해서 훨씬 잘 작동하는 것을 볼 수 있다.

동시성을 추가하는 것이 더 복잡해지고 비용이 많이 든다면 성능이 좋아지지 않을 것이다.