Part2_멀티쓰레드 기반의 서버 구현

·2023년 12월 3일
0

쓰레드는 리눅스보다는 윈도우즈에서 보다 친숙한 개념임
그러나 웹이 발전하면서 유닉스 계열의 환경에서도 쓰레드의 중요성이 부각됨

[프로세스와 쓰레드]

이전에 프로세스를 생성하면 프로세스마다 완전히 독룁된 메모리 공간을 유지하기 때문에 프로세스간 통신을 위해 select 함수를 기반으로 하는 멀티플렉싱 서버의 구현 예제를 공부했었음
그러나 멀티 프로세스 서버를 완전히 대체하지는 못함
→ 그렇다면 멀티 쓰레드 기반으로 동작하는 다중 접속 서버는 어떨까?

쓰레드는 프로세스의 장점을 지니면서도 어느 정도 단점을 극복한 '경량화된 프로세스'임
쓰레드를 가지고도 프로세스가 하는 일을 해낼 수만 있다면 좋은 서버 구현 방법이 될 것임

그러나 쓰레드는 프로세스와 달리 공유되는 메모리 공간을 가지고 있음
→ 장점인 동시에 단점이 될 수도 있음
쓰레드간의 통신을 하기에는 쉬우나 쓰레드들이 동시에 메모리에 접근했을 때는 문제를 일으킬 수 있음

[쓰레드 생성하기]

프로세스는 운영체제가 바라봤을 때의 일의 단위임
쓰레드는 운영체제의 관점에서 구분하는 일의 단위가 아니라 프로세스 내에서 다시 나누어지는 일의 단위가 됨
→ 쓰레드는 그 기반을 프로세스에 두고 있음
즉, 프로세스가 존재하지 않는 상태에서의 쓰레드 생성은 불가능함

#include <pthread.h>

int pthread_create(pthread_t* thread, pthread_attr_t* attr, void* (*start_routine)(void*), void* arg);

// 성공 시 0, 실패 시 이외의 값 리턴
// thread : 생성된 쓰레드의 ID를 저장할 변수의 포인터를 인자로 전달
// 함수가 호출되고 나면 쓰레드가 생성되는데, 생성되는 모든 쓰레드는 프로세스처럼 ID를 할당받게 됨
// attr : 생성하고자 하는 쓰레드의 특성을 설정할 때 사용, 일반적으로 NULL
// start_routine : 쓰레드가 생성되고 나서 실행해야 하는 함수 루틴을 설정
// 즉, 쓰레드가 생성되자마자 여기서 인자로 전달된 함수를 호출하게 될거임
// 이 함수의 종료와 동시에 쓰레드는 소멸됨
// arg : 쓰레드에 의해 호출되는 함수(start_routine 포인터가 가리키는 함수)에 전달하고자 하는 인자 값을 넘겨줌
// thread1.c

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

void *thread_function(void* arg);

int main(int argc, char** argv)
{
	int state;
	pthread_t t_id;
	void* t_return;

	// 쓰레드 생성
	state = pthread_create(&t_id, NULL, thread_function, NULL);
	if(state != 0)
	{
		puts("쓰레드 생성 오류");
		exit(1);
	}
	printf("생성된 쓰레드의 ID : %d \n", t_id);
	sleep(3);
	puts("main함수 종료");

	return 0;
}

// 쓰레드가 호출할 함수
// 2초를 주기로 "쓰레드 실행 중"이라는 메시지를 총 세 번 출력하게끔 되어있음
void *thread_function(void* arg)
{
	int i;
	for(i = 0; i < 3; i++)
	{
		sleep(2);
		puts("쓰레드 실행 중");
	}
}

[실행 결과]

쓰레드 실행 중이라는 메시지가 세 번 출력 되어야 함에도 불구하고 한 번 밖에 출력되지 않음
쓰레드가 생성되지 않은 것일까? → 생성됨. 다만 너무 빨리 소멸되어버림

thread_function 함수 내에서 메시지를 출력할 때마다 2초간의 대기 상태를 지니게 되므로 완전히 실해오디기기 위해서는 최소한 6초 이상의 시간이 필요함
그러나 main 함수의 흐름을 보면 25에서 3초간의 지연 시간을 갖는 것이 전부임
생성된 쓰레드에 의해서 실행되는 thread_function 함수 보다 main 함수가 더 일찍 끝남
→ main 함수의 종료와 더불어 프로세스가 종료되고, 프로세스가 종료되면서 그 안에서 생성된 모든 쓰레드들도 함께 종료됨

