epoll

sesame·2022년 2월 25일
0

교육

목록 보기
38/46

TCP/IP 다중 통신 개념 (fork 이용)

epoll 설명

epoll 프로그래밍 흐름

  • epoll fd 생성 : epoll_create() * 연결 받기 (listen, accept)
  1. listen 용 fd 생성 및 준비 : socket(), bind(), setsockopt()
  2. epoll에 등록 : epoll_ctl(..., EPOLL_CTL_ADD, ... )
  3. listen()
  4. epoll_wait를 통해, fd_listen 에 사건발생(즉, 누군가 접속시도함)을 감지하여 접속받음 : accept()
  5. accept로부터 넘어온 fd에 통신준비 : fcntl()
  6. epoll에 새로운 fd 등록 : epoll_ctl(..., EPOLL_CTL_ADD, ... )
  • 연결 하기 (connect)
  1. 사전준비(상대방 ip주소, port번호등) 그리고 연결 : connect()
  2. 연결성공하면 connect로부터 넘어온 fd에 통신준비 : fcntl()
  3. epoll에 새로운 fd 등록 : epoll_ctl(..., EPOLL_CTL_ADD, ... )
  • 연결 끊기 (connect)
  1. 연결을 끊고 싶거나, 연결이 끊김 사건 발생 또는 에러발생 : read(), epoll_wait() 에서 감지
  2. epoll에서 해당 fd 삭제 : epoll_ctrl(..., EPOLL_CTL_DEL, ...)
  3. 연결 닫음 : close()
  • epoll 종료 : close(), epoll 도 FILE이기 때문에 그냥 닫으면 된다 ;)

  • 읽기 (read)
  1. 읽을 데이타 도착했음을 감지 : epoll_wait()
  2. 데이타 읽기 : read()
  3. 읽는 도중 발생하는 에러처리, 필요시 연결끊기
  • 쓰기 (write)
  1. 보낼 데이터가 발생한 경우 Send Buffer에 일단 저장
  2. 보내도 된다는 신호를 감지 : epoll_wait()
  3. 데이터 전송 : write()
    1) 전송 도중 발생하는 에러처리, 필요시 연결끊기 혹은 WOULDBLOCK 처리
    참고 1, WOULDBLOCK 인 경우는 에러지만 에러처리하지 않고, 단순히 리턴하고 기다리다가, EPOLLOUT 신호가 다시 발생했을 경우에 데이터를 전송하도록 한다.
    참고 2, Send Buffering (]?SendBuffer])

select 기반의 IO 멀티 플렉싱이 느린 이유

  • 소켓 관리하는 fd_set을 직접 관리해야 함

  • select로 fd_set을 전달할 때 원본을 직접 전달하지 않고 복사본을 전달해야 하는 번거로움

  • select가 한번 일어날 때 마다 fd_set을 모두 점검하면서 변화가 일어났는지 체크해야 함

  • select는 운영체제에 의해서 완성되는 기능이 아니고 함수에 의해 완성되기 때문에 번거롭고 성능이 떨어질 수 있음

  • 이 문제를 해결하기 위해선 운영체제에 관찰 대상(소켓)의 정보를 한번만 알려주고 변화가 있을 때 변화가 있는 소켓의 정보들만 받으면 됨

  • 위처럼 운영체제 레벨에서 지원하는 멀티 플렉싱 모델이 epoll(리눅스), IOCP(윈도우)

select를 사용할 상황

  • 서버의 접속자가 많지 않음
  • 다양한 운영체제에서 사용할 수 있어야 함
    -> epoll이나 iocp는 운영체제에 종속적임

epoll 방식의 장점

  • select처럼 모든 소켓에 대해 반복문을 수행하면서 점검할 필요가 없음
    -> 변화가 일어난 소켓의 정보만 전달해서 주기 때문

  • select함수에 대응하는 epoll_wait 시 소켓의 정보를 매번 전달할 필요가 없음

epoll()

typedef union epoll_data
{
        void *ptr;
        int fd;                    // 이벤트가 일어난(or 일어날) 파일 디스크립터
        __unit32_t u32;
        __unit64_t u64;
} epoll_data_t;

