POSIX IPC - Message Queue

Merry Berry·2024년 2월 14일
0

System Programming

목록 보기
2/2
post-thumbnail

1. create & open

#include <fcntl.h> /* Defines O_* constants */
#include <sys/stat.h> /* Defines mode constants */
#include <mqueue.h>

mqd_t mq_open(const char *name, int oflag, ...
				 /* mode_t mode, struct mq_attr *attr */);
/* Returns a message queue descriptor on success, or (mqd_t) –1 on error */

oflag에 입력될 값은 아래와 같다.

attr은 메시지 큐의 속성을 나타내는 struct mq_attr 인자이다.

struct mq_attr {
   long mq_flags; /* Message queue description flags: 0 or O_NONBLOCK
   [mq_getattr(), mq_setattr()] */
	long mq_maxmsg; /* Maximum number of messages on queue
    [mq_open(), mq_getattr()] */
   long mq_msgsize; /* Maximum message size (in bytes)
   [mq_open(), mq_getattr()] */
   long mq_curmsgs; /* Number of messages currently in queue
   [mq_getattr()] */
};

mq_open()에서는 mq_maxmsg, mq_msgsize만 읽는다.

  • mq_maxmsg: mq_send() 호출로 넣을 수 있는 최대 메시지 수. 0보다 커야 함
  • mq_msgsize: 메시지 큐에 넣을 수 있는 메시지 크기 상한. 0보다 커야 함

2. close

#include <mqueue.h>

int mq_close(mqd_t mqdes);
/* Returns 0 on success, or –1 on error */

메시지 큐 디스크립터를 닫는다. 이 함수는 프로세스가 종료되거나 exec() 시스템 콜을 호출했을 경우 자동으로 수행한다. 만약 mq_notify()로 등록한 통지가 있다면 해제된다.

3. removal

#include <mqueue.h>

int mq_unlink(const char *name);
/* Returns 0 on success, or –1 on error */

name에 해당되는 메시지 큐의 reference count가 0이 되면 제거한다.

4. message queue descriptor

각 프로세스마다 존재하는 message queue descriptor table의 엔트리에 있는 포인터는 시스템에 전역으로 존재하는 open message queue description table의 엔트리를 가리킨다. 그리고 그 엔트리는 시스템에 전역으로 존재하는 message queue table의 엔트리를 가리킨다.
프로세스 A, B에서 메시지 큐 디스크립터 x가 모두 동일한 열린 메시지 큐 디스크립션 테이블의 엔트리를 가리킨다. fork()한 이후 부모와 자식 프로세스의 파일 디스크립터 번호가 복사된 후 동일한 열린 파일 테이블 엔트리를 가리키는 것과 동일하다.
또한 A의 메시지 큐 디스크립터 z와 B의 메시지 큐 디스크립터 y는 서로 다른 열린 메시지 큐 디스크립션 테이블 엔트리를 가리키지만, 결론적으로는 동일한 메시지 큐를 가리킨다.

5. message queue attribute

#include <mqueue.h>

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
/* Returns 0 on success, or –1 on error */

메시지 큐의 속성 정보를 attr을 통해 받아오거나 설정한다.

  • mq_flags: message queue description의 플래그
  • mq_curmsgs: 현재 큐에 남아 있는 메시지의 수

SUSv3에서는 mq_setattr()로 수정 가능한 유일한 속성은 O_NONBLOCK 플래그 상태이다.

6. enqueue & dequeue

#include <mqueue.h>

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
/* Returns 0 on success, or –1 on error */

메시지를 큐에 넣는다. 이때 msg_lenmsg_ptr이 가리키는 메시지의 크기를 명시하며, 이 값은 mq_msgsize보다 작거나 동일해야 한다. 그렇지 않으면 EMSGSIZE 에러가 발생한다. 길이가 0인 메시지도 가능하다.
msg_prio는 메시지의 우선순위 값으로, 클 수록 우선순위가 높다. 메시지 큐 내에서는 각 메시지의 msg_prio 값을 기준으로 내림차순 정렬한다. 우선순위를 사용할 필요가 없다면 해당 필드를 0으로 설정한다.
메시지 큐가 가득찬 경우, O_NONBLOCK 플래그가 비활성화되어 있으면 blocking되고, 활성화되어 있다면 EAGAIN 에러가 발생하며 바로 리턴한다.

#include <mqueue.h>

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
/* Returns number of bytes in received message on success, or –1 on error */

메시지 큐에서 가장 우선순위가 높은 메시지를 dequeue하여 읽는다. msg_lenmsg_ptr이 가리키는 버퍼의 가용 크기를 의미하며, 이 값은 mq_maxsize보다 크거나 같아야 한다. 그렇지 않으면 EMSGSIZE 에러가 발생한다.
msg_prioNULL이 아니면 읽는 메시지의 우선순위 값을 저장한다.
메시지 큐가 비어있는 경우, O_NONBLOCK 플래그가 비활성화되어 있으면 blocking되고, 활성화되어 있다면 EAGAIN 에러가 발생하며 바로 리턴한다.