그렇다면 어떻게 해야할까?
sleep()를 여유있게 10초 정도로 잡으면 되겠지만 이런 방법은 프로그램상에서 쓰레드의 실행 시간을 예측해야 하기 때문에 그리 좋은 방법은 아님

#include <pthread.h>

int pthread_join(pthread_t th, void** thread_return);

// 성공 시 0을, 실패 시 그 이외의 값을 리턴
// th : th에 인자로 들어오는 ID의 쓰레드가 종료할 때까지 실행을 지연시킴
// thread_return : 쓰레드가 종료 시 반환되는 값에 접근할 수 있는 2차원 포인터

인자로 전달되는 ID에 해당하는 쓰레드가 종료될 때까지 대기기 상태에 들어가기 위해서 호출하는 함수임

// thread2.c

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

void *thread_function(void *arg);

int main(int argc, char** argv)
{
	int state;
	pthread_t t_id;
	void* t_return;

	// 쓰레드 생성 t_id 변수에 생성된 쓰레드의 ID 저장
	state = pthread_create(&t_id, NULL, thread_function, NULL);
	if(state != 0)
	{
		puts("쓰레드 생성 오류");
		exit(1);
	}
	printf("생성된 쓰레드의 ID : %d \n", t_id);

	// main 함수에서 pthread_join를 호출하면서 t_id를 
	state = pthread_join(t_id, &t_return);
	if(state != 0)
	{
		puts("쓰레드 Join 오류");
		exit(1);
	}
	printf("main함수 종료, 쓰레드 리턴 %s", (char*)t_return);
	free(t_return);
	return 0;
}

void *thread_function(void* arg)
{
	int i;
	char* p = (char*)malloc(20 * sizeof(char));
	strcpy(p, "쓰레드 종료됨!\n");
	
	for(i = 0; i < 3; i++)
	{
		sleep(2);
		puts("쓰레드 실행 중");
	}

	return p;
}

[실행 결과]

thread_function 함수가 “쓰레드 실행 중”이라는 메시지를 세 번 출력됨
프로세스가 thread에게 JOIN 메시지를 전달하면서 일단 대기 상태에 들어가고 있음
JOIN 메시지를 받은 쓰레드가 종료하게 되면 프로세스는 다시 이어서 실행을 하게 됨

[임계 영역과 쓰레드에 안전한 함수의 호출]

동시에 여러 개의 쓰레드를 생성해서 실행시키는 멀티쓰레드 예제를 만들어보려고 함
두 개 이상의 쓰레드에 의해서 동시에 실행되면 안 되는 영역이 존재함
→ 임계 영역

쓰레드 관점에서 볼 때 함수는 크게게 두 가지 종류로 나눌 수 있음

  • 쓰레드 안전한 함수 : 두 개 이상의 쓰레드에 의해서 사용되어도 문제가 전혀 없는 함수
  • 쓰레드 불안전한 함수 : 두 개 이상의 쓰레드에 의해서 사용될 경우 문제가 되는 함수들

일반적으로 쓰레드에 안전한 형태로 재구현된 함수의 이름에는 _r이 붙게 됨

헤더파일을 포함하기 전에 _REENTRANT 매크로가 정의되어 있는 경우 _r 함수의 호출로 변경됨

[동시에 실행하는 멀티쓰레드 예제]

두 개의 쓰레드를 만들어서 하나는 1 ~ 5까지 더하게 하고, 또 하나는 6부터 10까지 더하게 해서 그 결과를 main 영역에서 참조하게 하자
→ 보스 워커 쓰레드 모델

// thread3.c

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

#define NUMBER 10000

void* thread_increment(void* arg);
int num = 0;

int main(int argc, char** argv)
{
    int i;
	pthread_t thread_id[10];
	void* t_return;

    for(i = 0; i < 10; i++)
    {
        pthread_create(&thread_id[i], NULL, thread_increment, NULL);
    }

    for(i = 0; i < 10; i++)
        pthread_join(thread_id[i], &t_return);
    printf("main함수 종료, num = %d \n", num);

	return 0;
}