struct epoll_event
{
        __unit32_t events;        // 관찰할 이벤트의 종류
        epoll_data_t data;
}

epoll_create: epoll fd 저장소 생성

epoll의 시작을 운영체제에 알려주어 OS가 fd를 관리할 저장소를 만들게 함

#include <sys/epoll.h>

int epoll_create(int size);

return: 성공 시 epoll 파일 디스크립터, 실패 시 -1


epoll_ctl: 저장소에 파일 디스크립터를 등록, 삭제

#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);

return: 성공 시 0, 실패 시 -1

epfd: epoll_create로 생성한 epoll file descriptor
op: 관찰 대상의 추가, 삭제, 변경 여부

// epoll.h
#define EPOLL_CTL_ADD 1        /* Add a file decriptor to the interface.  */
#define EPOLL_CTL_DEL 2        /* Remove a file decriptor from the interface.  */
#define EPOLL_CTL_MOD 3        /* Change file decriptor epoll_event structure.  */

fd : 등록할 file descriptor
event : 관찰 대상의 이벤트 유형

  • EPOLLIN : 수신할 데이터가 존재하는 이벤트
  • EPOLLOUT : 즉시 데이터를 전송할 수 있도록 출력 버퍼가 비워진 이벤트
  • EPOLLPRI : OOB(Out-Of-Band) 데이터가 수신된 이벤트
  • EPOLLRDHUP : 연결이 종료된 이벤트 (half-close 포함)
  • EPOLLET : Edge-trigger 방식으로 이벤트 감지. 이 경우 | 연산자를 이용해 이벤트 종류도 함께 명시
  • EPOLLONESHOT : 최초의 이벤트만 감지하고 이후에는 return하지 않는 방식. | 연산자를 이용해 이벤트 종류를 함께 명시
  • EPOLLERR : 에러가 발생한 이벤트

epoll_wait: select 함수처럼 file descriptor 변화를 대기

#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

return: 성공 시 이벤트가 발생한 file descriptor의 수, 실패 시 -1

epfd: epoll 파일 디스크립터
events: 이벤트가 발생한 파일 디스크립터가 채워질 버퍼의 주소 값
maxevent: 최대 이벤트 수
timeout: 대기 시간, -1 전달 시 이벤트 발생 시까지 무한 대기

epfd=epoll_create(EPOLL_SIZE);
ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);

event.events=EPOLLIN;
event.data.fd=serv_sock;	
epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);

while(1)
{
	event_cnt=epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
    ...;
}

epoll 기반의 에코 서버

