Guide_Messaging with RabbitMQ

Dev.Hammy·2023년 12월 12일
0

Spring Guides

목록 보기
8/46
post-custom-banner

이 가이드는 메시지를 게시하고 구독하는 RabbitMQ AMQP 서버를 설정하고 해당 RabbitMQ 서버와 상호 작용할 Spring Boot 애플리케이션을 만드는 과정을 안내합니다.

AMQP는 Advanced Message Queuing Protocol(고급 메시지 큐잉 프로토콜)의 약자로, 메시지 지향 미들웨어를 위한 개방형 표준 프로토콜입니다. AMQP는 다양한 메시지 지향 미들웨어 시스템 간의 상호 운용성을 제공하는 역할을 합니다. 이 프로토콜은 메시지 전송, 큐잉, 라우팅 및 기타 다양한 메시징 패턴을 지원하여 분산 시스템에서의 효율적인 통신을 돕는 것이 목적입니다.

RabbitMQ는 AMQP의 오픈 소스 구현체 중 하나로, 다양한 프로그래밍 언어와 플랫폼에서 메시지 지향 애플리케이션을 구축하고 운영할 수 있는 유연하고 강력한 메시징 브로커입니다. AMQP 표준을 준수하여 데이터를 안전하고 신속하게 전달할 수 있도록 도와줍니다.

What You Will Build

Spring AMQP의 RabbitTemplate을 사용하여 메시지를 게시하고 MessageListenerAdapter를 사용하여 POJO에서 메시지를 구독하는 애플리케이션을 구축합니다.

RabbitMQ 브로커 설정

메시징 애플리케이션을 구축하기 전에 메시지 수신 및 전송을 처리할 서버를 설정해야 합니다.

RabbitMQ는 AMQP 서버입니다. 서버는 https://www.rabbitmq.com/download.html에서 무료로 사용할 수 있습니다. 수동으로 다운로드하거나 Homebrew가 설치된 Mac을 사용하는 경우 터미널 창에서 다음 명령을 실행하여 다운로드할 수 있습니다.

ubuntu 설치

https://www.rabbitmq.com/install-debian.html#supported-distributions

sudo apt-get install curl gnupg apt-transport-https -y

## Team RabbitMQ's main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Community mirror of Cloudsmith: modern Erlang repository
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
## Community mirror of Cloudsmith: RabbitMQ repository
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main

# another mirror for redundancy
deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main

## Provides RabbitMQ
##
deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main

# another mirror for redundancy
deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
EOF

## Update package indices
sudo apt-get update -y

## Install Erlang packages
sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing

docker compose 사용해보기

Docker가 로컬에서 실행 중인 경우 Docker Compose를 사용하여 RabbitMQ 서버를 빠르게 시작할 수도 있습니다. Github의 전체 프로젝트 루트에 docker-compose.yml이 있습니다. 다음 목록과 같이 매우 간단합니다.

rabbitmq:
  image: rabbitmq:management
  ports:
    - "5672:5672"
    - "15672:15672"

현재 디렉터리에 이 파일이 있으면 docker-compose up을 실행하여 RabbitMQ를 컨테이너에서 실행할 수 있습니다.

Ubuntu에서 Docker 설치

https://docs.docker.com/engine/install/ubuntu/#install-using-the-repository

  1. 호스트 머신에 도커 엔진을 설치하기 전 도코 apt 레파지토리를 설정
# Add Docker's official GPG key:
sudo apt-get update
sudo apt-get install ca-certificates curl gnupg
sudo install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
sudo chmod a+r /etc/apt/keyrings/docker.gpg

# Add the repository to Apt sources:
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update
  1. 가장 최신 버전 설치하기
sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

도커 컴포즈 플러그인과 CLI가 포함되어 있다.

  1. 잘 설치되었는지 hello-world 이미지 실행으로 확인해보기
sudo docker run hello-world

이 명령어는 컨테이너 안에서 실행될 테스트용 이미지를 다운 받습니다.

인텔리제이에 Docker 연결

docker 그룹에 사용자 추가하여 unix socket 연결 권한 얻기

  1. id -nG: 현재 사용자가 속한 그룹을 출력합니다.

  2. sudo groupadd docker: 'docker'라는 그룹을 만들려고 시도했으나 이미 존재하는 그룹이라고 알려줍니다.

  3. sudo usermod -aG docker $USER: 현재 사용자를 'docker' 그룹에 추가합니다. 이 명령어를 통해 현재 사용자에게 Docker 명령어를 실행할 권한을 주기 위해서입니다.