void* thread_increment(void* arg)
{
	int i;
    for(i = 0; i < NUMBER; i++)
        num++;
}

[실행 결과]

실행할 때 마다 다른 값이 나옴
프로그램을 만 번 실행시켰을 때 단 한 번이라도 잘못 실행된다면, 그 프로그램은 디버깅을 해야하는 프로그램임
→ 이게 임계 영역의 문제점에 해당

[동기화]

  • 공유된 메모리에 둘 이상의 쓰레드가 동시에 접근할 때 생기는 문제점을 해결하는 것
    • 문제 해결을 위해서는 하나의 쓰레드가 공유되는 메모리에 접근하는 동안에는 다른 쓰레드들이 접근하지 못하도록 막아주면 됨
  • 쓰레드의 실행 순서를 컨트롤 하는 것
    • A 쓰레드가 데이터를 가져 다가 놓는 쓰레드고, B 쓰레드가 데이터를 가져가는 쓰레드라면 A 쓰레드가 실행되고 나서 B 쓰레드가 실행되도록 순서를 조절함

리눅스 기반에서 쓰레드를 동기화하는데 있어서 일반적으로 두 가지 방법을 사용함(세마포어, 뮤텍스)

[뮤텍스]

Mutual Exclusion의 줄임말로 쓰레드 서로간에 동시 접근을 허용하지 않겠다는 의미
쓰레드에 의해 공유되는 메모리를 화장실 변기라고 생각하면 되고, 임계 영역을 화장실이라고 생각하면 됨
화장실에 들어가야 변기를 사용할 수 있고, 화장실 문이 닫혀있으면(누군가 쓰고 있으면) 쓸 수 없음
따라서 임계 영역에 들어가는 쓰레드는 일을 하기 전에 일단 문고리부터 걸어 잠그고, 일을 다 보고 난 후에는 문고리를 풀어줘서 다른 쓰레드가 진입할 수 있도록 해 줘야 함
여기서 언급한 문고리가 pthread_mutex_t 타입의 데이터 변수임

#include <pthread.h>

// 뮤텍스를 사용하기 전에 초기화 과정을 거쳐야 함
// 초기화하고자 하는 뮤텍스의 포인터를 첫 번째 인자로, 뮤텍스의 특성을 설정할 때 두 번째 인자를 설정함
int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* mutexattr);

// 뮤텍스를 걸어 잠글 때 사용
// 이미 뮤텍스를 걸어 잠근 쓰레드가 존재하는 상태에서 이 함수를 호출하게 되면
// 이전에 진입한 쓰레드가 빠져 나올 때까지 대기 상태에 있게 됨
int pthread_mutex_lock(pthread_mutex_t* mutex);

// 뮤텍스를 풀어줄 때 사용
// 뮤텍스를 걸어 잠근 쓰레드는 뮤텍스를 반드시 풀어줘야 함
// 그렇지 않으면 다른 쓰레드는 진입할 수 없음 -> 이런 상황을 데드락이라고 함
int pthread_mutex_unlock(pthread_mutex_t* mutex);

// 사용하고 난 뒤 뮤텍스와 이에 관련된 리소스 해제 
int pthread_mutex_destroy(pthread_mutex_t* mutex);

// 함수 성공 시 0을 리턴함

현재 쓰레드 A가 임계 영역에 진입해서 실행 중인 동안에 쓰레드 B가 임계 영역으로의 접근을 시도하고 있는 상황을 보여줌
물론 현재 임계 영역은 락이 걸려있는 상태이므로 쓰레드 B는 대기 상태에 들어감
실행 중이던 쓰레드 A가 pthread_mutex_unlock 함수를 호출하며 임계 영역을 빠져 나오고, 뒤를 이어 쓰레드 B가 대기 상태에서 빠져 나와 임계 영역으로 진입하게 됨

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

void* thread_increment(void* arg);
char thread1[] = "A Thread";
char thread2[] = "B Thread";

pthread_mutex_t mutx;
int number = 0;

int main(int argc, char** argv)
{
	pthread_t t1, t2;
	void* thread_result;
	int state;

	state = pthread_mutex_init(&mutx, NULL);
	if(state)
	{
		puts("뮤텍스 초기화 실패");
		exit(1);
	}

	pthread_create(&t1, NULL, thread_increment, &thread1);
	pthread_create(&t2, NULL, thread_increment, &thread2);

	pthread_join(t1, &thread_result);
	pthread_join(t2, &thread_result);

	printf("최종 number : %d \n", number);
	pthread_mutex_destroy(&mutx);
	return 0;
}

