#include <fcntl.h> /* Defines O_* constants */
#include <sys/stat.h> /* Defines mode constants */
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag, ...
/* mode_t mode, struct mq_attr *attr */);
/* Returns a message queue descriptor on success, or (mqd_t) –1 on error */
oflag
에 입력될 값은 아래와 같다.
attr
은 메시지 큐의 속성을 나타내는 struct mq_attr
인자이다.
struct mq_attr {
long mq_flags; /* Message queue description flags: 0 or O_NONBLOCK
[mq_getattr(), mq_setattr()] */
long mq_maxmsg; /* Maximum number of messages on queue
[mq_open(), mq_getattr()] */
long mq_msgsize; /* Maximum message size (in bytes)
[mq_open(), mq_getattr()] */
long mq_curmsgs; /* Number of messages currently in queue
[mq_getattr()] */
};
mq_open()
에서는 mq_maxmsg
, mq_msgsize
만 읽는다.
mq_send()
호출로 넣을 수 있는 최대 메시지 수. 0보다 커야 함#include <mqueue.h>
int mq_close(mqd_t mqdes);
/* Returns 0 on success, or –1 on error */
메시지 큐 디스크립터를 닫는다. 이 함수는 프로세스가 종료되거나 exec()
시스템 콜을 호출했을 경우 자동으로 수행한다. 만약 mq_notify()
로 등록한 통지가 있다면 해제된다.
#include <mqueue.h>
int mq_unlink(const char *name);
/* Returns 0 on success, or –1 on error */
name
에 해당되는 메시지 큐의 reference count
가 0이 되면 제거한다.
각 프로세스마다 존재하는 message queue descriptor table
의 엔트리에 있는 포인터는 시스템에 전역으로 존재하는 open message queue description table
의 엔트리를 가리킨다. 그리고 그 엔트리는 시스템에 전역으로 존재하는 message queue table
의 엔트리를 가리킨다.
프로세스 A, B에서 메시지 큐 디스크립터 x
가 모두 동일한 열린 메시지 큐 디스크립션 테이블의 엔트리를 가리킨다. fork()한 이후 부모와 자식 프로세스의 파일 디스크립터 번호가 복사된 후 동일한 열린 파일 테이블 엔트리를 가리키는 것과 동일하다.
또한 A의 메시지 큐 디스크립터 z
와 B의 메시지 큐 디스크립터 y
는 서로 다른 열린 메시지 큐 디스크립션 테이블 엔트리를 가리키지만, 결론적으로는 동일한 메시지 큐를 가리킨다.
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
/* Returns 0 on success, or –1 on error */
메시지 큐의 속성 정보를 attr
을 통해 받아오거나 설정한다.
message queue description
의 플래그SUSv3에서는 mq_setattr()
로 수정 가능한 유일한 속성은 O_NONBLOCK
플래그 상태이다.
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
/* Returns 0 on success, or –1 on error */
메시지를 큐에 넣는다. 이때 msg_len
은 msg_ptr
이 가리키는 메시지의 크기를 명시하며, 이 값은 mq_msgsize
보다 작거나 동일해야 한다. 그렇지 않으면 EMSGSIZE
에러가 발생한다. 길이가 0
인 메시지도 가능하다.
msg_prio
는 메시지의 우선순위 값으로, 클 수록 우선순위가 높다. 메시지 큐 내에서는 각 메시지의 msg_prio
값을 기준으로 내림차순 정렬한다. 우선순위를 사용할 필요가 없다면 해당 필드를 0
으로 설정한다.
메시지 큐가 가득찬 경우, O_NONBLOCK
플래그가 비활성화되어 있으면 blocking되고, 활성화되어 있다면 EAGAIN
에러가 발생하며 바로 리턴한다.
#include <mqueue.h>
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
/* Returns number of bytes in received message on success, or –1 on error */
메시지 큐에서 가장 우선순위가 높은 메시지를 dequeue하여 읽는다. msg_len
은 msg_ptr
이 가리키는 버퍼의 가용 크기를 의미하며, 이 값은 mq_maxsize
보다 크거나 같아야 한다. 그렇지 않으면 EMSGSIZE
에러가 발생한다.
msg_prio
가 NULL
이 아니면 읽는 메시지의 우선순위 값을 저장한다.
메시지 큐가 비어있는 경우, O_NONBLOCK
플래그가 비활성화되어 있으면 blocking되고, 활성화되어 있다면 EAGAIN
에러가 발생하며 바로 리턴한다.
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <sys/stat.h>
#define MQ_NAME "/myposixmq"
#define MQ_MAXMSG 5
typedef struct{
int id;
char msg[16];
} msg_t;
int main(){
int i;
mqd_t mqd;
struct mq_attr attr;
msg_t msgbox[MQ_MAXMSG + 1] = {
{.id = 1, .msg = "apple"},
{.id = 2, .msg = "banana"},
{.id = 3, .msg = "carrot"},
{.id = 4, .msg = "dog"},
{.id = 5, .msg = "elephant"},
{.id = 6, .msg = "frog"} };
attr.mq_maxmsg = MQ_MAXMSG;
attr.mq_msgsize = sizeof(msg_t);
if((mqd = mq_open(MQ_NAME, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR, &attr)) == (mqd_t)-1){
perror("mq_open()");
exit(EXIT_FAILURE);
}
for(i = 0; i < MQ_MAXMSG; i++){
if(mq_send(mqd, (const char *)&msgbox[i], sizeof(msg_t), i) == -1){
perror("mq_send()");
exit(EXIT_FAILURE);
}
printf("sended message {id: %d}\n", i + 1);
}
//queue is full
if(mq_send(mqd, (const char *)&msgbox[MQ_MAXMSG], sizeof(msg_t), 5) == -1){
perror("mq_send()");
exit(EXIT_FAILURE);
}
printf("sended last message {id: %d}\n", MQ_MAXMSG + 1);
mq_close(mqd);
exit(EXIT_SUCCESS);
}
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <sys/stat.h>
#define MQ_NAME "/myposixmq"
typedef struct{
int id;
char msg[16];
} msg_t;
int main(){
int i, recvsz, prio;
mqd_t mqd;
struct mq_attr attr;
msg_t msgbuf;
if((mqd = mq_open(MQ_NAME, O_RDONLY)) == -1){
perror("mq_open()");
exit(EXIT_FAILURE);
}
if(mq_getattr(mqd, &attr) == -1){
perror("mq_getattr()");
exit(EXIT_FAILURE);
}
printf("maximum messages: %ld\n", attr.mq_maxmsg);
printf("message size: %ld\n", attr.mq_msgsize);
for(i = 0; i < attr.mq_maxmsg + 1; i++){
if(mq_getattr(mqd, &attr) == -1){
perror("mq_getattr()");
exit(EXIT_FAILURE);
}
printf("current message(s): %ld\n", attr.mq_curmsgs);
if((recvsz = mq_receive(mqd, (char *)&msgbuf, attr.mq_msgsize, &prio)) == -1){
perror("mq_receive()");
exit(EXIT_FAILURE);
}
printf(" id: %d\n", msgbuf.id);
printf(" prio: %d\n", prio);
printf(" content: %s\n\n", msgbuf.msg);
}
printf("THERE'S NO MESSAGE IN MQ\n");
//change flag to O_NONBLOCK
attr.mq_flags = attr.mq_flags | O_NONBLOCK;
if(mq_setattr(mqd, &attr, NULL) == -1){
perror("mq_setattr()");
exit(EXIT_FAILURE);
}
printf("added O_NONBLOCK flag\n");
if(mq_receive(mqd, (char *)&msgbuf, sizeof(msg_t), NULL) == -1){
if(errno == EAGAIN)
printf("NONBLOCK mode. Returned immediately\n");
else{
perror("mq_receive()");
exit(EXIT_FAILURE);
}
}
else
printf("??????????\n");
mq_close(mqd);
// remove message queue
if(mq_unlink(MQ_NAME) == -1){
perror("mq_unlink()");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}
posix_mq_send
먼저 실행, 우선순위가 5인 6번째 메시지 {id=6, msg='frog'}를 enqueue하려 했으나 메시지 큐가 가득 차 blocking된 상태이다.
posix_mq_recv
가 실행되자 마자 sender에 의해 enqueue된 5개의 메시지 중 가장 우선순위가 높은 메시지가 읽혔다.
그리고 하나의 메시지가 receiver에 의해 dequeue되면서 여유 공간이 생겼고, 결국 6번째 메시지가 enqueue된다.
따라서 기존 나머지 4개의 메시지 + 새로 enqueue된 6번째 메시지, 총 5개의 메시지를 우선순위대로 읽는다.
큐에 있는 메시지를 모두 다 읽었다. 이때 mq_setattr()
로 O_NONBLOCK
플래그를 활성화한 후 mq_recv()
함수를 다시 호출했다. 그 결과 errno
의 값이 EAGAIN
으로 설정되었다.