Message Queue

sz L·2023년 9월 12일
0

통신

목록 보기
6/11

MOM(Message Oriented Middleware)_메시지 지향 미들웨어

비동기 메시지를 사용하는 다른 응용 프로그램 사이에서 데이터 송수신
MOM을 구현한 시스템을 Message Queue:MQ 라고 한다.

Message Queue

프로세스 또는 프로그램 인스턴스가 데이터를 서로 교환할 때 사용하는 통신 방법(IPC : Inter Process Communication)중 하나이다.

메시지 큐는 메시지를 임시로 저장하는 간단한 버퍼라고 할 수 있다.

Queue 방식을 사용하기 때문에 기본적으로 FIFO 방식이다. 따라서 먼저 들어온 메시지를 먼저 수신하지만, 메시지 큐의 타입에 따라 특정 메시지를 먼저 수신할 수 있다.

메시지를 송/수신 할 때 Producer과 Consumer사이에 메시지 큐를 둔다.

중간에 메시지 큐를 둠으로써 동기방식의 데이터 통신에서 발생할 수 있는 병목현상을 방지할 수 있고, 중간 미들웨어에게 메시지를 위임하여 순차적으로 처리할 수 있게 한다.

각 메시지는 하나의 소비자에 의해 한 번만 처리된다 -> 1:1 통신방식

별도의 공정 작업을 연기할 수 있는 유연성을 제공하여 SOA(Service-Oriendted Architecture)의 개발에 도움을 줄 수 있다.

Message Queue의 특징

  • 메시지 큐는 사용자의 요청과 응답에 시간이 걸림
  • consumer가 실제로 메시지를 어느 시점에서 가져가서 처리해야할지 보장되지 않음
  • 메시지 큐에 저장된 데이터는 언젠가는 소비되어 처리 됨 : 비동기적 특징
  • 대용량 데이터를 처리하기 위한 배치 작업, 채팅 서비스, 비동기 데이터를 처리할 때 사용
  • 사용자가 많아지거나 데이터가 많아지면 응답 지연으로 서비스가 정상적으로 작동하지 못하는 상황 발생
    • 기존에 분산되어 있던 데이터 처리를 한 곳에 집중하면서 메시지 브로커를 통해 필요한 프로그램에 작업을 분산시키는 방법
  • 실패하면 치명적인 핵심 작업보다 어플리케이션의 부가적인 기능에 사용하는 것이 적합
    • MSA의 구현이 중시되면서 많은 MSA 패턴들이 트랜잭션 처리를 위해 메시지 큐를 사용하기 시작
      ※ MSA(MicroService Architecture) : 소프트웨어의 모든 구성요소가 한 프로젝트에 통합되어 있는 형태이다.

AMQP

Message Queue의 오픈소스에 기반한 표준 프로토콜
대표적으로 RabbitMQ가 있다.

  • 목적
    서로 다른 시스템 간에 최대한 효율적인 방법으로 메시지를 교환하기 위한 MQ 프로토콜
  • AMQP 요구사항
    AMQP는 벤더에 종속되는 것을 방지하기 위해 다음과 같은 조건을 충족해야 한다.
    벤더 종속에 대한 링크 click
    1. 모든 broker들이 동일한 방식으로 동작
    2. 모든client들이 동일한 방식으로 동작
    3. 네트워크로 전송되는 명령들의 표준화
    4. 프로그래밍 언어에 중립적

Message Queue의 장점

  1. 비동기
    • 생산된 메시지의 저장, 전송에 대해 동기화 처리를 하지 않음(메시지 큐에 저장해두기 때문에 나중에 처리 가능)
  2. 낮은 결합도
    • producer와 consumer가 독립적으로 행동 : 서비스 결합도가 낮아짐
  3. 확장성
    • 다수의 프로세스들이 큐에 메시지를 보낼 수 있음
  4. 탄력성
    • consumer 서비스가 down되더라도 애플리케이션이 중단되지 않고 메시지는 메시지 큐에 계속 남아있음 -> 서비스 일부의 실패가 시스템 전체에 영향을 주지 않음
  5. 보장성
    • 큐에 저장되어 있는 모든 메시지들이 결국에는 consumer에 의해 처리된다
  6. 과잉
    • 실패 시 재실행이 가능