// 쓰레드에 의해 실행되는 함수
// 임계 영역
void* thread_increment(void* arg)
{
	int i;
	for(i = 0; i < 5; i++)
	{
		pthread_mutex_lock(&mutx);
		sleep(1); // 두 개의 쓰레드가 교대로 실행되게끔 넣어줌
		number++;
		printf("실행 : %s, number : %d \n", (char*)arg, number);
		pthread_mutex_unlock(&mutx);
	}
}

[실행 결과]

[세마포어]

세마포어에 의한 동기화에서는 sem_t 데이터 타입의 변수를 통해서 접근을 제어하는데
이 변수 자체를 세마포어라함

#include <semaphore.h>

// 세마포어를 초기화하는 함수
// 함수의 첫 번째 인자로 초기화하고자 하는 세마포어 변수의 포인터를 넘김
// 초기화되는 값은 세 번째 인자인 value에 의해 결정되며,
// 두 번째 인자는 세마포어의 타입을 설정하는 인자 값인데 여기서는 그냥 0을 전달하면 됨
// 두 번째 인자는 세마포어를 공유하는 범위를 지정하는데 사용됨
// 0은 전달했다는 의미는 하나의 프로세스 내에서만 사용하겠다는 의미임
// 0 이외의 값을 전달하게 되면 여러 개의 프로세스가 공유할 수 있는 세마포어가 생성됨
int sem_init(sem_t* sem, int pshared, unsigned int value);

// 세마포어의 값을 하나 감소함
// 단 값이 0인 경우는 값이 1이상이 될 때까지 대기 상태에 있다가, 1 이상이 되면 값을 감소시키며
// 대기상태에 빠져 나오게 됨
// 즉, 세마포어는 0보다 작은 값이 될 수 없음
int sem_wait(sem_t* sem);

// 세마포어의 값을 하나 증가시킴
int sem_post(sem_t* sem);

// 세마포어와 이에 관련된 리소스 소멸
int sem_destroy(sem_t* sem);

// 모든 함수 성공 시 0을 리턴함

세마포어 기반의 동기화에서는 세마포어가 0보다 작은 값이 될 수 없다는 특징을 사용해서 동기화를 하게 됨
sem_wait 함수를 호출 했을 때 세마포어의 현재 값이 0이라면 -1이 되어야 하지만 -1이 되지 않고, 다른 쓰레드에 의해서 세마포어의 값이 증가될 때까지 대기 상태에 있게 됨

예를 들어 변수 하나가 존재하는데 접근하려는 쓰레드는 A, B 두 개가 있음
A는 데이터를 채우는 작업을 하고, B는 A에 의해서 채워진 데이터를 가져가는 작업을 해야 함
따라서 실행 순서는 반드시 A와 B가 한 번씩 교대로 실행을 해서 B는 늘 새로운 데이터를 가져가야만 함

세마포어의 초기 값 0을 시작으로, A 쓰레드가 먼저 새로운 데이터를 저장하면서 세마포어 값을 하나 증가시키면(세마포어는 1), B 쓰레드가 세마포어 값을 하나 감소시키며(세마포어는 0) 데이터를 가져감
따라서 실행 순서에 있어서 동기화가 이루어짐

만약 순서가 바뀌어서 B 쓰레드가 먼저 접근한다면?
B 쓰레드의 경우 데이터를 가져가기 전에 일단 세마포어를 하나 감소시키기 위해서 sem_wait 함수를 호출하게 됨
그러나 현재 세마포어는 0임
따라서 sem_wait 함수 호출과 동시에 B 쓰레드는 대기 상태로 들어가게 됨
A 쓰레드에 의해서 sem_post 함수가 호출되어 세마포어가 1이 되면, 잠시 후 B 쓰레드는 세마포어를 하나 감소시킨 후 대기 상태에서 빠져 나오게 됨
즉 B 쓰레드가 먼저 데이터 영역에 접근하는 일은 발생하지 않음

[쓰레드 기반 서버 구현하기]

// chat_server.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>

#define BUFSIZE 100

void* clnt_connection(void* arg);
void send_message(char* message, int len);
void error_handling(char* message);

