프로세스 동기화가 필요한 이유

논리적인 주소 공간(코드,데이터 영역)을 공유하거나, 공유 데이터를 가질 때

공유 데이터에 concurrent하게 접근하면 데이터 불일성(data inconsistency)가 발생할 수 있다.

  • Concurrent 실행일 때 프로세스가 공유 데이터를 수정하던 도중에 인터럽트가 발생할 수 있음
  • Parallel 실행일 때 분리된 코어에서 동시에 명령어를 실행할 수 있음

데이터 일관성을 유지하기 위해 프로세스의 실행을 제어할 필요가 있다.

생산자-소비자 문제에서 발생할 수 있는 데이터 불일치

producer

while(true){
	while(count== BUFFER_SIZE)
    	; /* do nothing */
	
    buffer[in] = next_item;
    in = (in+1) % BUFFER_SIZE;
    count++;
}

consumer

while(true){
	while(count== 0)
    	; /* do nothing */
	
    next_item = buffer[out];
    out = (out+1) % BUFFER_SIZE;
    count--;
}

위 코드가 개별적으로는 올바를지라도, concurrent하게 실행하면 올바르게 동작하지 않는다. 공유 데이터 counter를 수정하는 코드에서 문제가 발생할 수 있다.

counter의 값을 변경하는 counter++, counter-- 명령어가 병행하게 실행한다고 가정해보자

counter++

register1 = counter
register1 = register1 + 1
counter = register1

counter--

register2 = counter
register2 = register1 - 1
counter = register2

counter++, counter--를 기계어로 보면 위와 같이 볼 수 있다.

register1, register2가 동일한 레지스터이더라도, 레지스터의 내용은 문맥 교환시 메모리에 보관되었다가 적재된다.

counter의 값을 변경하는 counter++, counter-- 명령어들은 아래와 같이 임의적인 순서로 인터리빙 방식으로 수행된다.

[Producer] register1 = counter // register1 = 5
[Producer] register1 = register1 + 1 // register1 = 6
[Consumer] register2 = counter // register2 = 5
[Consumer] register2 = register2 - 1 // register2 = 4
[Producer] counter = register1 // counter = 6
[Consumer] counter = register2 // counter = 4

정상적으로 counter = 5가 되어야 하지만, 위의 결과는 4이다.

이러한 데이터 불일치가 발생한 것은 두 개의 프로세스가 동시에 공유 변수 counter를 조작하도록 허용했기 때문이다.

따라서 프로세스가 공유 데이터에 접근하는 순서를 제어해야 한다.


Race Condition(경쟁 상황)

동시에 여러 개의 프로세스가 공유 데이터를 접근하여 조작하고, 그 실행 결과가 접근이 발생한 순서에 의존하는 상황

데이터 불일치를 막으려면 Race Condition을 방지해야 한다. 특정 시점에 오직 하나의 프로세스만이 공유 데이터를 조작하도록 해야 한다.

package process_synchronization;

public class RaceCondition {

    static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(new MyRunnable());
        Thread thread2 = new Thread(new MyRunnable());

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();

        System.out.println(count); // 13415
    }

    static void increment() {
        count++;
    }
}

class MyRunnable implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            RaceCondition.increment();
        }
    }
}

Critical Section

✔️ CSP

프로세스들이 협력할 때 사용할 수 있는 동기화 프로토콜를 설계하는 것

  • entry section = 임계 영역에 접속하기 위해 허가를 요청하는 영역
  • critical section = 다른 프로세스와 공유하는 데이터를 변경하는 작업을 수행하는 코드 조각
  • exit section = 허가를 반환하는 영역
  • remainder section = 나머지 영역

✔️ CSP Solution requirements

  1. Mutual Exclusion(상호 배제): 한 프로세스가 임계 구역에서 실행되고 있다면, 다른 프로세스들은 임계 구역에 진입할 수 없어야 한다.

  2. Progress(진행): 임계 영역에서 실행되는 프로세스가 없고, 임계 영역에 진입하려는 프로세스가 있을 때 진입하려는 프로세스는 진입할 수 있어야 한다. -> deadlock 방지

  3. Bounded Waiting(한정된 대기): 진입 허가 요청 대기 시간이 한정적이여야 한다. -> starvation 방지