int main(int argc, char *argv[])
{
	int serv_sock, clnt_sock;
	struct sockaddr_in serv_adr, clnt_adr;
	socklen_t adr_sz;
	int str_len, i;
	char buf[BUF_SIZE];

	struct epoll_event *ep_events;
	struct epoll_event event;
	int epfd, event_cnt;
	...;

	serv_sock=socket(PF_INET, SOCK_STREAM, 0);
	...;
	
	if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)
		printf("bind() error");
	if(listen(serv_sock, 5)==-1)
		printf("listen() error");

	epfd=epoll_create(EPOLL_SIZE);		// epoll 저장소 생성
	ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);	// 이벤트 저장할 공간 생성
     /*
    epoll 인스턴스에 있는 파일 디스크립터 중
    실제로 이벤트가 발생한 파일 디스크립터를 따로 모아놓는 동적배열
    최대 EPOLL_SIZE 만큼 이벤트가 발생할 수 있음
    */

	event.events=EPOLLIN; // 수신한 데이터가 있는 이벤트
	event.data.fd=serv_sock; // 서버소켓이 대상
	epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);		//서버 소켓의 이벤트 등록

	while(1)
	{
		event_cnt=epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
        /*
        epoll 인스턴스에 있는 관심대상에서
        이벤트가 발생할때까지 무한대기
        */
		if(event_cnt==-1)
		{
			puts("epoll_wait() error");
			break;
		}

		for(i=0; i<event_cnt; i++)// 이벤트 발생한 파일 디스크립터에 대해서만 반복문(select 기반과 다른 점)
		{
			if(ep_events[i].data.fd==serv_sock)
            /*
            클라이언트의 연결요청도 데이터 전송을 통해 이루어지므로
            서버소켓에 수신된 데이터가 존재한다는 것은 클라이언트의 연결요청이 있었다는 의미
            */
			{
				adr_sz=sizeof(clnt_adr);
				clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz); // 클라이언트의 연결요청을 수락
				event.events=EPOLLIN; // 이벤트 등록방식을 레벨트리거 방식으로
				event.data.fd=clnt_sock;
				epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);		
                // 새로운 소켓도 등록
				printf("connected client: %d \n", clnt_sock);
			}
			else // 클라이언트의 메시지를 실제로 수신하는 소켓에 대해(accept 함수호출로 생성된 소켓)
			{
					str_len=read(ep_events[i].data.fd, buf, BUF_SIZE); // 클라이언트로부터 데이터 수신
					if(str_len==0)   // close request!(EOF 수신)
					{
						epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);		
                        // 클라이언트가 종료했으므로 이 소켓 또한 관심대상에서 제외하고
						close(ep_events[i].data.fd);		// 소켓 삭제
						printf("closed client: %d \n", ep_events[i].data.fd);
					}
					else
					{
						write(ep_events[i].data.fd, buf, str_len);    // 수신한 문자열을 다시 클라이언트로 에코
					}
	
			}
		}
	}
	close(serv_sock);		// 서버 소켓 소멸
	close(epfd);			// epoll 인스턴스 소멸
	return 0;
}

레벨트리거 VS 엣지트리거

레벨트리거: 입력버퍼에 데이터가 남아있는동안 계속해서 이벤트를 등록

엣지트리거: 데이터가 수신된 상황 단 한 번만 이벤트를 등록
-> 새로운 데이터가 들어와야만 이벤트 발생

  • 기본적으로는 레벨 트리거로 설정되어있음
    서버쪽에서 컨트롤 요소가 많고 데이터 송수신이 빈번한 경우는 엣지 트리거가 유리
    단순하고 데이터 송수신 상황이 다양하지 않으면 레벨 트리거 방식이 유리

레벨트리거를 이용한 epoll 서버

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#define BUF_SIZE 4
#define EPOLL_SIZE 50

void error_handling(char* message){
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

int main(int argc, char* argv[]){
    if(argc!=2)
        error_handling("잘못된 인수입니다.");
    
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
    socklen_t adr_sz;
    int str_len, i;
    char buf[BUF_SIZE];

    struct epoll_event* ep_events;
    struct epoll_event event;
    int epfd, event_cnt;

    serv_sock=socket(PF_INET,SOCK_STREAM,0);
    memset(&serv_adr,0,sizeof(serv_adr));
    serv_adr.sin_family=AF_INET;
    serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
    serv_adr.sin_port=htons(atoi(argv[1]));

    if(bind(serv_sock,(struct sockaddr*)&serv_adr,sizeof(serv_adr))==-1)
        error_handling("bind() error");
    if(listen(serv_sock,5)==-1)
        error_handling("listen() error");
    
    epfd=epoll_create(EPOLL_SIZE);
    ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);
    event.events=EPOLLIN;
    event.data.fd=serv_sock;
    epoll_ctl(epfd,EPOLL_CTL_ADD,serv_sock,&event);

    while(1){
        event_cnt=epoll_wait(epfd,ep_events,EPOLL_SIZE,-1);
        if(event_cnt==-1)
            error_handling("epoll() error");

        puts("return epoll_wait");
        for(i=0;i<event_cnt;i++){
            if(ep_events[i].data.fd==serv_sock){//서버소켓에서 이변발생시, 그것은 연결요청신호이다.
                adr_sz=sizeof(clnt_adr);
                clnt_sock=accept(serv_sock,(struct sockaddr*)&clnt_adr,&adr_sz);
                event.events=EPOLLIN;
                event.data.fd=clnt_sock;
                epoll_ctl(epfd,EPOLL_CTL_ADD,clnt_sock,&event);
                printf("%d번 파일디스크립터의 클라이언트소켓 연결\n",clnt_sock);
            }
            else{//클라이언트소켓에서 이변발생시, 그것은 데이터통신이다.
                str_len=read(ep_events[i].data.fd,buf,BUF_SIZE);
                if(str_len==0){
                    epoll_ctl(epfd,EPOLL_CTL_DEL,ep_events[i].data.fd,NULL);
                    close(ep_events[i].data.fd);
                    printf("%d번 파일디스크립터의 클라이언트연결 종료\n",ep_events[i].data.fd);
                }
                else{
                    write(ep_events[i].data.fd,buf,str_len);//에코
                }
            }
        }
    }
    close(serv_sock);
    close(epfd);
    return 0;
}