sudo systemctl start docker 명령을 실행하면 docker 프로세스가 생성됩니다. 해당 도커 프로세스에는 dockerd 데몬 스레드가 포함되어 있습니다. 이 명령은 기본 docker.sock Unix 소켓도 생성합니다. docker.sock 소켓은 dockerd 데몬 스레드에 의해 지속적으로 수신됩니다. 이를 통해 docker.pid 프로세스를 사용하여 커널 수준 IPC를 수행할 수 있습니다. 이 도커 소켓을 사용하려면 프로세스 수준(docker.pid)과 파일 수준(docker.sock)에서 적절한 권한이 필요합니다. 따라서 아래 두 명령을 실행하면 문제가 해결됩니다.

  1. sudo chmod a+rwx /var/run/docker.socksudo chmod a+rwx /var/run/docker.pid: Docker 소켓 파일과 PID 파일에 대한 권한을 변경하여 모든 사용자에게 읽기, 쓰기 및 실행 권한을 부여합니다.

  2. sudo systemctl start docker: Docker 서비스를 시작합니다.

compose.yamlservice 옆에 생긴 아이콘은 docker-compose up을 실행하여 RabbitMQ를 컨테이너 내부에서 실행한다.

Starting with SpringInitalizr

build.gradle

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.2.0'
    id 'io.spring.dependency-management' version '1.1.4'
}

group = 'guides'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '17'
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    developmentOnly 'org.springframework.boot:spring-boot-docker-compose'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

tasks.named('test') {
    useJUnitPlatform()
}

compose.yaml

services:
  rabbitmq:
    image: 'rabbitmq:latest'
    environment:
      - 'RABBITMQ_DEFAULT_PASS=secret'
      - 'RABBITMQ_DEFAULT_USER=myuser'
    ports:
      - '5672'

Create RabbitMQ Message Receiver

메시징 기반 애플리케이션을 사용하려면 게시된 메시지에 응답하는 수신자를 만들어야 합니다. 다음 목록(src/main/java/guides.messagingrabbitmq/Receiver.java)에서는 이를 수행하는 방법을 보여줍니다.

package guides.messagingrabbitmq;

import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

@Component
public class Receiver {
    
    private CountDownLatch latch = new CountDownLatch(1);
    
    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
        latch.countDown();
    }
    
    public CountDownLatch getLatch() {
        return latch;
    }
    
}

Receiver는 메시지 수신 방법을 정의하는 POJO입니다. 메시지 수신을 위해 등록할 때 원하는 이름을 지정할 수 있습니다.

편의를 위해 이 POJO에는 CountDownLatch도 있습니다. 이를 통해 메시지가 수신되었음을 알릴 수 있습니다. 이는 프로덕션 애플리케이션에서 구현할 가능성이 없는 것입니다.

-CountDownLatch는 Java에서 동시성 및 다중 스레드 프로그래밍을 지원하는 유틸리티 클래스입니다. 일반적으로 CountDownLatch는 다음과 같은 상황에서 활용됩니다:

  1. 멀티스레드 환경에서 작업 동기화: 여러 스레드가 있는 상황에서 특정 작업이 시작되거나 완료될 때까지 스레드가 대기하도록 할 수 있습니다.
  2. 병렬 작업 완료 시점 동기화: 병렬로 실행되는 작업이 모두 완료될 때까지 대기할 때 사용됩니다.
  • CountDownLatch는 생성할 때 카운트를 지정하고, countDown() 메서드를 호출하여 카운트를 감소시킵니다. await() 메서드를 호출하여 카운트가 0이 될 때까지 대기하도록 설정할 수 있습니다.

  • 일반적으로 프로덕션 환경에서 CountDownLatch를 사용하여 이벤트를 처리하거나 작업 상태를 추적하는 데에는 일반적으로 적용되지 않습니다. 이는 다음과 같은 몇 가지 이유로 설명될 수 있습니다:

  1. 동기화 오버헤드: CountDownLatch를 사용하여 스레드 간에 작업을 동기화하는 것은 추가적인 오버헤드를 유발할 수 있습니다. 이것은 복잡성을 증가시키고 성능에 영향을 미칠 수 있습니다.

  2. 디버깅 어려움: CountDownLatch를 사용하면 코드의 복잡성이 증가하고, 다중 스레드 환경에서 예측하기 어려운 상태 관리 문제가 발생할 수 있습니다.

  3. 불안정성 및 잠재적인 오류: 이러한 동기화 메커니즘은 잘못 사용할 경우 데드락(Deadlock)이나 무한 대기 등의 문제를 야기할 수 있습니다.