CSP를 해결하기 위해서 위 세가지 조건을 만족해야 한다.


소프트웨어 동기화

✔️ 피터슨 알고리즘

특징

do {
	flag[i] = true;
    turn = j;
    while(flag[j] && turn ==j); // entry section
    
	// critical section
    
    flag[i] = false; // exit section
    
    // remainder section
    
} while(true);
  • 공유 변수(turn, boolean flag[2])를 사용하여 해결한다.
  • 임계구역과 나머지 구역을 번갈아 가며 실행하는 두 개의 프로세스로 한정된다.

장점

  • mutual exclusioin, progress, bounded waiting 조건을 만족한다.

단점

  • 현대 컴퓨터 구조가 load, store와 같은 기본적인 기계어를 수행하는 방식 때문에 올바르게 실행된다고 보장하지 않는다.

c++ 구현

#include "stdio.h"
#include "pthread.h"

int sum = 0;

int turn;
int flag[2];

void *producer(void *param);

void *consumer(void *param);

int main() {
    pthread_t tid1, tid2;

    pthread_create(&tid1, NULL, producer, NULL);
    pthread_create(&tid2, NULL, consumer, NULL);

    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    printf("sum = %d\n", sum); // 19997
}

void *producer(void *param) {
    for (int k = 0; k < 10000; k++) {
        flag[0] = true;
        turn = 1;
        while (flag[1] && turn == 1);

        sum++;

        flag[0] = false;
    }

    pthread_exit(0);
}

void *consumer(void *param) {
    for (int k = 0; k < 10000; k++) {
        flag[1] = true;
        turn = 0;
        while (flag[0] && turn == 0);

        sum++;

        flag[1] = false;
    }

    pthread_exit(0);
}

하드웨어 동기화

✔️ Atomic operation

  • 피터슨 알고리즘에서 보았듯이 entry section에서 인터럽트가 발생하면 race condition이 발생할 수 있다. 단일 프로세서 환경에서는 인터럽트를 허용하지 않게하여 CSP를 해결할 수 있지만, 멀티 프로세서 환경에 적용할 수 없기 때문에, 하드웨어의 도움이 필요해진다.

  • 현대 컴퓨터는 워드(word) 내용을 검사하고 변경하거나, 두 워드의 내용을 원자적으로 교환할 수 있는, 즉 인터럽트 되지 않는 하나의 단위로써, atomic한 명령어를 제공한다.

atomic value

atomic value는 내부에서 atomic 명령어를 사용하여 value에 대한 race condition을 막는다

✔️ test_and_set()

특징

boolean test_and_set(boolean *target){
	boolean rv = *target;
    *target = true;
    return rv;
}
  • 위 블록을 하나의 하드웨어 명령어(atomic한 명령어로 지정, 실행 도중에 문맥교환X) 제공한다.

  • false로 초기화되는 전역 변수 lock을 사용한다.

do{
    while(test_and_set(&lock));  // entry section
        
	// critical section
        
	lock = false; // exit section
    
} while(true);

장점

atomic한 명령어를 제공하여 mutual exclusion 제공

단점

bounded waiting은 제공하지 않는다.

✔️ compare_and_swap()

특징

int compare_and_swap(int *value, int expected, int new_value){
	int temp = *value;
    
    if(*value==expected)
    	*value = new_value;
        
	return temp;
}
  • 위 블록을 하나의 하드웨어 명령어(atomic한 명령어로 지정, 실행 도중에 문맥교환X) 제공한다.

  • 0으로 초기화되는 전역 변수 lock을 사용한다,

while(true){
	while(compare_and_swap(&lock,0,1)!=0); // entry section
    
    // critical section
    
    lock = 0;
}

장점

atomic한 명령어를 제공하여 mutual exclusion 제공

단점

bounded waiting은 제공하지 않는다.

atomic value를 사용한 피터슨 알고리즘

package process_synchronization;

import java.util.concurrent.atomic.AtomicBoolean;

import static process_synchronization.PetersonAlgorithmWithAtomicValue.*;