앞서보인 예제와 다른 점
버퍼의 크기를 4바이트로 축소시킨 것과(입력버퍼에 수신된 데이터를 한 번에 읽어들이지 못하게 한 뒤 몇 번의 epoll_wait 함수의 반환이 이루어지는지를 검사하기 위해)
epoll_wait 함수의 호출횟수 확인용 puts 문 삽입뿐

레벨트리거 서버는 클라이언트로부터 메시지를 한 번 수신할 때마다 이벤트 등록이 여러 번 이루어지고, 이로 인해 epoll_wait 함수가 다수 호출됨을 확인

엣지트리거를 이용한 epoll 서버

event.events = EPOLLIN | EPOLLET;

넌 블로킹 IO로 소켓을 변경

  • 엣지 트리거는 데이터 수신 시 한번만 이벤트가 발생되기 때문에 충분한 양의 버퍼를 마련한 다음 데이터를 읽어 들여야 함
  • 데이터의 양에 따라 블로킹이 발생할 수 있기 때문에 넌 블로킹 IO로 변경
  • 그리고 errno 변수를 참조해서 EAGAIN 값이면 버퍼가 빈 상태인 것을 확인
int flag=fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flag|O_NONBLOCK);

소켓을 NONE BLOCKING로 만든 이유는 무엇인가?
엣지트리거의 방식 특석상 블로킹 방식으로 동작하는 read&write 함수의 호출은 서버를 오랜 시간 멈추는 상황으로 까지 이어지게 할 수 있다. 때문에 엣지트리거 방식에서는 반드시 넌 블로킹 소켓을 기반으로 read & write함수를 호출해야 한다.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#define BUF_SIZE 4
#define EPOLL_SIZE 50

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

void setnonblockingmode(int fd){
    int flag=fcntl(fd,F_GETFL,0);
    fcntl(fd,F_SETFL,flag|O_NONBLOCK);
}