Message Queue 사용처

대용량 데이터를 처리하기 위한 배치 작업, 채팅, 비동기 데이터를 처리할 때 사용

  • 다른 곳의 API로부터 데이터 송수신이 가능
  • 다양한 애플리케이션에서 비동기 통신이 가능
  • 이메일 발송 및 문서 업로드가 가능
  • 많은 양의 프로세스들을 처리할 수 있음

Message Queue 이해하기_이메일 전송

웹 사이트의 비밀번호를 잊어버려 이메일을 통해 임시 비밀번호를 받는 경우, 인증 코드를 받는다.

  1. 비밀번호 재설정을 위해 이메일을 발급하는 서비스, 회원가입을 위해 이메일을 발급하는 서비스 등은 메시지(이메일)를 큐에 넣는다.
  2. 이메일 전송 전용 서비스는 이메일이 어느 서비스로부터 생산되어는지와는 관계없이 메시지 큐의 메시지를 하나씩 소비하고, 이메일이 전송되어야 할 곳으로 이메일 전송한다.
  3. 메시지 큐에 들어오는 메시지 수가 너무 많아지는 경우 이메일 전송 전용 서비스 인스턴스를 더 둠으로서 확장할 수 있어서 확장성이 좋다.

Message Queue 구현하기

필요한 헤더파일 (C Language)

#include <sys/msg.h>
#include <sys/ipc.h>
#include <sys/types.h>

Create Message

int masgget (key_t key, int msgflg);

Send Message

int msgsnd (int msqid, struct msgbuf * msgp, size_t msgsize, int msgflg);

Receive Message

ssize_t msgrcv (int msgid, struct msgbuf * msgp, size_t msgsize, long msgtype, int msgflg);

Control Message

int msgctl (int msqid, int cmd, struct msqid_ds *buf);

Sender.c

c코드 예제
윈도우에서 리눅스처럼 gcc 사용하기1

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
// for Message queue
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define BUFFER_SIZE 1024

typedef struct { // Message buffer structure
  long msgtype; // Message type, must be > 0 with 'long' data type
  int value;
  char buf[BUFFER_SIZE]; // Message data to push in queue
} msgbuf;

int main() {
    int cnt = 0;
    int key_id; // Message queue ID
    msgbuf msg;
    msg.msgtype = 1;

    key_id = msgget((key_t) 1234, IPC_CREAT|0666); // Create Message (message queue key, message flag)

    if (key_id == -1) {
        printf("Message Get Failed!\n");
        exit(0);
    }
    
    while (1) {
        msg.value = ++cnt;
    
        if (cnt >= 10) {
            printf("Message Sending Finished!\n");
            break;
        }

        if (msgsnd(key_id, &msg, sizeof(msg), IPC_NOWAIT) == -1) { // IPC_NOWAIT flag: if no more queu space, fail instead of blocking
            printf("Message Sending Failed!\n");
            exit(0);
        }
        
        printf("value: %d\n", msg.value);
        sleep(1);
    }
    exit(0);
}

Receive.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
// for Message queue
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define BUFFER_SIZE 1024

typedef struct {
    long msgtype;
    int value;
    char buf[BUFFER_SIZE];
} msgbuf;

int main() {
    int key_id;
    msgbuf msg;
    msg.msgtype = 1;
    
    key_id = msgget((key_t) 1234, IPC_CREAT|0666); // Create Message (message queue key, message flag)
    
    if (key_id == -1) {
        printf("Message Get Failed!\n");
        exit(0);
    }
    
    while (1) {
        if (msgrcv(key_id, &msg, sizeof(msg), 1, 0) == -1) { // Receive if msgtype is 1
            printf("Message Receiving Finished!\n");
            exit(0);
        }
        
        printf("value: %d\n", msg.value);
    }

    printf("Message Receiving Finished!\n");
    exit(0);
}
profile
가랑비는 맞는다 하지만 폭풍은 내 것이야

0개의 댓글