public class PetersonAlgorithmWithAtomicValue {
    static final int NUMBER = 10000;
    static int count;
    static int turn;
    static AtomicBoolean[] flags; // atomic value

    static {
        count = 0;
        turn = 0;
        flags = new AtomicBoolean[2];
        for (int i = 0; i < flags.length; i++) {
            flags[i] = new AtomicBoolean();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Runnable1());
        Thread t2 = new Thread(new Runnable2());

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println(count);
    }

    static void increment() {
        count++;
    }
}


class Runnable1 implements Runnable {
    @Override
    public void run() {

        flags[0].set(true);
        turn = 1;

        while (flags[1].get() && turn == 1) { // entry section
        }

        for (int i = 0; i < NUMBER; i++) { // critical section
            increment();
        }

        flags[0].set(false); // exit section
    }
}

class Runnable2 implements Runnable {
    @Override
    public void run() {

        flags[1].set(true);
        turn = 0;

        while (flags[0].get() && turn == 0) { // entry section
        }

        for (int i = 0; i < NUMBER; i++) { // critical section
            increment();
        }

        flags[1].set(false); // exit section
    }
}

Mutual Exclusion 기법

✔️ Mutex Lock

특징

while(true){
	// acquire lock
    
    	// critical section
        
	// release lock
    	
        // remainder section
}
  • 프로세스는 임계 구역에 진입하기 전에 Lock을 획득해야 한다.
  • 임계 구역에 빠져나올 때는 Lock을 반환해야 한다.
acquire(){
	while(!available); // busy wait
    available = false;
}

release(){
	available = true;
}
  • mutext lock 연산에는 acquire(), release() 두가지가 있고, 두가지 연산 모두 atomic하게 실행되어야 한다.

  • available 이라는 불린 변수를 사용한다

c++ 예시

#include "stdio.h"
#include "pthread.h"

int sum = 0; // shared data

pthread_mutex_t mutex;

void *counter(void *param);

int main() {

    pthread_t tid1, tid2;

    pthread_mutex_init(&mutex, NULL);

    pthread_create(&tid1, NULL, counter, NULL);
    pthread_create(&tid2, NULL, counter, NULL);

    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    printf("sum = %d\n", sum);
}

void *counter(void *param) {
    for (int k = 0; k < 10000; k++) {
        pthread_mutex_lock(&mutex); // entry section

        sum++; // critical section

        pthread_mutex_unlock(&mutex); // exit section
    }
}

장점

  • mutual exclusion을 보장한다.
  • 멀티코어 환경에서는 CPU 코어을 점유하면서 락을 대기하기 때문에 컨텍스트 스위치 오버헤드가 생기지 않는다.

단점

  • Busy Waiting 하기 때문에 CPU 싸이클을 낭비한다.
  • Progress, Bounded waiting을 보장하지 않는다

Busy waiting

  • 락을 획득하기 위해 CPU 코어를 점유하면서 반복문을 계속 실행하는 것을 말한다.
  • CPU 싸이클을 낭비한다.

Spinlock

  • Busy waiting하는 뮤텍스 락을 말한다.
  • 멀티코어 환경에서는 스핀락은 CPU 코어을 점유하면서 락을 대기하기 때문에 컨텍스트 스위치 오버헤드가 생기지 않는다.

✔️ Semaphore

특징

wait(S){
	while(S<=0){
    	sleep();
    }
    S--;
}