int main(int argc, char* argv[]){
    if(argc!=2)
        error_handling("잘못된 인수입니다");
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
    socklen_t adr_sz;
    int str_len, i;
    char buf[BUF_SIZE];

    struct epoll_event* ep_events;
    struct epoll_event event;
    int epfd, event_cnt;

    serv_sock=socket(PF_INET,SOCK_STREAM,0);
    memset(&serv_adr,0,sizeof(serv_adr));
    serv_adr.sin_family=AF_INET;
    serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
    serv_adr.sin_port=htons(atoi(argv[1]));
    if(bind(serv_sock,(struct sockaddr*)&serv_adr, sizeof(serv_adr))==-1)
        error_handling("bind() error");
    if(listen(serv_sock,5)==-1)
        error_handling("listen() error");
    
    epfd=epoll_create(EPOLL_SIZE);
    ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);

    /**/
    setnonblockingmode(serv_sock);
    event.events=EPOLLIN;
    event.data.fd=serv_sock;
    epoll_ctl(epfd,EPOLL_CTL_ADD,serv_sock,&event);

    while(1){
        event_cnt=epoll_wait(epfd,ep_events,EPOLL_SIZE,-1);
        if(event_cnt==-1)
            error_handling("epoll_wait() erorr");
        puts("return epoll_wait");
        for(i=0;i<event_cnt;i++){
            if(ep_events[i].data.fd==serv_sock){//서버소켓에서 일어난 데이터관련 이변 == 신규연결요청
                adr_sz=sizeof(clnt_adr);
                clnt_sock=accept(serv_sock,(struct sockaddr*)&clnt_adr,&adr_sz);
                setnonblockingmode(clnt_sock);
                event.events=EPOLLIN|EPOLLET;
                event.data.fd=clnt_sock;
                epoll_ctl(epfd,EPOLL_CTL_ADD,clnt_sock,&event);
                printf("fd %d 번 클라이언트 연결완료\n",clnt_sock);
            }
            else{//그외 == 단순데이터전송이면
                while(1){ // 이벤트 발생시 입력버퍼에 모든 데이터를 수신하기 위함임
                    str_len=read(ep_events[i].data.fd,buf,BUF_SIZE); // 클라이언트로부터 데이터 수신
                    if(str_len==0){ // close request!(EOF 수신)
                        epoll_ctl(epfd,EPOLL_CTL_DEL,ep_events[i].data.fd,NULL); // 클라이언트가 종료했으므로 이 소켓 또한 관심대상에서 제외하고
                        close(ep_events[i].data.fd); // 이 소켓의 연결도 종료
                        printf("fd %d 번 클라이언트 접속종료\n",clnt_sock);
                        break;
                    }
                    else if(str_len<0){
                        if(errno==EAGAIN)//입력버퍼에 더 이상 읽을 것이 없다는 에러가 났을 경우
                            break;
                    }
                    else{
                        write(ep_events[i].data.fd,buf,str_len); //수신한 문자열을 다시 클라이언트로 에코
                    }
                }
            }
        }
    }
    close(serv_sock);
    close(epfd);
    return 0;
}

다른 점
클라이언트로부터 데이터가 수신될 때, 각각 단 한번의 return epoll_wait 만 출력
이는 이벤트가 메시지 수신 당 길이, 즉 읽는 횟수에 관계없이 단 1번만 발생함을 의미

엣지 트리거 장점

데이터의 수신과 데이터가 처리되는 시점을 분리할 수 있다.
->입력 버퍼에 데이터가 수신된 상황임에도 불구하고 읽어들이고 처리하는 시점을 서버가 결정할 수 있다.

  • 클라이언트 c와 b는 서버로 데이터를 전송하고 있는데, a는 아직 연결조차 하지 않은 경우
  • 클라이언트 a, b, c가 순서에 상관없이 데이터를 서버로 전송하는 경우
  • 서버로 데이터가 전송되고 있는데, 정작 이 데이터를 수신할 클라이언트가 아직 연결되지 않은 경우

echo client

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define BUF_SIZE 1024
void error_handling(char *message);

int main(int argc, char *argv[])
{
	int sock;
	char message[BUF_SIZE];
	int str_len;
	struct sockaddr_in serv_adr;

	if(argc!=3) {
		printf("Usage : %s <IP> <port>\n", argv[0]);
		exit(1);
	}
	
	sock=socket(PF_INET, SOCK_STREAM, 0);   
	if(sock==-1)
		error_handling("socket() error");
	
	memset(&serv_adr, 0, sizeof(serv_adr));
	serv_adr.sin_family=AF_INET;
	serv_adr.sin_addr.s_addr=inet_addr(argv[1]);
	serv_adr.sin_port=htons(atoi(argv[2]));
	
	if(connect(sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr))==-1)
		error_handling("connect() error!");
	else
		puts("Connected...........");
	
	while(1) 
	{
		fputs("Input message(Q to quit): ", stdout);
		fgets(message, BUF_SIZE, stdin);
		
		if(!strcmp(message,"q\n") || !strcmp(message,"Q\n"))
			break;

		write(sock, message, strlen(message));
		str_len=read(sock, message, BUF_SIZE-1);
		message[str_len]=0;
		printf("Message from server: %s", message);
	}
	
	close(sock);
	return 0;
}

void error_handling(char *message)
{
	fputs(message, stderr);
	fputc('\n', stderr);
	exit(1);
}

0개의 댓글