비동기 메시지를 사용하는 다른 응용 프로그램 사이에서 데이터 송수신
MOM을 구현한 시스템을 Message Queue:MQ 라고 한다.
프로세스 또는 프로그램 인스턴스가 데이터를 서로 교환할 때 사용하는 통신 방법(IPC : Inter Process Communication)중 하나이다.
메시지 큐는 메시지를 임시로 저장하는 간단한 버퍼라고 할 수 있다.
Queue 방식을 사용하기 때문에 기본적으로 FIFO 방식이다. 따라서 먼저 들어온 메시지를 먼저 수신하지만, 메시지 큐의 타입에 따라 특정 메시지를 먼저 수신할 수 있다.
메시지를 송/수신 할 때 Producer과 Consumer사이에 메시지 큐를 둔다.
중간에 메시지 큐를 둠으로써 동기방식의 데이터 통신에서 발생할 수 있는 병목현상을 방지할 수 있고, 중간 미들웨어에게 메시지를 위임하여 순차적으로 처리할 수 있게 한다.
각 메시지는 하나의 소비자에 의해 한 번만 처리된다 -> 1:1 통신방식
별도의 공정 작업을 연기할 수 있는 유연성을 제공하여 SOA(Service-Oriendted Architecture)의 개발에 도움을 줄 수 있다.
Message Queue의 오픈소스에 기반한 표준 프로토콜
대표적으로 RabbitMQ가 있다.
대용량 데이터를 처리하기 위한 배치 작업, 채팅, 비동기 데이터를 처리할 때 사용
웹 사이트의 비밀번호를 잊어버려 이메일을 통해 임시 비밀번호를 받는 경우, 인증 코드를 받는다.
필요한 헤더파일 (C Language)
#include <sys/msg.h> #include <sys/ipc.h> #include <sys/types.h>
Create Message
int masgget (key_t key, int msgflg);
Send Message
int msgsnd (int msqid, struct msgbuf * msgp, size_t msgsize, int msgflg);
Receive Message
ssize_t msgrcv (int msgid, struct msgbuf * msgp, size_t msgsize, long msgtype, int msgflg);
Control Message
int msgctl (int msqid, int cmd, struct msqid_ds *buf);
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
// for Message queue
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define BUFFER_SIZE 1024
typedef struct { // Message buffer structure
long msgtype; // Message type, must be > 0 with 'long' data type
int value;
char buf[BUFFER_SIZE]; // Message data to push in queue
} msgbuf;
int main() {
int cnt = 0;
int key_id; // Message queue ID
msgbuf msg;
msg.msgtype = 1;
key_id = msgget((key_t) 1234, IPC_CREAT|0666); // Create Message (message queue key, message flag)
if (key_id == -1) {
printf("Message Get Failed!\n");
exit(0);
}
while (1) {
msg.value = ++cnt;
if (cnt >= 10) {
printf("Message Sending Finished!\n");
break;
}
if (msgsnd(key_id, &msg, sizeof(msg), IPC_NOWAIT) == -1) { // IPC_NOWAIT flag: if no more queu space, fail instead of blocking
printf("Message Sending Failed!\n");
exit(0);
}
printf("value: %d\n", msg.value);
sleep(1);
}
exit(0);
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
// for Message queue
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define BUFFER_SIZE 1024
typedef struct {
long msgtype;
int value;
char buf[BUFFER_SIZE];
} msgbuf;
int main() {
int key_id;
msgbuf msg;
msg.msgtype = 1;
key_id = msgget((key_t) 1234, IPC_CREAT|0666); // Create Message (message queue key, message flag)
if (key_id == -1) {
printf("Message Get Failed!\n");
exit(0);
}
while (1) {
if (msgrcv(key_id, &msg, sizeof(msg), 1, 0) == -1) { // Receive if msgtype is 1
printf("Message Receiving Finished!\n");
exit(0);
}
printf("value: %d\n", msg.value);
}
printf("Message Receiving Finished!\n");
exit(0);
}