signal(S){
	S++;
    if(S>0({
    	wakeup();
    }
}
  • 세마포어 S는 정수이다.
    • S = 가용한 자원의 개수
  • wait(),signal() 연산을 사용하여 S 값을 변경한다.
    • wait(): 세마포어 S를 감소
    • signal(): 세마포어 S를 증가
  • wait(),signal() 연산은 atomic 하게 수행되어야 한다.
  • 세마포어가 0이하 이면 Busy waiting을 하지 않고, 대기 큐로 이동한다.
  • signal() 연산으로 세마포어가 증가하면 대기 큐에서 대기하는 프로세스는 레디 큐로 이동한다

장점

  • busy waiting을 해결할 수 있다.
  • 상호 배제를 보장한다.

단점

  • 임계 영역이 짧은 시간안에 수행될 경우, 컨텍스트 스위치 오버헤드가 커진다.
  • Progress, Bounded waiting을 보장하지 않는다
  • 순서가 정확하지 않으면 timing error가 발생하고 코드가 복잡할수록 에러를 찾기 쉽지 않다.

c++ 구현

#include "stdio.h"
#include "pthread.h"
#include "semaphore.h"

int sum = 0;

sem_t sem;

void *counter(void *param);

int main() {
    pthread_t tid1, tid2;

    sem_init(&sem, 0, 1);

    pthread_create(&tid1, NULL, counter, NULL);
    pthread_create(&tid2, NULL, counter, NULL);

    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    printf("sum = %d\n", sum);
}

void *counter(void *param) {
    for (int k = 0; k < 10000; k++) {
        sem_wait(&sem);

        sum++;

        sem_post(&sem);
    }
}

✔️ Monitor

특징

  • 모니터는 내부에 Mutual Exclusion을 보장하는 연산을 포함하는 ADT이다.
  • 모니터 내부에서 condition 변수를 사용하여 부가적인 동기화를 작성할 수 있다.
    • condition 변수는 wait(),signal()을 제공한다.
    • signal()은 정확히 하나의 waiting 프로세스를 재개한다.
  • 모니터 내부 구현은 뮤텍스락, 세마포어 등 구현하기 나름이다.

장점

  • Mutual Exclusion을 보장한다.
  • 모니터 안에 항상 하나의 프로세스만이 위치하므로, 프로그래머들이 동기화 제약 조건을 명시적으로 코딩할 필요가 없어진다.

단점

  • Progress, Bounded Waiting은 보장하지 않는다

Java의 모니터락

  • 자바는 Monitor-lock이라는 모니터와 유사한 동기화 기법을 제공한다. (synchronized, wait(), notify() 등등 )

synchronized

  • 임계 영역에 해당하는 코드 블록을 선언하는 키워드
  • 해당 코드 블록에는 모니터락 객체 인스턴스을 획득해야 진입 가능
    • 모니터락을 가진 객체 인스턴스를 지정할 수 있음
  • 메서드에 선언하면 메서드 코드 블록 전체가 임계영역으로 지정됨
    • 이 때, 모니터락을 가진 객체 인스턴스는 this 객체 인스턴스이다(모니터 락을 가진 객체 인스턴스는 해당 메서드를 가진 클래스의 인스턴스이다.)

wait(), notifiy()

  • java.lang.Object 클래스에 선언됨
  • 쓰레드가 어떤 객체의 wait() 메서드를 호출하면
    • 해당 객체의 모니터락을 획득하기 위해 대기 상태로 진입
  • 쓰레드가 어떤 객체의 notify() 메서드를 호출하면
    • 해당 객체 모니터에 대기중인 쓰레드 하나를 깨움
  • notify() 대신에 notifyAll()을 호출하면
    • 해당 객체 모니터에 대기중인 쓰레드 전부를 깨움
package process_synchronization;

public class MonitorEx {

    static final int MAX = 10000;

    public static void main(String[] args) throws InterruptedException {
        syncEx1(); // 메서드 전체에 synchronized(클래스 자체가 모니터락 소유)-> 50000
        syncEx2(); // 메서드 내부에서 synchronized -> 50000
        syncEx3(); // 쓰레드마다 다른 모니터락 객체 인스턴스 설정 -> 49998
        syncEx4(); // 쓰레드마다 같은 모니터락 객체 인스턴스 설정 -> 50000
    }

    static void syncEx1() throws InterruptedException {
        Thread[] threads = new Thread[5];

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < MAX; j++) {
                    Monitor.increment1();
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].join();
        }

        System.out.println("[Ex1] count = " + Monitor.count);
    }

    static void syncEx2() throws InterruptedException {
        Monitor.count = 0;

        Thread[] threads = new Thread[5];

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < MAX; j++) {
                    Monitor.increment2();
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].join();
        }

        System.out.println("[Ex2] count = " + Monitor.count);

    }


    private static void syncEx3() throws InterruptedException {
        Monitor.count = 0;

        Thread[] threads = new Thread[5];

        for (int i = 0; i < threads.length; i++) {
            Monitor monitor = new Monitor();
            threads[i] = new Thread(() -> {
                for (int j = 0; j < MAX; j++) {
                    monitor.increment3();
                }
            });

            threads[i].start();
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].join();
        }

        System.out.println("[Ex3] count = " + Monitor.count);
    }

    static void syncEx4() throws InterruptedException {
        Monitor.count = 0;

        Monitor monitor = new Monitor();
        Thread[] threads = new Thread[5];

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < MAX; j++) {
                    monitor.increment3();
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].join();
        }

        System.out.println("[Ex4] count = " + Monitor.count);
    }
}

