소켓프로그래밍#12 : 멀티쓰레드와 동기화

kkado·2022년 6월 6일
0

네트워크프로그래밍

목록 보기
15/16
post-thumbnail

쓰레드의 등장 배경

이전에 멀티 프로세스 환경에 대해서 다룰 때 단점들에 대해서 언급한 적이 있다.

멀티 프로세스는 크게 두 가지 문제가 있다.

  • 우선 프로세스를 생성하는 데 많은 리소스가 소모되고 프로세스 간의 context switching으로 인해 성능이 저하된다.
  • 또한 데이터의 교환이 어렵다. 프로세스끼리는 메모리가 독자적으로 운영되기 때문에 프로세스 간에 데이터 공유는 불가능하고, 운영체제가 별도로 제공하는 IPC 기법을 사용해야 가능하다.

그렇다면 쓰레드는?

  • 쓰레드는 가볍다. context switching이 빠르다.
  • 쓰레드끼리 메모리 공유도 가능하므로 별도의 IPC 기법도 불필요하다.


위 그림에서 보이듯 프로세스끼리는 완전 독립적이다.


반면 쓰레드는 하나의 프로세스 안에 위치하며, 공유하는 데이터 영역이 있기 때문에 쓰레드 간 데이터 공유가 매우 용이하다.

쓰레드 함수

쓰레드 생성

#include <pthread.h>

int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, 
					void *(start_routine)(void*), void *restrict arg);

파라미터 전달 정보는 다음과 같다.

  • thread : 쓰레드 ID 저장을 위한 변수의 주소 값
  • attr : 쓰레드에 부여할 특성, NULL 전달 시 기본 쓰레드가 생성
  • start_routine : 쓰레드의 main 함수 역할을 하는, 별도의 실행 흐름의 시작이 되는 함수 포인터 값
  • arg : start_routine 함수가 실행될 때 전달해 주는 인자

예시

int main()
{
  pthread_t t_id;
  int thread_param = 5;

  pthread_create(&t_id, NULL, thread_main, (void*)&thread_param);
  sleep(10);
  puts("end of main");
}
void * thread_main(void *arg)
{
	int i;
    int cnt = *(int*)arg;
    for(int i=0; i<cnt; i++)
    {
    	sleep(1);
        puts("running thread");
    }
}

프로세스의 종료와 쓰레드

위 코드처럼 sleep 함수를 이용해 흐름을 제어하는 것은 한계가 있다.

main 함수에서 프로세스가 종료되면 그 안에서 생성된 쓰레드도 함께 종료된다.

쓰레드는 열심히 돌아가고 있는데 종료돼버리면 안 되지 않는가? 그래서 쓰레드의 종료를 기다리는 pthread_join 함수가 있다.

쓰레드 종료 대기

#include <pthread_h>

int pthread_join(pthread_t thread, void **status);

첫 번째 인자로 전달되는 id의 쓰레드가 종료될 때까지 대기 상태에 둔다.
status는 쓰레드의 main 함수가 반환하는 값이 저장된다.

int main()
{
	pthread_t t_id;
    int thread_param = 5;
    void *thr_ret;
    
    pthread_create(&t_id, NULL, thread_main, (void*)&thread_param);
    
    pthread_join(t_id, &thr_ret);
    printf("Thread return message : %s\n", (char*)thr_ret);
    
    free(thr_ret);
    return 0;
)

void *thread_main(void *arg)
{
	int cnt = *((int*)arg);
    char *msg = (char*)malloc(sizeof(char) * 50);
    strcpy(msg, "Hello, I'm Thread");
    
    for(int i=0; i<cnt; i++)
    {
    	sleep(1);
        puts("running thread");
    }
    return (void*)msg;
}

둘 이상의 쓰레드 동시 접근의 문제점

쓰레드의 장점 중 하나가 자원 공유가 용이함 이라고 하였는데, 이것이 마냥 장점은 아닐 수 있다.

예를 들어 두 쓰레드가 cnt = 100 이라는 변수에 동시에 접근해서, 1씩 증가시킨다고 해보자.

우리가 원하는 결과는, 한 쓰레드가 cnt를 101로 만들고 또 다른 쓰레드가 102로 만드는 것이겠지만, 그렇지 않을 수도 있다. 두 쓰레드 모두 cnt가 100일 때 접근했다면, 그 상태로 1씩 증가시켜도 결과적으로 cnt에는 101이 들어가 있게 된다.