int clnt_number = 0;
int clnt_socks[10];
pthread_mutex_t mutx;

int main(int argc, char** argv)
{
    int serv_sock;
    int clnt_sock;
    struct sockaddr_in serv_addr;
    struct sockaddr_in clnt_addr;
    int clnt_addr_size;
    pthread_t thread;

    if(argc != 2)
    {
        printf("Usage : %s <port>\n", argv[0]);
        exit(1);
    } 

    if(pthread_mutex_init(&mutx, NULL))
        error_handling("mutex init error");
    
    serv_sock = socket(PF_INET, SOCK_STREAM, 0);

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(atoi(argv[1]));

    if(bind(serv_sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr))== -1)
        error_handling("bind() error");
    
    if(listen(serv_sock, 5) == -1)
        error_handling("listen() error");
    
    while(1)
    {
        clnt_addr_size = sizeof(clnt_addr);
        clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_addr, &clnt_addr_size);
        pthread_mutex_lock(&mutx);
        clnt_socks[clnt_number++] = clnt_sock;
        pthread_mutex_unlock(&mutx);

        pthread_create(&thread, NULL, clnt_connection, (void*)clnt_sock);
        printf("새로운 연결, 클라이언트 IP : %s \n", inet_ntoa(clnt_addr.sin_addr));
    }
    return 0;
}

void* clnt_connection(void* arg)
{
    int clnt_sock = (int)arg;
    int str_len = 0;
    char message[BUFSIZE];
    int i;

    while((str_len = read(clnt_sock, message, sizeof(message))) != 0)
        send_message(message, str_len);
    
    pthread_mutex_lock(&mutx);
    for(i = 0; i < clnt_number; i++)
    {
        if(clnt_sock == clnt_socks[i])
        {
            for(; i < clnt_number - 1; i++)
                clnt_socks[i] = clnt_socks[i + 1];
            break;
        }
    }
    clnt_number--;
    pthread_mutex_unlock(&mutx);

    close(clnt_sock);
    return 0;
}

void send_message(char* message, int len)
{
    int i;
    pthread_mutex_lock(&mutx);
    for(i = 0; i < clnt_number; i++)
        write(clnt_socks[i], message, len);
    pthread_mutex_unlock(&mutx);
}

void error_handling(char* message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}
// chat_client.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUFSIZE 100
#define NAMESIZE 20

void* send_message(void* arg);
void* recv_message(void* arg);
void error_handling(char* message);

char name[NAMESIZE] = "[Default]";
char message[BUFSIZE];

int main(int argc, char** argv)
{
    int sock;
    struct sockaddr_in serv_addr;
    pthread_t snd_thread, rcv_thread;
    void* thread_result;

    if(argc != 4)
    {
        printf("Usage : %s <IP> <port> <name>\n", argv[0]);
        exit(1);
    }

    sprintf(name, "[%s]", argv[3]);
    sock = socket(PF_INET, SOCK_STREAM, 0);
    if(sock == -1)
        error_handling("socket() error");

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
    serv_addr.sin_port = htons(atoi(argv[2]));

    if(connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1)
        error_handling("connect() error");
    
    pthread_create(&snd_thread, NULL, send_message, (void*)sock);
    pthread_create(&rcv_thread, NULL, recv_message, (void*)sock);

    pthread_join(snd_thread, &thread_result);
    pthread_join(rcv_thread, &thread_result);

    close(sock);
    return 0;
}

void* send_message(void* arg)
{
    int sock = (int)arg;
    char name_message[NAMESIZE + BUFSIZE];
    while(1)
    {
        fgets(message, BUFSIZE, stdin);
        if(!strcmp(message, "q\n"))
        {
            close(sock);
            exit(0);
        }
        sprintf(name_message, "%s %s", name, message);
        write(sock, name_message, strlen(name_message));
    }
}

void* recv_message(void* arg)
{
    int sock = (int)arg;
    char name_message[NAMESIZE + BUFSIZE];
    int str_len;
    while(1)
    {
        str_len = read(sock, name_message, NAMESIZE+BUFSIZE - 1);
        if(str_len == -1) return 1;
        name_message[str_len] = 0;
        fputs(name_message, stdout);
    }
}

void error_handling(char* message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

[실행 결과]

client_Cho

client_Thomas

server

0개의 댓글