class Monitor {

    static int count = 0;
    static Object object = new Object();

    synchronized public static void increment1() {
        count++;
    }

    static void increment2() {
        synchronized (object) {
            count++;
        }
    }

    public void increment3() {
        synchronized (this) {
            count++;
        }
    }
}

Classic Synchronization Problems

✔️ Bounded buffer problem

프로듀서-컨슈머에서 동시에 버퍼에 접근할 경우, Race Condition이 발생할 수 있다.

특징

producer

while(true){
  wait(empty);
  wait(mutex);
  
  // critical section
  
  signal(mutex);
  signal(full);
}

consumer

while(true){
  wait(full);
  wait(mutex);
  
  // critical section
  
  signal(mutex);
  signal(empty);
}
  • 사용 자료구조
    • semaphore mutex = 1; mutex = CS mutex
    • semaphore empty = n; n=버퍼 개수
    • semaphore full = 0;

단점

  • 코드가 복잡해지고 순서가 바뀌면 제어하기가 어렵다

c 구현

# include "stdio.h"
# include "stdlib.h"
# include "unistd.h"
# include "pthread.h"
# include "semaphore.h"

#define BUFFER_SIZE 5
#define NUMBER_OF_PRODUCERS 1
#define NUMBER_OF_CONSUMERS 1

void *produce(void *param);

void *consume(void *param);

void insert_item(int item);

void remove_item(int *item);

int buffer[BUFFER_SIZE];

pthread_mutex_t mutex;
sem_t empty, full;

int in = 0, out = 0;

int main() {
    pthread_t tid;

    pthread_mutex_init(&mutex, NULL);
    sem_init(&empty, 0, BUFFER_SIZE);
    sem_init(&full, 0, 0);

    for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
        pthread_create(&tid, NULL, produce, NULL);
    }

    for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
        pthread_create(&tid, NULL, consume, NULL);
    }

    sleep(10);
    return 0;
}

void *produce(void *param) {
    int item;

    while (true) {
        usleep((1 + rand() % 5) * 10000);
        item = 1000 + rand() % 1000;
        insert_item(item); // critical section
    }
}

void *consume(void *param) {
    int item;
    while (true) {
        usleep((1 + rand() % 5) * 10000);
        remove_item(&item); // critical section
    }
}

void insert_item(int item) {
    sem_wait(&empty);
    pthread_mutex_lock(&mutex);

    buffer[in] = item;
    in = (in + 1) % BUFFER_SIZE;
    printf("[PRODUCER] inserted $%d\n", item);

    pthread_mutex_unlock(&mutex);
    sem_post(&full);
}

void remove_item(int *item) {
    sem_wait(&full);
    pthread_mutex_lock(&mutex);

    *item = buffer[out];
    out = (out + 1) % BUFFER_SIZE;
    printf("[CONSUMER] removed $%d\n", *item);

    pthread_mutex_unlock(&mutex);
    sem_post(&empty);
}

java 구현

package process_synchronization;

public class BoundedBufferSolution {

    public static void main(String[] args) {
        Buffer buffer = new Buffer(1);
        Thread[] producers = new Thread[1];
        Thread[] consumers = new Thread[1];

        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Thread(() -> {
                while (true) {
                    try {
                        Thread.sleep((long) (Math.random() * 500));
                        int item = ((int) (1 + Math.random() * 9)) * 10000;
                        buffer.produce(item);
                    } catch (InterruptedException e) {
                    }
                }
            });
            producers[i].start();
        }

        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Thread(() -> {
                while (true) {
                    try {
                        Thread.sleep((long) (Math.random() * 500));
                        buffer.consume();
                    } catch (InterruptedException e) {
                    }
                }
            });
            consumers[i].start();
        }
    }
}