7. example

7.1. posix_mq_send.c

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <sys/stat.h>

#define MQ_NAME "/myposixmq"
#define MQ_MAXMSG 5

typedef struct{
        int id;
        char msg[16];
} msg_t;


int main(){
        int i;
        mqd_t mqd;
        struct mq_attr attr;
        msg_t msgbox[MQ_MAXMSG + 1] = {
                {.id = 1, .msg = "apple"},
                {.id = 2, .msg = "banana"},
                {.id = 3, .msg = "carrot"},
                {.id = 4, .msg = "dog"},
                {.id = 5, .msg = "elephant"},
                {.id = 6, .msg = "frog"} };

        attr.mq_maxmsg = MQ_MAXMSG;
        attr.mq_msgsize = sizeof(msg_t);

        if((mqd = mq_open(MQ_NAME, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR, &attr)) == (mqd_t)-1){
                perror("mq_open()");
                exit(EXIT_FAILURE);
        }

        for(i = 0; i < MQ_MAXMSG; i++){
                if(mq_send(mqd, (const char *)&msgbox[i], sizeof(msg_t), i) == -1){
                        perror("mq_send()");
                        exit(EXIT_FAILURE);
                }
                printf("sended message {id: %d}\n", i + 1);
        }

        //queue is full
        if(mq_send(mqd, (const char *)&msgbox[MQ_MAXMSG], sizeof(msg_t), 5) == -1){
                perror("mq_send()");
                exit(EXIT_FAILURE);
        }

        printf("sended last message {id: %d}\n", MQ_MAXMSG + 1);

        mq_close(mqd);
        exit(EXIT_SUCCESS);
}

7.2. posix_mq_recv.c

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <sys/stat.h>

#define MQ_NAME "/myposixmq"

typedef struct{
        int id;
        char msg[16];
} msg_t;


int main(){
        int i, recvsz, prio;
        mqd_t mqd;
        struct mq_attr attr;
        msg_t msgbuf;


        if((mqd = mq_open(MQ_NAME, O_RDONLY)) == -1){
                perror("mq_open()");
                exit(EXIT_FAILURE);
        }

        if(mq_getattr(mqd, &attr) == -1){
                perror("mq_getattr()");
                exit(EXIT_FAILURE);
        }
        printf("maximum messages: %ld\n", attr.mq_maxmsg);
        printf("message size: %ld\n", attr.mq_msgsize);

        for(i = 0; i < attr.mq_maxmsg + 1; i++){
                if(mq_getattr(mqd, &attr) == -1){
                        perror("mq_getattr()");
                        exit(EXIT_FAILURE);
                }
                printf("current message(s): %ld\n", attr.mq_curmsgs);

                if((recvsz = mq_receive(mqd, (char *)&msgbuf, attr.mq_msgsize, &prio)) == -1){
                        perror("mq_receive()");
                        exit(EXIT_FAILURE);
                }
                printf("    id:      %d\n", msgbuf.id);
                printf("    prio:    %d\n", prio);
                printf("    content: %s\n\n", msgbuf.msg);
        }

        printf("THERE'S NO MESSAGE IN MQ\n");

        //change flag to O_NONBLOCK
        attr.mq_flags = attr.mq_flags | O_NONBLOCK;
        if(mq_setattr(mqd, &attr, NULL) == -1){
                perror("mq_setattr()");
                exit(EXIT_FAILURE);
        }

        printf("added O_NONBLOCK flag\n");

        if(mq_receive(mqd, (char *)&msgbuf, sizeof(msg_t), NULL) == -1){
                if(errno == EAGAIN)
                        printf("NONBLOCK mode. Returned immediately\n");
                else{
                        perror("mq_receive()");
                        exit(EXIT_FAILURE);
                }
        }
        else
                printf("??????????\n");

        mq_close(mqd);
        // remove message queue
        if(mq_unlink(MQ_NAME) == -1){
                perror("mq_unlink()");
                exit(EXIT_FAILURE);
        }

        exit(EXIT_SUCCESS);
}

7.3 results

posix_mq_send 먼저 실행, 우선순위가 5인 6번째 메시지 {id=6, msg='frog'}를 enqueue하려 했으나 메시지 큐가 가득 차 blocking된 상태이다.

posix_mq_recv가 실행되자 마자 sender에 의해 enqueue된 5개의 메시지 중 가장 우선순위가 높은 메시지가 읽혔다.

그리고 하나의 메시지가 receiver에 의해 dequeue되면서 여유 공간이 생겼고, 결국 6번째 메시지가 enqueue된다.

따라서 기존 나머지 4개의 메시지 + 새로 enqueue된 6번째 메시지, 총 5개의 메시지를 우선순위대로 읽는다.

큐에 있는 메시지를 모두 다 읽었다. 이때 mq_setattr()O_NONBLOCK 플래그를 활성화한 후 mq_recv() 함수를 다시 호출했다. 그 결과 errno의 값이 EAGAIN으로 설정되었다.

0개의 댓글

관련 채용 정보