Message queue란 프로세스 간의 정보를 주고 받는 통신(IPC, Inter-process Communication)을 위해 사용하는 기술이다. 이름 그대로 queue를 사용하며 메시지를 주고 받는다.
Queue를 사용하기 때문에 기본적으로 FIFO 방식이다. 따라서 먼저 들어온 메시지를 먼저 수신하지만, (메시지 큐의 msgtype 타입에 따라) 특정 메시지를 먼저 수신할 수도 있다.
IPC (Inter-process Communication)을 검색하면 Message passing 이라는 IPC 모델이 찾을 수 있다. Message passing과 Message queue는 어떻게 다를까?
일단 Message passing은 구현보다는 개념적으로 사용되는 용어이다. 프로세스 간의 통신을 하기 위해 메시지를 어떻게 전송할지에 대한 방법론이고 그 구체적인 방법 중 하나가 Message queue 구현 방법이다.
또한, message passing은 동기적으로 전송할 수도 있지만, Message queue는 비동기적으로 전송하여 좀 더 효율적이며, queue가 버퍼의 역할을 하기에 어딘가에 저장되어 있다. 그 구체적인 위치는 kernel level이며 프로세스가 종료되어도 함수나 특정 동작으로 삭제하지 않으면 남아있게 된다.
Message queue 구현을 위해서는 아래의 3개의 헤더 파일이 필요하다.
#include <sys/msg.h>
#include <sys/ipc.h>
#include <sys/types.h>
위의 헤더 파일에서 정의하고 있는 몇 가지 중요한 시스템 콜들이 있다.
int masgget (key_t key, int msgflg);
int msgsnd(int msqid, struct msgbuf * msgp, size_t msgsize, int msgflg);
ssize_t msgrcv(int msgid, struct msgbuf * msgp, size_t msgsize, long msgtype, int msgflg);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
// for Message queue
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define BUFFER_SIZE 1024
typedef struct { // Message buffer structure
long msgtype; // Message type, must be > 0 with 'long' data type
int value;
char buf[BUFFER_SIZE]; // Message data to push in queue
} msgbuf;
int main() {
int cnt = 0;
int key_id; // Message queue ID
msgbuf msg;
msg.msgtype = 1;
key_id = msgget((key_t) 1234, IPC_CREAT|0666); // Create Message (message queue key, message flag)
if (key_id == -1) {
printf("Message Get Failed!\n");
exit(0);
}
while (1) {
msg.value = ++cnt;
if (cnt >= 10) {
printf("Message Sending Finished!\n");
break;
}
if (msgsnd(key_id, &msg, sizeof(msg), IPC_NOWAIT) == -1) { // IPC_NOWAIT flag: if no more queu space, fail instead of blocking
printf("Message Sending Failed!\n");
exit(0);
}
printf("value: %d\n", msg.value);
sleep(1);
}
exit(0);
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
// for Message queue
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define BUFFER_SIZE 1024
typedef struct {
long msgtype;
int value;
char buf[BUFFER_SIZE];
} msgbuf;
int main() {
int key_id;
msgbuf msg;
msg.msgtype = 1;
key_id = msgget((key_t) 1234, IPC_CREAT|0666); // Create Message (message queue key, message flag)
if (key_id == -1) {
printf("Message Get Failed!\n");
exit(0);
}
while (1) {
if (msgrcv(key_id, &msg, sizeof(msg), 1, 0) == -1) { // Receive if msgtype is 1
printf("Message Receiving Finished!\n");
exit(0);
}
printf("value: %d\n", msg.value);
}
printf("Message Receiving Finished!\n");
exit(0);
}
아래는 Sender와 Receiver 코드의 실행 결과 화면이다.
그림 1) Sender 실행 결과 화면
Sender는 1초 간격으로 value 값을 1씩 늘려주며 메시지를 순차적으로 Queue에 삽입하여 전송을 시도한다. ipcs -q 명령어로 message queue를 확인해보니 총 10개의 메시지가 삽입되어 있는 것을 알 수 있다.
그림 2) Receiver 실행 결과 화면
Receiver는 큐에 있는 메시지를 꺼내어 수신한다. ipcs를 확인해보니 message가 이제 하나도 없는 것을 알 수 있다. 모두 수신했기 때문이다.