대신, 프로덕션 환경에서는 더 안정적이고 신뢰성 있는 방법을 사용하여 작업을 관리하고 상태를 추적하는 것이 좋습니다. 예를 들어, 메시지 수신 여부를 추적하는 대신 로깅, 이벤트 기반 알림 시스템, 또는 상태 관리를 위한 특정 데이터베이스를 사용하는 것이 더 일반적인 방법입니다.

Register the Listener and Send a Message

Spring AMQP의 RabbitTemplate은 RabbitMQ로 메시지를 보내고 받는 데 필요한 모든 것을 제공합니다. 그러나 다음을 수행해야 합니다.

  • 메시지 리스너 컨테이너를 구성합니다.
  • 대기열, 교환 및 이들 간의 바인딩을 선언합니다.
  • 리스너를 테스트하기 위해 일부 메시지를 보내도록 구성 요소를 구성합니다.

Spring Boot는 연결 팩토리와 RabbitTemplate을 자동으로 생성하여 작성해야 하는 코드의 양을 줄입니다.

RabbitTemplate을 사용하여 메시지를 보내고, 메시지 수신자 컨테이너에 Receiver를 등록하여 메시지를 받습니다. 연결 팩토리는 둘 다 구동하여 RabbitMQ 서버에 연결할 수 있도록 합니다. 다음 목록(src/main/java/guides.messagingrabbitmq/MessagingRabbitmqApplication.java)은 애플리케이션 클래스를 생성하는 방법을 보여줍니다.

package guides.messagingrabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MessagingRabbitmqApplication {

    static final String topicExchangeName = "spring-boot-exchange";

    static final String queueName = "spring-boot";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(MessagingRabbitmqApplication.class, args).close();
    }

}

ListenerAdapter() 메소드에 정의된 Bean은 컨테이너(container()에 정의됨)에 메시지 리스너로 등록됩니다. spring-boot 큐에서 메시지를 수신합니다. Receiver 클래스는 POJO이므로 MessageListenerAdapter에 래핑되어야 하며 여기서 receiveMessage를 호출하도록 지정해야 합니다.

JMS 대기열과 AMQP 대기열은 의미가 다릅니다. 예를 들어 JMS는 대기열에 있는 메시지를 하나의 소비자에게만 보냅니다. AMQP 대기열은 동일한 작업을 수행하지만 AMQP 생산자는 메시지를 대기열에 직접 보내지 않습니다. 대신 단일 대기열로 이동하거나 여러 대기열로 팬아웃할 수 있는 교환으로 메시지가 전송되어 JMS 토픽의 개념을 에뮬레이션합니다.

  • '팬아웃(Fanout)'은 메시지 라우팅과 관련된 개념으로, 메시지를 한 송신자가 여러 수신자에게 전달하는 것을 의미합니다.

  • AMQP의 팬아웃은 특정 대기열로 메시지를 보내는 것이 아니라, 메시지를 교환(exchange)에 보내고, 해당 교환에 연결된 모든 큐(queue)로 메시지를 전송하는 기능을 말합니다. AMQP에서는 팬아웃을 사용하여 메시지를 여러 대기열에 퍼뜨려 다수의 소비자가 해당 메시지를 받을 수 있도록 할 수 있습니다.

  • 이를 JMS의 토픽(Topic) 개념과 유사하게 생각할 수 있습니다. JMS 토픽도 한 번의 메시지 발행으로 여러 구독자에게 메시지를 전송하는데 사용됩니다. AMQP의 팬아웃은 이러한 기능을 수행하면서도, 더 유연한 라우팅 및 교환을 통해 메시지를 효율적으로 관리하고 전달할 수 있는 장점이 있습니다.