이렇게 자원 접근 조건을 컨트롤 해 주지 않으면, 공유 자원을 사용한다는 것이 문제점으로 작용할 수도 있다. 그리고 이런 공유 자원에 접근하는 부분을 임계 영역(Critical Section) 이라고 한다.

따라서 쓰레드 동기화가 필요한데 이것에 대해서는 다음 챕터에서 다루도록 하겠다.

쓰레드의 소멸

쓰레드가 종료(리턴)해도 자동 소멸되지 않는다. pthread_join이나 pthread_detach 함수를 호출하여 쓰레드를 소멸시켜야 한다.

쓰레드 동기화

동기화가 필요한 상황은 대표적으로 두 가지이다.

  1. 동일한 메모리 영역으로의 동시접근이 발생하는 상황
  2. 동일한 메모리 영역에 접근하는 쓰레드의 실행 순서를 지정해야 하는 상황

즉, 동기화를 통해 동시 접근을 막을 뿐만 아니라 접근의 순서를 지정해 줄 수도 있다.

뮤텍스(Mutex) 기반 동기화

뮤텍스란?

뮤텍스(Mutex)는 상호 배제 (MUTual EXclusion)를 뜻하는 말로 critical section을 가지는 쓰레드들 사이에서 서로 겹쳐 접근하는 것을 막아주는 기법이다.

한 번에 하나의 쓰레드만이 공유 자원에 접근하도록 하며, 구현을 위해 lockunlock 개념이 존재한다.

마치 공공 화장실을 쓰듯이, 한 명이 들어가서 문을 잠그고 화장실을 사용하는 것처럼 임계 영역에 접근한 쓰레드가 잠금과 잠금 해제의 권한을 가진다.

잠금 상태일 때는 다른 쓰레드는 해당 공유 자원에 접근할 수 없으며 기다려야 한다. 잠금이 해제 되면 다른 쓰레드가 접근할 수 있다.

뮤텍스 함수

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

init 함수에서 attr 인자는 뮤텍스의 특성 정보를 담고 있다. 별도의 특성을 지정하지 않으면 NULL을 전달한다.

기본적인 구성은 다음과 같다.

pthread_mutex_lock(&mutex);
// 임계 영역 시작
...
// 임계 영역 끝
pthread_mutex_unlock(&mutex);

세마포어(Semaphore) 기반 동기화

세마포어란?

세마포어(Semaphore)는 공유 자원에 여러 프로세스가 접근하는 것을 막는 것을 말한다.

이를 위해서 현재 공유 자원의 상태를 나타내는 카운터 변수를 사용하게 된다.
각각의 프로세스들은 카운터 변수를 확인하여 자원을 사용할 수 있는 상태라면 사용하고, 만약 누군가가 자원을 사용중이라는 것을 인지하게 되면, 반드시 기다렸다가 사용하게 된다. 이런 방식을 통해 여러 프로세스가 공유 자원에 동시에 접근하는 것을 막을 수 있다.

뮤텍스와 다른 점은, 뮤텍스는 잠금/해제의 개념으로 최대 1개의 쓰레드만이 접근 가능한 반면 세마포어는 카운터 변수 값을 사용자가 지정하기에 따라 동시에 접근할 수 있는 쓰레드의 수를 2 이상으로도 설정할 수 있다는 점이다. 0과 1만 사용하는, 즉 최대 1개의 쓰레드만 접근 가능한 세마포어를 이진 세마포어 (Binary Semephore)라고 한다.

세마포어 함수

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);

int sem_post(sem_t *sem);
int sem_wait(sem_t *sem);

init 함수의 인자는 다음과 같다.

  • sem : 세마포어 참조 값 저장을 위한 변수의 주소
  • pshared : 0 이외의 값 전달 시 둘 이상의 프로세스에 의해 접근 가능한 세마포어 생성, 0 전달 시 하나의 프로세스만 접근 가능한 세마포어 생성
  • value : 세마포어의 초기값

기본적인 구성은 다음과 같다.

sem_wait(&sem); // 세마포어 값을 감소 (0이면 다른 프로세스가 접근 불가)
// 임계 영역의 시작
...
// 임계 영역의 긑
sem_post(&sem); // 세마포어 값을 증가 (1 이상이면 다른 프로세스가 접근 가능)