class Buffer {
    private int[] buffer;
    private int cnt;
    private int in;
    private int out;

    public Buffer(int bufferSize) {
        this.buffer = new int[bufferSize];
        this.cnt = this.in = this.out = 0;
    }

    synchronized public void produce(int item) {
        while (this.cnt == buffer.length) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }

        buffer[in] = item;
        in = (in + 1) % buffer.length;
        cnt++;
        System.out.printf("[Producer] item = %d\n", item);

        notify();
    }

    synchronized public int consume() {
        while (this.cnt == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }

        int item = buffer[out];
        out = (out - 1) % buffer.length;
        cnt--;
        System.out.printf("[Consumer] item = %d\n", item);

        notify();
        return item;
    }
}

✔️ Readers-writers problem

데이터베이스에 같은 여러개의 프로세스가 동시에 접근한다고 했을 때, 프로세스는 읽는 작업만 수행하는 프로세스(readers), 읽고 쓰는 작업을 수행하는 프로세스(writers)로 나눌 수 있다.

readers-writers 문제는 아래 2개의 우선순위 상황을 기반으로 발생할 수 있다.

    1. writer가 대기하고 있다고 reader는 다른 reader가 끝날때까지 대기하면 안된다
    1. writer가 대기하는 상태에서 새로운 reader는 reading을 시작하면 안된다

이 포스트에서는 1번 우선순위 상황에 대하여 발생하는 문제를 해결하는 방법을 알아볼 것이다.

특징

writer

while(true){
	wait(rw_mutex);
    
    // writing opereation
    
    signal(rw_mutex);
}

reader

while(true){
	wait(mutex);
    read_count++;
    if(read_count==1)
    	wait(rw_mutex);
	signal(mutex)
    
    // reading operation
    
	wait(mutex);
    read_count--;
    if(read_count==0)
    	wait(rw_mutex);
	signal(mutex)
}
  • 사용 자료구조
    • int read_count = 0 = reader 카운트 변수
    • sempahore rw_mutex = 1 = reader,writer 공유
    • sempahore mutex = 1 = read_count를 위한 mutex
  • writer가 임계영역에 있으면 n개의 reader는 대기 중이다. 1개의 reader만이 rw_mutex를 대기하고, 나머지 n-1개의 reader는 mutex를 대기한다.

java 구현

public class Database {
    private int readCnt = 0;
    private boolean writeMode = false;

    public void read() {
        acquireReadLock();
		// read operation
        releaseReadLock();

    }

    public void write() {
        acquireWriteLock();
		// write operation
        releaseWriteLock();
    }

    synchronized private void acquireWriteLock() {
        while (readCnt > 0 || writeMode) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }

        writeMode = true;
    }

    synchronized private void releaseWriteLock() {
        writeMode = false;
        notifyAll();
    }

    synchronized private void releaseReadLock() {
        readCnt--;
        if (readCnt == 0) {
            notify();
        }
    }

    synchronized private void acquireReadLock() {
        while (writeMode) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }

        readCnt++;
    }
}

✔️ Dining philosophers problem

  • 원형 테이블에 5명의 철학자가 있고 철학자 사이에는 각각 1개의 젓가락이 있을 때 발생하는 동기화 문제

세마포어 솔루션

while(true){
	wait(chopstick[i]);
    wait(chopstick[(i+1)%5]);
    
    // eat
    
	signal(chopstick[i]);
    signal(chopstick[(i+1)%5]);
    
    // think
}
  • 젓가락을 세마포어로 보아서 해결한다 semaphore chopstick[5]
  • mutual exclusion은 보장하나, deadlock과 starvation을 보장하지 않는다

기타 솔루션

  • 솔루션 1: 두 개의 젓가락을 모두 사용할 수 있을때 사용한다.
  • 솔루션 2: 홀수번째 철학자는 왼쪽 젓가락부터 집고, 짝수번째 철학자는 오른쪽 젓가락부터 집는다