메시지 리스너 컨테이너와 receiver Bean만 있으면 메시지를 수신할 수 있습니다. 메시지를 보내려면 Rabbit 템플릿도 필요합니다.

queue() 메서드는 AMQP 대기열을 생성합니다. exchange() 메서드는 주제 교환을 생성합니다. binding() 메서드는 이 두 가지를 함께 바인딩하여 RabbitTemplate이 교환에 게시할 때 발생하는 동작을 정의합니다.

Spring AMQP를 제대로 설정하려면 Queue, TopicExchangeBinding이 최상위 Spring 빈으로 선언되어야 합니다.

이 경우 주제 교환을 사용하고 대기열은 foo.bar.#이라는 라우팅 키로 바인딩됩니다. 이는 foo.bar로 시작하는 라우팅 키로 전송되는 모든 메시지를 의미합니다. 대기열로 라우팅됩니다.

빈 간의 의존성

빈 간의 의존성을 확인하려면 스프링 컨테이너의 빈 구성 정보를 확인해야 합니다. 이를 위해 스프링 프레임워크는 다양한 방법을 제공합니다.

  1. 의존성 그래프 출력: 스프링 컨테이너는 내부적으로 빈의 의존성 그래프를 가지고 있습니다. 이를 출력하려면 ApplicationContextgetBeanFactory() 메서드를 사용하여 DefaultListableBeanFactory를 가져온 다음, getDependenciesForBean() 등의 메서드를 사용하여 의존성을 출력할 수 있습니다. 하지만 이 방법은 비교적 복잡하고 직접적인 방법이 아닙니다.

  2. 스프링 부트 Actuator 사용: Actuator의 /actuator/dependencies 엔드포인트를 호출하여 의존성 정보를 확인할 수 있습니다. 이 엔드포인트는 의존성 트리를 JSON 형태로 제공하여 각 빈 간의 의존성을 볼 수 있습니다.

  3. 의존성 시각화 도구: 일부 도구는 스프링 애플리케이션의 빈 구성을 시각적으로 나타내는 기능을 제공합니다. 이를 통해 빈 간의 의존성을 쉽게 파악할 수 있습니다.

  4. IDE의 스프링 관련 플러그인: 일부 IDE는 스프링 빈 간의 의존성을 시각적으로 표시해주는 플러그인을 제공합니다. IntelliJ IDEA 등의 IDE에서는 이러한 플러그인을 통해 빈 간의 의존성을 쉽게 확인할 수 있습니다.

이러한 방법들 중에서 선택하여 빈 간의 의존성을 확인할 수 있습니다. Actuator 엔드포인트나 의존성 시각화 도구를 사용하는 것이 더 직관적이고 편리할 수 있습니다.


Send a Test Message

테스트 메시지 보내기
이 샘플에서 테스트 메시지는 CommandLineRunner에 의해 전송됩니다. CommandLineRunner는 receiver의 래치를 기다리고 애플리케이션 컨텍스트를 닫습니다. 다음 목록(src/main/java/guides.messagingrabbitmq/Runner.java)은 작동 방식을 보여줍니다.

package guides.messagingrabbitmq;

import java.util.concurrent.TimeUnit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Runner implements CommandLineRunner {

    private final RabbitTemplate rabbitTemplate;
    private final Receiver receiver;

    public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
        this.receiver = receiver;
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Sending message...");
        rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    }

}

템플릿은 binding과 일치하는 foo.bar.baz의 라우팅 키를 사용하여 exchange로 메시지를 라우팅합니다.

테스트에서는 수신기를 별도로(독립적으로 isolation) 테스트할 수 있도록 실행기를 모의할 수 있습니다.

Run the Application

main() 메소드는 Spring 애플리케이션 컨텍스트를 생성하여 해당 프로세스를 시작합니다. 그러면 메시지 수신(listening)을 시작하는 메시지 리스너 컨테이너가 시작됩니다. 자동으로 실행되는 Runner Bean이 있습니다. 애플리케이션 컨텍스트에서 RabbitTemplate을 검색하고 Hello from RabbitMQ!를 보냅니다. spring-boot 대기열에 메시지가 표시됩니다. 마지막으로 Spring 애플리케이션 컨텍스트를 닫고 애플리케이션이 종료됩니다.

    Sending message...
    Received <Hello from RabbitMQ!>

post-custom-banner

0개의 댓글