멀티쓰레드와 뮤텍스를 이용한 채팅 프로그램

서버

int clnt_cnt = 0;
int clnt_socks[50];
pthread_mutex_t mutex;

int main(int argc, char *argv[])
{
	int serv_sock, clnt_sock;
    struct sockaddr_in serv_addr, clnt_addr;
    int clnt_adr_sz;
    pthread_t t_id;
    
    pthread_mutex_init(&mutex, NULL);
    serv_sock = socket(PF_INET, SOCK_STREAM, 0);
    
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(atoi(argv[1]));
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    
    bind(serv_sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    listen(serv_sock, 5);
    
    while(1)
    {
    	clnt_adr_sz = sizeof(clnt_adr);
        clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_addr, &clnt_adr_sz);
        
        // 전체 client 개수를 늘려주고 clnt_sock 정보를 기록한다.
        // 이 때 clnt_socks나 clnt_cnt에 접근하면 누락되는 소켓이 생기므로 lock
        pthread_mutex_lock(&mutex);
        clnt_socks[clnt_cnt++] = clnt_sock;
        pthread_mutex_unlock(&mutex);
        
        pthread_create(&t_id, NULL, handle_cnt, (void*)&clnt_sock);
        pthread_detach(t_id);
        printf("Connected client IP : %s\n, inet_ntoa(clnt_adr.sin_addr));
    }
    close(serv_sock);
    return 0;
}

void * handle_cnt(void *arg)
{
	int clnt_sock = *((int*)arg);
    int str_len = 0, i;
    char msg[BUFSIZE];
    
    while((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
    	send_msg(msg, str_len);
    
    // 채팅이 끝나면 해당 소켓을 제거한다. 이 때도 critical section에 접근하므로 lock이 필요하다.
    pthread_mutex_lock(&mutex);
    for(i=0; i<clnt_cnt; i++)
    {
    	if(clnt_sock = clnt_socks[i])
        {
        	// 한 칸씩 땡겨준다
        	while(i++ < clnt_cnt-1)
            	clnt_socks[i] = clnt_socks[i+1];
            break;
        }
    }
    clnt_cnt--;
    pthread_mutex_unlock(&mutex);
    close(clnt_sock;
    return NULL;
}

// 연결된 모든 소켓에 메시지를 쏜다. 마찬가지로, critical section에 접근하므로 lock/unlock 과정을 거친다.
void send_msg(char *msg, int len)
{
	pthread_mutex_lock(&mutex);
    for(int i=0; i<clnt_cnt; i++)
    	write(clnt_socks[i], msg, len);
    pthread_mutex_unlock(&mutex);
}

클라이언트

int main(int argc, char *argv[])
{
	int sock;
    struct sockaddr_in serv_addr;
    pthread_t snd_thread, rcv_thread;
    void *thread_return;
    
    sock = socket(PF_INET, SOCK_STREAM, 0);
    
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
    serv_addr.sin_port = htons(atoi(argv[2]));
    
    connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    
    pthread_create(&snd_thread, NULL, send_msg, (void*)sock);
    pthread_create(&rcv_thread, NULL, recv_msg, (void*)sock);
    pthread_join(snd_thread, &thread_return);
    pthread_join(rcv_thread, &thread_return);
    
    close(sock);
    return 0;
}

void * send_msg(void *arg)
{
	int sock = *((int*)arg);
    while(1)
    {
    	fgets(msg, BUFSIZE, stdin);
        if(!strcmp(msg, "q\n") || !strcmp(msg, "Q\n"))
        {
        	close(sock);
            exit(0);
        }
        write(sock, msg, strlen(name_msg));
    }
    return NULL;
}

void * recv_msg(void *arg)
{
    int sock = *((int*)arg);
    int str_len;

    while(1)
    {
        str_len = read(sock, msg, sizeof(msg));
        if(str_len == -1)
            return (void*)-1;
        msg[str_len] = 0;
        fputs(name_msg, stdout);
    }
    return NULL;
}

클라이언트는 send 작업과 recv 작업이 두 개의 쓰레드로 분리되어 실행되는 것 말고는 특별한 것이 없다.

profile
울면안돼 쫄면안돼 냉면됩니다

0개의 댓글