위 두가지 솔루션 모두 데드락은 해결할 수 있으나, starvation을 해결하는 것은 보장하지 않는다.

Monitor 솔루션

  • 철학자의 상태를 hungry, eating, thinking 세가지 상태로 구분한다
  • 두 이웃 철학자가 eating 상태가 아니면 철학자는 먹는다
  • DiningPhilosopher 이라는 모니터에서 mutual exclusion을 보장하는 연산을 제공한다
    • 철학자는 식사하기 위해서 pickup 연산을 수행한다
    • 다 먹으면, putdown 연산을 수행한다

c구현

# include "stdio.h"
# include "stdlib.h"
# include "unistd.h"
# include "pthread.h"

#define NUM_OF_PHILS 5

enum {
    THINKING, HUNGRY, EATING
} state[NUM_OF_PHILS];

pthread_mutex_t mutex_lock;
pthread_cond_t cond_vars[NUM_OF_PHILS];

void init();

int leftOf(int i);

int rightOf(int i);

void *philosopher(void *param);

void think(int id);

void eat(int id);

void test(int id);

void pickup(int id);

void putdown(int id);

int main() {
    pthread_t tid;
    init();

    for (int i = 0; i < NUM_OF_PHILS; i++)
        pthread_create(&tid, NULL, philosopher, (void *) &i);

    for (int i = 0; i < NUM_OF_PHILS; i++)
        pthread_join(tid, NULL);

    return 0;
}

void init() {
    for (int i = 0; i < NUM_OF_PHILS; i++) {
        state[i] = THINKING;
        pthread_cond_init(&cond_vars[i], NULL);
    }
    pthread_mutex_init(&mutex_lock, NULL);
    srand(time(0));
}

int leftOf(int i) {
    return (i + NUM_OF_PHILS - 1) % NUM_OF_PHILS;
}

int rightOf(int i) {
    return (i + 1) % NUM_OF_PHILS;
}

void *philosopher(void *param) {
    int id = *((int *) param);
    while (true) {
        think(id);
        pickup(id);
        eat(id);
        putdown(id);
    }
}

void think(int id) {
    printf("%d is thinking\n", id);
    usleep((1 + rand() % 50) * 10000);
};

void eat(int id) {
    printf("%d is eating\n", id);
    usleep((1 + rand() % 50) * 10000);
};


void test(int id) {
    if (state[id] == HUNGRY && state[leftOf(id)] != EATING && state[rightOf(id)] != EATING) {
        state[id] = EATING;
        pthread_cond_signal(&cond_vars[id]);
    }
};

void pickup(int id) {
    pthread_mutex_lock(&mutex_lock);

    state[id] = HUNGRY;
    test(id);
    while (state[id] != EATING) {
        pthread_cond_wait(&cond_vars[id], &mutex_lock);
    }

    pthread_mutex_unlock(&mutex_lock);
};

void putdown(int id) {
    pthread_mutex_lock(&mutex_lock);

    state[id] = THINKING;
    test(leftOf(id));
    test(rightOf(id));

    pthread_mutex_unlock(&mutex_lock);
};

java 구현

package process_synchronization;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static process_synchronization.State.*;

enum State {
    THINKING, EATING, HUNGRY;
}

public class DiningPhilosopher {
    static final int numOfPhils = 5;

    public static void main(String[] args) {
        Philosopher[] philosophers = new Philosopher[numOfPhils];
        DiningPhilosopherMonitor monitor = new DiningPhilosopherMonitor(numOfPhils);

        for (int id = 0; id < numOfPhils; id++) {
            new Thread(new Philosopher(id, monitor)).start();
        }
    }
}


class Philosopher implements Runnable {

    private int id;
    private DiningPhilosopherMonitor monitor;

    public Philosopher(int id, DiningPhilosopherMonitor monitor) {
        this.id = id;
        this.monitor = monitor;
    }

    @Override
    public void run() {
        while (true) {
            think();
            monitor.pickup(this.id);
            eat();
            monitor.putdown(this.id);
        }
    }

