이전에 멀티 프로세스 환경에 대해서 다룰 때 단점들에 대해서 언급한 적이 있다.
멀티 프로세스는 크게 두 가지 문제가 있다.
그렇다면 쓰레드는?
위 그림에서 보이듯 프로세스끼리는 완전 독립적이다.
반면 쓰레드는 하나의 프로세스 안에 위치하며, 공유하는 데이터 영역이 있기 때문에 쓰레드 간 데이터 공유가 매우 용이하다.
#include <pthread.h>
int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr,
void *(start_routine)(void*), void *restrict arg);
파라미터 전달 정보는 다음과 같다.
thread
: 쓰레드 ID 저장을 위한 변수의 주소 값attr
: 쓰레드에 부여할 특성, NULL
전달 시 기본 쓰레드가 생성start_routine
: 쓰레드의 main 함수 역할을 하는, 별도의 실행 흐름의 시작이 되는 함수 포인터 값arg
: start_routine
함수가 실행될 때 전달해 주는 인자int main()
{
pthread_t t_id;
int thread_param = 5;
pthread_create(&t_id, NULL, thread_main, (void*)&thread_param);
sleep(10);
puts("end of main");
}
void * thread_main(void *arg)
{
int i;
int cnt = *(int*)arg;
for(int i=0; i<cnt; i++)
{
sleep(1);
puts("running thread");
}
}
위 코드처럼 sleep
함수를 이용해 흐름을 제어하는 것은 한계가 있다.
main 함수에서 프로세스가 종료되면 그 안에서 생성된 쓰레드도 함께 종료된다.
쓰레드는 열심히 돌아가고 있는데 종료돼버리면 안 되지 않는가? 그래서 쓰레드의 종료를 기다리는 pthread_join
함수가 있다.
#include <pthread_h>
int pthread_join(pthread_t thread, void **status);
첫 번째 인자로 전달되는 id의 쓰레드가 종료될 때까지 대기 상태에 둔다.
status는 쓰레드의 main 함수가 반환하는 값이 저장된다.
int main()
{
pthread_t t_id;
int thread_param = 5;
void *thr_ret;
pthread_create(&t_id, NULL, thread_main, (void*)&thread_param);
pthread_join(t_id, &thr_ret);
printf("Thread return message : %s\n", (char*)thr_ret);
free(thr_ret);
return 0;
)
void *thread_main(void *arg)
{
int cnt = *((int*)arg);
char *msg = (char*)malloc(sizeof(char) * 50);
strcpy(msg, "Hello, I'm Thread");
for(int i=0; i<cnt; i++)
{
sleep(1);
puts("running thread");
}
return (void*)msg;
}
쓰레드의 장점 중 하나가 자원 공유가 용이함
이라고 하였는데, 이것이 마냥 장점은 아닐 수 있다.
예를 들어 두 쓰레드가 cnt = 100
이라는 변수에 동시에 접근해서, 1씩 증가시킨다고 해보자.
우리가 원하는 결과는, 한 쓰레드가 cnt
를 101로 만들고 또 다른 쓰레드가 102로 만드는 것이겠지만, 그렇지 않을 수도 있다. 두 쓰레드 모두 cnt
가 100일 때 접근했다면, 그 상태로 1씩 증가시켜도 결과적으로 cnt
에는 101이 들어가 있게 된다.
이렇게 자원 접근 조건을 컨트롤 해 주지 않으면, 공유 자원을 사용한다는 것이 문제점으로 작용할 수도 있다. 그리고 이런 공유 자원에 접근하는 부분을 임계 영역(Critical Section) 이라고 한다.
따라서 쓰레드 동기화가 필요한데 이것에 대해서는 다음 챕터에서 다루도록 하겠다.
쓰레드가 종료(리턴)해도 자동 소멸되지 않는다. pthread_join
이나 pthread_detach
함수를 호출하여 쓰레드를 소멸시켜야 한다.
동기화가 필요한 상황은 대표적으로 두 가지이다.
즉, 동기화를 통해 동시 접근을 막을 뿐만 아니라 접근의 순서를 지정해 줄 수도 있다.
뮤텍스(Mutex)는 상호 배제 (MUTual EXclusion)를 뜻하는 말로 critical section을 가지는 쓰레드들 사이에서 서로 겹쳐 접근하는 것을 막아주는 기법이다.
한 번에 하나의 쓰레드만이 공유 자원에 접근하도록 하며, 구현을 위해 lock
과 unlock
개념이 존재한다.
마치 공공 화장실을 쓰듯이, 한 명이 들어가서 문을 잠그고 화장실을 사용하는 것처럼 임계 영역에 접근한 쓰레드가 잠금과 잠금 해제의 권한을 가진다.
잠금 상태일 때는 다른 쓰레드는 해당 공유 자원에 접근할 수 없으며 기다려야 한다. 잠금이 해제 되면 다른 쓰레드가 접근할 수 있다.
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
init
함수에서 attr
인자는 뮤텍스의 특성 정보를 담고 있다. 별도의 특성을 지정하지 않으면 NULL
을 전달한다.
기본적인 구성은 다음과 같다.
pthread_mutex_lock(&mutex);
// 임계 영역 시작
...
// 임계 영역 끝
pthread_mutex_unlock(&mutex);
세마포어(Semaphore)는 공유 자원에 여러 프로세스가 접근하는 것을 막는 것을 말한다.
이를 위해서 현재 공유 자원의 상태를 나타내는 카운터 변수를 사용하게 된다.
각각의 프로세스들은 카운터 변수를 확인하여 자원을 사용할 수 있는 상태라면 사용하고, 만약 누군가가 자원을 사용중이라는 것을 인지하게 되면, 반드시 기다렸다가 사용하게 된다. 이런 방식을 통해 여러 프로세스가 공유 자원에 동시에 접근하는 것을 막을 수 있다.
뮤텍스와 다른 점은, 뮤텍스는 잠금/해제의 개념으로 최대 1개의 쓰레드만이 접근 가능한 반면 세마포어는 카운터 변수 값을 사용자가 지정하기에 따라 동시에 접근할 수 있는 쓰레드의 수를 2 이상으로도 설정할 수 있다는 점이다. 0과 1만 사용하는, 즉 최대 1개의 쓰레드만 접근 가능한 세마포어를 이진 세마포어 (Binary Semephore)라고 한다.
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
int sem_post(sem_t *sem);
int sem_wait(sem_t *sem);
init
함수의 인자는 다음과 같다.
sem
: 세마포어 참조 값 저장을 위한 변수의 주소pshared
: 0 이외의 값 전달 시 둘 이상의 프로세스에 의해 접근 가능한 세마포어 생성, 0 전달 시 하나의 프로세스만 접근 가능한 세마포어 생성value
: 세마포어의 초기값기본적인 구성은 다음과 같다.
sem_wait(&sem); // 세마포어 값을 감소 (0이면 다른 프로세스가 접근 불가)
// 임계 영역의 시작
...
// 임계 영역의 긑
sem_post(&sem); // 세마포어 값을 증가 (1 이상이면 다른 프로세스가 접근 가능)
int clnt_cnt = 0;
int clnt_socks[50];
pthread_mutex_t mutex;
int main(int argc, char *argv[])
{
int serv_sock, clnt_sock;
struct sockaddr_in serv_addr, clnt_addr;
int clnt_adr_sz;
pthread_t t_id;
pthread_mutex_init(&mutex, NULL);
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(atoi(argv[1]));
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
bind(serv_sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
listen(serv_sock, 5);
while(1)
{
clnt_adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_addr, &clnt_adr_sz);
// 전체 client 개수를 늘려주고 clnt_sock 정보를 기록한다.
// 이 때 clnt_socks나 clnt_cnt에 접근하면 누락되는 소켓이 생기므로 lock
pthread_mutex_lock(&mutex);
clnt_socks[clnt_cnt++] = clnt_sock;
pthread_mutex_unlock(&mutex);
pthread_create(&t_id, NULL, handle_cnt, (void*)&clnt_sock);
pthread_detach(t_id);
printf("Connected client IP : %s\n, inet_ntoa(clnt_adr.sin_addr));
}
close(serv_sock);
return 0;
}
void * handle_cnt(void *arg)
{
int clnt_sock = *((int*)arg);
int str_len = 0, i;
char msg[BUFSIZE];
while((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
send_msg(msg, str_len);
// 채팅이 끝나면 해당 소켓을 제거한다. 이 때도 critical section에 접근하므로 lock이 필요하다.
pthread_mutex_lock(&mutex);
for(i=0; i<clnt_cnt; i++)
{
if(clnt_sock = clnt_socks[i])
{
// 한 칸씩 땡겨준다
while(i++ < clnt_cnt-1)
clnt_socks[i] = clnt_socks[i+1];
break;
}
}
clnt_cnt--;
pthread_mutex_unlock(&mutex);
close(clnt_sock;
return NULL;
}
// 연결된 모든 소켓에 메시지를 쏜다. 마찬가지로, critical section에 접근하므로 lock/unlock 과정을 거친다.
void send_msg(char *msg, int len)
{
pthread_mutex_lock(&mutex);
for(int i=0; i<clnt_cnt; i++)
write(clnt_socks[i], msg, len);
pthread_mutex_unlock(&mutex);
}
int main(int argc, char *argv[])
{
int sock;
struct sockaddr_in serv_addr;
pthread_t snd_thread, rcv_thread;
void *thread_return;
sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
serv_addr.sin_port = htons(atoi(argv[2]));
connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
pthread_create(&snd_thread, NULL, send_msg, (void*)sock);
pthread_create(&rcv_thread, NULL, recv_msg, (void*)sock);
pthread_join(snd_thread, &thread_return);
pthread_join(rcv_thread, &thread_return);
close(sock);
return 0;
}
void * send_msg(void *arg)
{
int sock = *((int*)arg);
while(1)
{
fgets(msg, BUFSIZE, stdin);
if(!strcmp(msg, "q\n") || !strcmp(msg, "Q\n"))
{
close(sock);
exit(0);
}
write(sock, msg, strlen(name_msg));
}
return NULL;
}
void * recv_msg(void *arg)
{
int sock = *((int*)arg);
int str_len;
while(1)
{
str_len = read(sock, msg, sizeof(msg));
if(str_len == -1)
return (void*)-1;
msg[str_len] = 0;
fputs(name_msg, stdout);
}
return NULL;
}
클라이언트는 send 작업과 recv 작업이 두 개의 쓰레드로 분리되어 실행되는 것 말고는 특별한 것이 없다.