epoll 프로그래밍 흐름
- epoll fd 생성 : epoll_create() * 연결 받기 (listen, accept)
- listen 용 fd 생성 및 준비 : socket(), bind(), setsockopt()
- epoll에 등록 : epoll_ctl(..., EPOLL_CTL_ADD, ... )
- listen()
- epoll_wait를 통해, fd_listen 에 사건발생(즉, 누군가 접속시도함)을 감지하여 접속받음 : accept()
- accept로부터 넘어온 fd에 통신준비 : fcntl()
- epoll에 새로운 fd 등록 : epoll_ctl(..., EPOLL_CTL_ADD, ... )
- 연결 하기 (connect)
- 사전준비(상대방 ip주소, port번호등) 그리고 연결 : connect()
- 연결성공하면 connect로부터 넘어온 fd에 통신준비 : fcntl()
- epoll에 새로운 fd 등록 : epoll_ctl(..., EPOLL_CTL_ADD, ... )
- 연결 끊기 (connect)
- 연결을 끊고 싶거나, 연결이 끊김 사건 발생 또는 에러발생 : read(), epoll_wait() 에서 감지
- epoll에서 해당 fd 삭제 : epoll_ctrl(..., EPOLL_CTL_DEL, ...)
- 연결 닫음 : close()
- epoll 종료 : close(), epoll 도 FILE이기 때문에 그냥 닫으면 된다 ;)
- 읽기 (read)
- 읽을 데이타 도착했음을 감지 : epoll_wait()
- 데이타 읽기 : read()
- 읽는 도중 발생하는 에러처리, 필요시 연결끊기
- 쓰기 (write)
- 보낼 데이터가 발생한 경우 Send Buffer에 일단 저장
- 보내도 된다는 신호를 감지 : epoll_wait()
- 데이터 전송 : write()
1) 전송 도중 발생하는 에러처리, 필요시 연결끊기 혹은 WOULDBLOCK 처리
참고 1, WOULDBLOCK 인 경우는 에러지만 에러처리하지 않고, 단순히 리턴하고 기다리다가, EPOLLOUT 신호가 다시 발생했을 경우에 데이터를 전송하도록 한다.
참고 2, Send Buffering (]?SendBuffer])
소켓 관리하는 fd_set을 직접 관리해야 함
select로 fd_set을 전달할 때 원본을 직접 전달하지 않고 복사본을 전달해야 하는 번거로움
select가 한번 일어날 때 마다 fd_set을 모두 점검하면서 변화가 일어났는지 체크해야 함
select는 운영체제에 의해서 완성되는 기능이 아니고 함수에 의해 완성되기 때문에 번거롭고 성능이 떨어질 수 있음
이 문제를 해결하기 위해선 운영체제에 관찰 대상(소켓)의 정보를 한번만 알려주고 변화가 있을 때 변화가 있는 소켓의 정보들만 받으면 됨
위처럼 운영체제 레벨에서 지원하는 멀티 플렉싱 모델이 epoll(리눅스), IOCP(윈도우)
select를 사용할 상황
- 서버의 접속자가 많지 않음
- 다양한 운영체제에서 사용할 수 있어야 함
-> epoll이나 iocp는 운영체제에 종속적임
select처럼 모든 소켓에 대해 반복문을 수행하면서 점검할 필요가 없음
-> 변화가 일어난 소켓의 정보만 전달해서 주기 때문
select함수에 대응하는 epoll_wait 시 소켓의 정보를 매번 전달할 필요가 없음
typedef union epoll_data
{
void *ptr;
int fd; // 이벤트가 일어난(or 일어날) 파일 디스크립터
__unit32_t u32;
__unit64_t u64;
} epoll_data_t;
struct epoll_event
{
__unit32_t events; // 관찰할 이벤트의 종류
epoll_data_t data;
}
epoll의 시작을 운영체제에 알려주어 OS가 fd를 관리할 저장소를 만들게 함
#include <sys/epoll.h>
int epoll_create(int size);
return: 성공 시 epoll 파일 디스크립터, 실패 시 -1
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
return: 성공 시 0, 실패 시 -1
epfd: epoll_create로 생성한 epoll file descriptor
op: 관찰 대상의 추가, 삭제, 변경 여부
// epoll.h #define EPOLL_CTL_ADD 1 /* Add a file decriptor to the interface. */ #define EPOLL_CTL_DEL 2 /* Remove a file decriptor from the interface. */ #define EPOLL_CTL_MOD 3 /* Change file decriptor epoll_event structure. */
fd : 등록할 file descriptor
event : 관찰 대상의 이벤트 유형
- EPOLLIN : 수신할 데이터가 존재하는 이벤트
- EPOLLOUT : 즉시 데이터를 전송할 수 있도록 출력 버퍼가 비워진 이벤트
- EPOLLPRI : OOB(Out-Of-Band) 데이터가 수신된 이벤트
- EPOLLRDHUP : 연결이 종료된 이벤트 (half-close 포함)
- EPOLLET : Edge-trigger 방식으로 이벤트 감지. 이 경우 | 연산자를 이용해 이벤트 종류도 함께 명시
- EPOLLONESHOT : 최초의 이벤트만 감지하고 이후에는 return하지 않는 방식. | 연산자를 이용해 이벤트 종류를 함께 명시
- EPOLLERR : 에러가 발생한 이벤트
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
return: 성공 시 이벤트가 발생한 file descriptor의 수, 실패 시 -1
epfd: epoll 파일 디스크립터
events: 이벤트가 발생한 파일 디스크립터가 채워질 버퍼의 주소 값
maxevent: 최대 이벤트 수
timeout: 대기 시간, -1 전달 시 이벤트 발생 시까지 무한 대기
epfd=epoll_create(EPOLL_SIZE);
ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);
event.events=EPOLLIN;
event.data.fd=serv_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);
while(1)
{
event_cnt=epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
...;
}
int main(int argc, char *argv[])
{
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t adr_sz;
int str_len, i;
char buf[BUF_SIZE];
struct epoll_event *ep_events;
struct epoll_event event;
int epfd, event_cnt;
...;
serv_sock=socket(PF_INET, SOCK_STREAM, 0);
...;
if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)
printf("bind() error");
if(listen(serv_sock, 5)==-1)
printf("listen() error");
epfd=epoll_create(EPOLL_SIZE); // epoll 저장소 생성
ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE); // 이벤트 저장할 공간 생성
/*
epoll 인스턴스에 있는 파일 디스크립터 중
실제로 이벤트가 발생한 파일 디스크립터를 따로 모아놓는 동적배열
최대 EPOLL_SIZE 만큼 이벤트가 발생할 수 있음
*/
event.events=EPOLLIN; // 수신한 데이터가 있는 이벤트
event.data.fd=serv_sock; // 서버소켓이 대상
epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event); //서버 소켓의 이벤트 등록
while(1)
{
event_cnt=epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
/*
epoll 인스턴스에 있는 관심대상에서
이벤트가 발생할때까지 무한대기
*/
if(event_cnt==-1)
{
puts("epoll_wait() error");
break;
}
for(i=0; i<event_cnt; i++)// 이벤트 발생한 파일 디스크립터에 대해서만 반복문(select 기반과 다른 점)
{
if(ep_events[i].data.fd==serv_sock)
/*
클라이언트의 연결요청도 데이터 전송을 통해 이루어지므로
서버소켓에 수신된 데이터가 존재한다는 것은 클라이언트의 연결요청이 있었다는 의미
*/
{
adr_sz=sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz); // 클라이언트의 연결요청을 수락
event.events=EPOLLIN; // 이벤트 등록방식을 레벨트리거 방식으로
event.data.fd=clnt_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
// 새로운 소켓도 등록
printf("connected client: %d \n", clnt_sock);
}
else // 클라이언트의 메시지를 실제로 수신하는 소켓에 대해(accept 함수호출로 생성된 소켓)
{
str_len=read(ep_events[i].data.fd, buf, BUF_SIZE); // 클라이언트로부터 데이터 수신
if(str_len==0) // close request!(EOF 수신)
{
epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
// 클라이언트가 종료했으므로 이 소켓 또한 관심대상에서 제외하고
close(ep_events[i].data.fd); // 소켓 삭제
printf("closed client: %d \n", ep_events[i].data.fd);
}
else
{
write(ep_events[i].data.fd, buf, str_len); // 수신한 문자열을 다시 클라이언트로 에코
}
}
}
}
close(serv_sock); // 서버 소켓 소멸
close(epfd); // epoll 인스턴스 소멸
return 0;
}
레벨트리거: 입력버퍼에 데이터가 남아있는동안 계속해서 이벤트를 등록
엣지트리거: 데이터가 수신된 상황 단 한 번만 이벤트를 등록
-> 새로운 데이터가 들어와야만 이벤트 발생
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#define BUF_SIZE 4
#define EPOLL_SIZE 50
void error_handling(char* message){
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
int main(int argc, char* argv[]){
if(argc!=2)
error_handling("잘못된 인수입니다.");
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t adr_sz;
int str_len, i;
char buf[BUF_SIZE];
struct epoll_event* ep_events;
struct epoll_event event;
int epfd, event_cnt;
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_adr,0,sizeof(serv_adr));
serv_adr.sin_family=AF_INET;
serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_adr.sin_port=htons(atoi(argv[1]));
if(bind(serv_sock,(struct sockaddr*)&serv_adr,sizeof(serv_adr))==-1)
error_handling("bind() error");
if(listen(serv_sock,5)==-1)
error_handling("listen() error");
epfd=epoll_create(EPOLL_SIZE);
ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);
event.events=EPOLLIN;
event.data.fd=serv_sock;
epoll_ctl(epfd,EPOLL_CTL_ADD,serv_sock,&event);
while(1){
event_cnt=epoll_wait(epfd,ep_events,EPOLL_SIZE,-1);
if(event_cnt==-1)
error_handling("epoll() error");
puts("return epoll_wait");
for(i=0;i<event_cnt;i++){
if(ep_events[i].data.fd==serv_sock){//서버소켓에서 이변발생시, 그것은 연결요청신호이다.
adr_sz=sizeof(clnt_adr);
clnt_sock=accept(serv_sock,(struct sockaddr*)&clnt_adr,&adr_sz);
event.events=EPOLLIN;
event.data.fd=clnt_sock;
epoll_ctl(epfd,EPOLL_CTL_ADD,clnt_sock,&event);
printf("%d번 파일디스크립터의 클라이언트소켓 연결\n",clnt_sock);
}
else{//클라이언트소켓에서 이변발생시, 그것은 데이터통신이다.
str_len=read(ep_events[i].data.fd,buf,BUF_SIZE);
if(str_len==0){
epoll_ctl(epfd,EPOLL_CTL_DEL,ep_events[i].data.fd,NULL);
close(ep_events[i].data.fd);
printf("%d번 파일디스크립터의 클라이언트연결 종료\n",ep_events[i].data.fd);
}
else{
write(ep_events[i].data.fd,buf,str_len);//에코
}
}
}
}
close(serv_sock);
close(epfd);
return 0;
}
앞서보인 예제와 다른 점
버퍼의 크기를 4바이트로 축소시킨 것과(입력버퍼에 수신된 데이터를 한 번에 읽어들이지 못하게 한 뒤 몇 번의 epoll_wait 함수의 반환이 이루어지는지를 검사하기 위해)
epoll_wait 함수의 호출횟수 확인용 puts 문 삽입뿐
레벨트리거 서버는 클라이언트로부터 메시지를 한 번 수신할 때마다 이벤트 등록이 여러 번 이루어지고, 이로 인해 epoll_wait 함수가 다수 호출됨을 확인
event.events = EPOLLIN | EPOLLET;
넌 블로킹 IO로 소켓을 변경
- 엣지 트리거는 데이터 수신 시 한번만 이벤트가 발생되기 때문에 충분한 양의 버퍼를 마련한 다음 데이터를 읽어 들여야 함
- 데이터의 양에 따라 블로킹이 발생할 수 있기 때문에 넌 블로킹 IO로 변경
- 그리고 errno 변수를 참조해서 EAGAIN 값이면 버퍼가 빈 상태인 것을 확인
int flag=fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flag|O_NONBLOCK);
소켓을 NONE BLOCKING로 만든 이유는 무엇인가?
엣지트리거의 방식 특석상 블로킹 방식으로 동작하는 read&write 함수의 호출은 서버를 오랜 시간 멈추는 상황으로 까지 이어지게 할 수 있다. 때문에 엣지트리거 방식에서는 반드시 넌 블로킹 소켓을 기반으로 read & write함수를 호출해야 한다.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#define BUF_SIZE 4
#define EPOLL_SIZE 50
void error_handling(char *message)
{
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
void setnonblockingmode(int fd){
int flag=fcntl(fd,F_GETFL,0);
fcntl(fd,F_SETFL,flag|O_NONBLOCK);
}
int main(int argc, char* argv[]){
if(argc!=2)
error_handling("잘못된 인수입니다");
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t adr_sz;
int str_len, i;
char buf[BUF_SIZE];
struct epoll_event* ep_events;
struct epoll_event event;
int epfd, event_cnt;
serv_sock=socket(PF_INET,SOCK_STREAM,0);
memset(&serv_adr,0,sizeof(serv_adr));
serv_adr.sin_family=AF_INET;
serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_adr.sin_port=htons(atoi(argv[1]));
if(bind(serv_sock,(struct sockaddr*)&serv_adr, sizeof(serv_adr))==-1)
error_handling("bind() error");
if(listen(serv_sock,5)==-1)
error_handling("listen() error");
epfd=epoll_create(EPOLL_SIZE);
ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);
/**/
setnonblockingmode(serv_sock);
event.events=EPOLLIN;
event.data.fd=serv_sock;
epoll_ctl(epfd,EPOLL_CTL_ADD,serv_sock,&event);
while(1){
event_cnt=epoll_wait(epfd,ep_events,EPOLL_SIZE,-1);
if(event_cnt==-1)
error_handling("epoll_wait() erorr");
puts("return epoll_wait");
for(i=0;i<event_cnt;i++){
if(ep_events[i].data.fd==serv_sock){//서버소켓에서 일어난 데이터관련 이변 == 신규연결요청
adr_sz=sizeof(clnt_adr);
clnt_sock=accept(serv_sock,(struct sockaddr*)&clnt_adr,&adr_sz);
setnonblockingmode(clnt_sock);
event.events=EPOLLIN|EPOLLET;
event.data.fd=clnt_sock;
epoll_ctl(epfd,EPOLL_CTL_ADD,clnt_sock,&event);
printf("fd %d 번 클라이언트 연결완료\n",clnt_sock);
}
else{//그외 == 단순데이터전송이면
while(1){ // 이벤트 발생시 입력버퍼에 모든 데이터를 수신하기 위함임
str_len=read(ep_events[i].data.fd,buf,BUF_SIZE); // 클라이언트로부터 데이터 수신
if(str_len==0){ // close request!(EOF 수신)
epoll_ctl(epfd,EPOLL_CTL_DEL,ep_events[i].data.fd,NULL); // 클라이언트가 종료했으므로 이 소켓 또한 관심대상에서 제외하고
close(ep_events[i].data.fd); // 이 소켓의 연결도 종료
printf("fd %d 번 클라이언트 접속종료\n",clnt_sock);
break;
}
else if(str_len<0){
if(errno==EAGAIN)//입력버퍼에 더 이상 읽을 것이 없다는 에러가 났을 경우
break;
}
else{
write(ep_events[i].data.fd,buf,str_len); //수신한 문자열을 다시 클라이언트로 에코
}
}
}
}
}
close(serv_sock);
close(epfd);
return 0;
}
다른 점
클라이언트로부터 데이터가 수신될 때, 각각 단 한번의 return epoll_wait 만 출력
이는 이벤트가 메시지 수신 당 길이, 즉 읽는 횟수에 관계없이 단 1번만 발생함을 의미
데이터의 수신과 데이터가 처리되는 시점을 분리할 수 있다.
->입력 버퍼에 데이터가 수신된 상황임에도 불구하고 읽어들이고 처리하는 시점을 서버가 결정할 수 있다.
- 클라이언트 c와 b는 서버로 데이터를 전송하고 있는데, a는 아직 연결조차 하지 않은 경우
- 클라이언트 a, b, c가 순서에 상관없이 데이터를 서버로 전송하는 경우
- 서버로 데이터가 전송되고 있는데, 정작 이 데이터를 수신할 클라이언트가 아직 연결되지 않은 경우
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define BUF_SIZE 1024
void error_handling(char *message);
int main(int argc, char *argv[])
{
int sock;
char message[BUF_SIZE];
int str_len;
struct sockaddr_in serv_adr;
if(argc!=3) {
printf("Usage : %s <IP> <port>\n", argv[0]);
exit(1);
}
sock=socket(PF_INET, SOCK_STREAM, 0);
if(sock==-1)
error_handling("socket() error");
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family=AF_INET;
serv_adr.sin_addr.s_addr=inet_addr(argv[1]);
serv_adr.sin_port=htons(atoi(argv[2]));
if(connect(sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr))==-1)
error_handling("connect() error!");
else
puts("Connected...........");
while(1)
{
fputs("Input message(Q to quit): ", stdout);
fgets(message, BUF_SIZE, stdin);
if(!strcmp(message,"q\n") || !strcmp(message,"Q\n"))
break;
write(sock, message, strlen(message));
str_len=read(sock, message, BUF_SIZE-1);
message[str_len]=0;
printf("Message from server: %s", message);
}
close(sock);
return 0;
}
void error_handling(char *message)
{
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}