    private void eat() {
        System.out.printf("[%d] Eating\n", this.id);
        try {
            Thread.sleep((long) (Math.random() * 500));
        } catch (InterruptedException e) {

        }
    }

    private void think() {
        System.out.printf("[%d] Thinking\n", this.id);
        try {
            Thread.sleep((long) (Math.random() * 50));
        } catch (InterruptedException e) {
        }

    }
}

class DiningPhilosopherMonitor {

    private int numOfPhills;
    private State[] states;
    private Condition[] self;
    private Lock lock;

    public DiningPhilosopherMonitor(int numOfPhills) {
        this.numOfPhills = numOfPhills;
        this.states = new State[numOfPhills];
        this.self = new Condition[numOfPhills];
        this.lock = new ReentrantLock();
        for (int i = 0; i < states.length; i++) {
            states[i] = THINKING;
            self[i] = lock.newCondition();
        }
    }

    private int leftOf(int id) {
        return (id + this.numOfPhills - 1) % this.numOfPhills;
    }

    private int rightOf(int id) {
        return (id + +1) % this.numOfPhills;
    }

    public void pickup(int id) {
        lock.lock();

        try {
            states[id] = HUNGRY;
            test(id);
            while (states[id] != EATING) {
                self[id].await();
            }
        } catch (InterruptedException e) {
        } finally {
            lock.unlock();
        }

    }

    public void putdown(int id) {
        lock.lock();

        try {
            states[id] = THINKING;
            test(leftOf(id));
            test(rightOf(id));
        } finally {
            lock.unlock();
        }
    }

    private void test(int id) {
        if (states[id] == HUNGRY && states[leftOf(id)] != EATING && states[rightOf(id)] != EATING) {
            states[id] = EATING;
            self[id].signal();
        }
    }
}
  • 상호 배제와 deadlock-free는 보장하나, 여전히 starvation-free는 보장하지 않는다

Liveness

✔️ Liveness

  • CSP requirements 중 progress, bounded-waiting
  • 세마포어와 모니터는 Liveness를 보장하지 못한다
  • Deadlock, priority inversion 상황은 Liveness를 실패하게 한다

✔️ Deadlock

두 개 이상의 프로세스가 자원을 소유한채로 무한히 대기하는 상황

✔️ Priority Inversion

  • 높은 우선순위를 가진 프로세스가 낮은 우선순위 프로세스가 자원을 해제하기를 기다리는 상황

    • ex) L < M < H 우선순위 순인 프로세스들이 있다고 가정. L은 H가 필요로 하는 리소스 R을 점유하고 있는 상태이고, H는 L이 R 사용을 마칠때까지 기다리고 있는데, L이 M에 의해 선점되었을 경우 우선순위 역전 현상이 발생한다
  • 해결 방안 = 더 높은 우선순위 프로세스가 필요로 하는 자원을 접근하는 모든 프로세스들은 문제가 된 자원의 사용이 끝날 때까지 더 높은 우선순위를 상속받도록 하고, 자원 사용이 긑나면 원래 우선순위로 되돌아간다


Linux Syncrhonization

  • 2.6 버전 이전의 리눅스는 비선점형 커널이었으나, 현재의 리눅스 커널은은 완전 선점 가능하다
  • 리눅스는 커널 안의 임계영역을 보호하기 위해 mutext 락을 사용한다.
    • 임계 영역에 진입하기 위해 mutex_lock()을 호출해야하고, 나오기 전에 mutext_unlock()을 호출해야 한다.
  • SMP 기계에서 기본적인 락킹 기법은 스핀락이다. 그리고 스핀락이 짧은 시간 동안만 소유되도록 설계되었다.
  • 단일 처리기에서는 스핀락을 획득하는 것이 아니라 커널이 커널 선점을 불가능하게 한다.
단일 처리기다중 처리기
커널 선점을 불가능하게 한다스핀락을 획득한다
커널 선점을 가능하게 한다스핀락을 방출한다

스핀락과 커널 선점 불가능 및 가능은 오직 락이 짧은 시간 동안만 유지될 때 사용된다. 락이 오랜 시간동안 유지되어야 한다면 세마포을 사용하는 것이 적절하다

0개의 댓글

Powered by GraphCDN, the GraphQL CDN