#ifndef IPC_MSG_Q_H
# define IPC_MSG_Q_H
# include <cstdio>
# include <cstdlib>
# include <cstring>
# include <sys/msg.h>
# include <cerrno>
# include <ctime>
# include <unistd.h>
# define MSG_SZ 80
# define KEY 1 //for msgid instead of ftok()
/*//////////msgflg//////////
//시스템마다 처리하는 값이 다를 수 있어서 따로 정의하는 것은 좋지 않음
# ifndef IPC_NOWAIT
# define IPC_NOWAIT 2048
# endif
# ifndef MSG_NOERROR
# define MSG_NOERROR 4096
# endif
# ifndef MSG_EXCEPT
# define MSG_EXCEPT 8192
# endif
*////////////////////////////
typedef struct s_msg_buf {
long mtype;
char mtext[MSG_SZ];
} t_msg_buf;
#endif
t_msg_buf 구조체의 mtype 변수는 양의 정수를 가져야만 합니다.
#include "ipc_msg_q.h"
//argv : [path][block/nonblock][msg][count]
int main(int argc, char *argv[]) {
t_msg_buf m_buf;
int msgid = msgget(KEY, IPC_CREAT | 0644);
if (msgid < 0) {
printf("Error msgget : %s\n", strerror(errno));
}
m_buf.mtype = 1; //using by recv
strcpy(m_buf.mtext, "===test msg===");
int msgflg = 0; //initialize to block
if (argc >= 2 && !(memcmp(argv[1], "nowait", 6))) {
msgflg = IPC_NOWAIT; //2048
printf("Send : setting nowait\n");
}
if (argc >= 3) {
memcpy(m_buf.mtext, argv[2], MSG_SZ);
}
int count = 1;
if (argc >= 4) //if argv[3] isn't type int -> atoi return 0 and send nothhing
count = atoi(argv[3]); //if count is big size that waste resource
int chk_send = 0;
for (int i = 0; i < count; ++i) {
chk_send = msgsnd(msgid, (char *)&m_buf, MSG_SZ, msgflg);
if (chk_send < 0) {
printf("Error msgget : %s\n", strerror(errno));
}
}
return (0);
}
고유한 key를 넣으면 그에 대한 식별자(msgid)를 리턴해줍니다.
key값은 ftok() 함수를 사용해서 만들 수도 있습니다.
이 msgid는 커널에서 다루는 msg queue를 식별하는 식별자입니다.
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
버퍼에 담긴 데이터를 커널의 message queue에 메모리를 복사합니다.
만약 대기열이 충분하지 않다면 msgsnd()의 기본 동작은 공간이 사용가능해질 때 까지 차단합니다. IPC_NOWAIT가 msgflg에 지정된 경우 대신 EAGAIN 오류와 함께호출 실패합니다.
성공적으로 완료시 message queue 의 data structure는 다음과 같이 업데이트 됩니다.
#include "ipc_msg_q.h"
//argv : [path][msgflg]
int main(int argc, char *argv[]) {
t_msg_buf m_buf;
int msgid = msgget(KEY, 0); //if already exist identifer by the key, return same identifier
if (msgid < 0) {
printf("Error msgget : %s\n", strerror(errno));
}
int msgflg = 0;
if (argc >= 2) {
if (!(memcmp(argv[1], "nowait", 6))) {
msgflg = IPC_NOWAIT;
printf("Recv : setting nowait\n");
}
else if (!(memcmp(argv[1], "noerror", 7))) {
msgflg = MSG_NOERROR;
printf("Recv : setting noerror\n");
}
else if (!(memcmp(argv[1], "except", 6))) {
msgflg = MSG_EXCEPT;
printf("Recv : setting except\n");
}
else
printf("first arg is invalid flag");
}
//4 args set 0 : read from the first queue
int len = 0;
while (len = msgrcv(msgid, (char *)&m_buf, MSG_SZ, 0, msgflg) >= 0) {
printf("receive msg : %s, len : %d\n", m_buf.mtext, len);
}
if (len < 0) {
printf("Error msgrev : %s\n", strerror(errno));
}
return (0);
}
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
int msgflg);
#include "ipc_msg_q.h"
void printTime(time_t *target_time) {
time_t timer = time(target_time);
struct tm *t = localtime(&timer);
printf("%d년 %d월 %d일 %d시 %d분 %d초\n",
t->tm_year + 1900, t->tm_mon + 1, t->tm_mday, t->tm_hour, t->tm_min, t->tm_sec);
}
int main() {
struct msqid_ds msqstat;
int msgid = msgget(KEY, 0);
if (msgid < 0) {
printf("[monitor] Error megget : %s\n", strerror(errno));
}
int chk_msqstat = msgctl(msgid, IPC_STAT, &msqstat);
if (chk_msqstat < 0) {
printf("[monitor] Error msgctr : %s\n", strerror(errno));
}
printf("대기열의 최대 바이트 수 : %lu\n", msqstat.msg_qbytes);
printf("대기열에 있는 바이트 수 : %lu\n", msqstat.msg_cbytes);
printf("매세지 큐에 저장된 매세지 개수 : %lu\n", (msqstat.msg_qnum));
printf("마지막으로 매세지 보낸 시간 :"); printTime(&msqstat.msg_stime);
printf("마지막으로 매세지 받은 시간 :"); printTime(&msqstat.msg_rtime);
printf("매세지 생성 시간 혹은 수정 시간:"); printTime(&msqstat.msg_ctime);
printf("마지막으로 매세지 보낸 pid %d\n", (int)msqstat.msg_lspid);
printf("마지막으로 매세지 받은 pid %d\n", (int)msqstat.msg_lrpid);
printf("%s\n", msqstat.msg_perm);
return (0);
}
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
cmd에는 몇몇의 옵션이 들어올 수 있으며, 옵션을 기준으로 buf에 값을 정합니다.
MSGMNI: Maximum number of message queue identifiers (system wide).
MSGMNB: Maximum number of bytes per message queue.
컴파일
gcc -o revc recv.c
gcc -o send send.c
gcc -o monitor monitor.cmonitor할 msg queue 채우고 확인하기
./send block testsetsetsetsetestsetsetset 999999999
./monitor
./recv결과
1.send의 첫번째 인자값을 nowait으로 하면 msg queue가 다 차더라도 block에 걸리지 않아 에러를 출력합니다.
2.block으로 걸려있다면 출력하지않고 대기합니다.
3.block으로 걸려있을 때 보낼 msg가 남은상태에서 recv로 큐를 비워주면 다시 send를 합니다.
1번
[ydh@krujyit1 ~/msgq]$ ./monitor
대기열의 최대 바이트 수 : 536862720
대기열에 있는 바이트 수 : 536862720
매세지 큐에 저장된 매세지 개수 : 6710784
2번
[ydh@krujyit1 ~/msgq]$ ./monitor
대기열의 최대 바이트 수 : 536862720
대기열에 있는 바이트 수 : 440936240
매세지 큐에 저장된 매세지 개수 : 5511703
:2024년 2월 23일 11시 23분 37초
3번
[ydh@krujyit1 ~/msgq]$ ./monitor
대기열의 최대 바이트 수 : 536862720
대기열에 있는 바이트 수 : 77127280
매세지 큐에 저장된 매세지 개수 : 964091
:2024년 2월 23일 11시 23분 41초
위의 출력문은 block 테스트 상황에서 monitor로 확인한 값입니다.
send process를 띄우고 msg가 다 차있는 상태에서 recv process를 띄우니
대기열의 바이트 수가 줄어들기 시작했습니다. 1번과 2번의 결과를 보면 줄어드는 차이가 크지 않았고,
그 이유는 recv가 비워주는 족족 send가 채우고있었기 때문입니다.
3번은 send process를 종료시킨후 바로 확인한 결과입니다. 3~4초만에 거의 모든 메시지가 처리된 것을 확인할 수 있습니다.