RabbitMQ 프로듀서, 컨슈머 애플리케이션 만들기 (w. Spring boot)

🔥Log·2024년 6월 10일
0

RabbitMQ

목록 보기
5/9

✍️ 이 글에서 사용한 깃헙

☕ 개요


이번 글에서는 RabbitMQ의 프로듀서와 컨슈머를 Spring boot 서버로 만들어보도록 하겠다.
혹시나 RabbitMQ에 대한 개념 및 사용법에 대해서 알고 싶다면, 이전 글들을 참고해주면 감사하겠다.

ㄱㄱ 🔥

🤔 어떤 방식으로 Publish/consume할 것인지?

프로듀서에서는 특정 클래스를 Json으로 RabbitMQ에 보내고, 컨슈머는 Json으로된 Body를 다시 인스턴스로 변환해서 처리를 하는 상황이라고 가정하고 개발을 해보도록 하겠다.



🧐 사전 준비


코드 레벨에서도 Exchange, Queue 등등을 만들어줄 수 있지만, 나는 Management UI에서 미리 만들어두도록 하겠다.

1) Exchange 생성

나는 Fanout exchange를 하나 만들어주었다.

2) Queue 생성 & 바인딩

q.app.event라는 이름의 Classic 큐를 하나 만들고, 위 Exchange와 바인딩해주었다.



💻 프로젝트 세팅


1) build.gradle

plugins {
	id 'java'
	id 'org.springframework.boot' version '3.3.0'
	id 'io.spring.dependency-management' version '1.1.5'
}

allprojects  {
	group = 'study'
	version = '1.0.0'

	repositories {
		mavenCentral()
	}
}


subprojects {
	apply plugin: 'java'
	apply plugin: 'org.springframework.boot'
	apply plugin: 'io.spring.dependency-management'

	sourceCompatibility = '17'

	configurations {
		compileOnly {
			extendsFrom annotationProcessor
		}
	}

	dependencies {
		implementation 'org.springframework.boot:spring-boot-starter-amqp'
		implementation 'org.springframework.boot:spring-boot-starter-web'
		implementation 'org.springdoc:springdoc-openapi-starter-webmvc-ui:2.0.4'

		compileOnly 'org.projectlombok:lombok'
		annotationProcessor 'org.projectlombok:lombok'

		testImplementation 'org.springframework.boot:spring-boot-starter-test'
		testImplementation 'org.springframework.amqp:spring-rabbit-test'
		testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
		testCompileOnly 'org.projectlombok:lombok'
		testAnnotationProcessor 'org.projectlombok:lombok'
	}

	tasks.named('test') {
		useJUnitPlatform()
	}

}

project(':rabbitmq-consumer') {
	bootJar { enabled = true }
	jar { enabled = true }

	dependencies {
		implementation project(':rabbitmq-common')
	}
}

project(':rabbitmq-producer') {
	bootJar { enabled = true }
	jar { enabled = true }

	dependencies {
		implementation project(':rabbitmq-common')
	}
}

project(':rabbitmq-common') {
	bootJar { enabled = false }
	jar { enabled = true }
}

bootJar.enabled = false

이번 글에서는 Gradle 멀티 모듈을 활용해서 프로듀서와 컨슈머를 하나의 코드 베이스에서 만들 예정이다. 그래서 위와 같이 조금은 특이한 형태로 build.gradle을 작성해준다.

여기서 중요한 것은 아래 2개의 패키지가 포함되어 있으면 된다는 것이다.

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-amqp'
	implementation 'org.springframework.boot:spring-boot-starter-web'
}

2) setting.gradle

rootProject.name = 'spring-rabbitmq'
include 'rabbitmq-producer'
include 'rabbitmq-consumer'
include 'rabbitmq-common'

프로듀서 모듈, 컨슈머 모듈, 공통적으로 사용될 모듈을 포함시켜준다.

3) MessageDTO

import lombok.*;

import java.io.Serializable;

@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO implements Serializable {

    private String id;
    private String message;

}

프로듀서와 컨슈머가 주고 받을 클래스를 하나 만들어주자. 이 클래스는 공통 모듈 안에 만들어주면 된다.

위 클래스까지 만들었다면, 일단 프로젝트 세팅은 완료가 되었다.
이제 본격적으로 프로듀서와 컨슈머를 만들어보자.



📥 Producer


1) application.yml

server:
  port: 8000
spring:
  application:
    name: producer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

springdoc:
  show-actuator: false
  swagger-ui:
    doc-expansion: none

2) RabbitConfig

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    MessageConverter messageConverter() {
        return new SimpleMessageConverter();
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

}

RabbitMQ 기능을 편리하게 사용할 수 있도록 AMQP 라이브러리에서는 RabbitTemplate를 제공한다. 이 클래스를 Bean으로 등록해주자.

여기서 중요한 점은 SimpleMessageConverter로 메세지 컨버터를 등록해줘야한다는 점이다. SimpleMessageConverter는 Publish된 데이터를 추가적인 인코딩이나 변조 없이 Publish한다.

3) RabbitProducer

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitProducer {

    private final RabbitTemplate rabbitTemplate;

    public String sendMessage(String routingKey, Object obj) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        String json = objectMapper.writeValueAsString(obj);

        rabbitTemplate.convertAndSend("ex.app.event", routingKey, json);
        return json;
    }

}

아주 간단한 기능을 갖고 있는 프로듀서 컴포넌트를 하나 만들어주었다. HeadersProperties는 건들지 않고, Routing key와 Body만 전송할 수 있는 메서드이다.

4) ProducerController

import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import study.rabbitmq.dto.MessageDTO;

@Slf4j
@RestController
@RequiredArgsConstructor
public class ProducerController {

    private final RabbitProducer rabbitProducer;

    @PostMapping("/send/{key}")
    public String send(@PathVariable("key") String key, MessageDTO message) throws JsonProcessingException {
        return rabbitProducer.sendMessage(key, message);
    }

}

위에서 만든 기능을 API호출로 사용하고 싶어서 컨트롤러를 하나 만들어주었다.

5) 동작 확인

서버를 실행하고, Swagger 문서에서 다양한 메세지를 Publish해보자.

그러면, Management UI에서 큐에 메세지들이 잘 쌓인 것을 확인할 수 있다.



📤 Consumer


컨슈밍하는 기능을 구현하는 방법은 몇 가지가 있는데, 그 중에서 가장 심플한 방법으로 구현해보도록 하겠다.

1) application.yml

server:
  port: 9000
spring:
  application:
    name: consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      direct:
        prefetch: 10
      simple:
        prefetch: 10

springdoc:
  show-actuator: false
  swagger-ui:
    doc-expansion: none

2) RabbitConsumer

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import study.rabbitmq.dto.MessageDTO;

@Slf4j
@Component
public class RabbitConsumer {

    @RabbitListener(queues = {"q.app.event"}, concurrency = "2")
    public void consumeMessage(Message msg) throws JsonProcessingException {
            ObjectMapper objectMapper = new ObjectMapper();
            String body = new String(msg.getBody());
            MessageDTO dto = objectMapper.readValue(body, MessageDTO.class);

            log.info("dto.getId()={}", dto.getId());
            log.info("dto.getMessage()={}", dto.getMessage());
    }

}

위와 같이 컨슈머 컴포넌트를 하나 만들어주자. 메세지를 받아서 Body를 약속된 클래스(MessageDTO)로 변환하고, 필요한 처리를 해주는 간단한 기능을 갖고 있다.

여기서 concurrency는 동시에 몇 개의 컨슈머 쓰레드를 사용할지를 결정하는 값이다. 나는 예시로 2를 입력했다.

📌 concurrency 뿐만 아니라 매우 다양한 옵션을 줄 수 있다.



3) 동작 확인

컨슈머 애플리케이션을 실행만 해도 이전 섹션에서 Publish했던 메세지들이 바로 컨슈밍되는 것을 확인할 수 있다.

0개의 댓글

관련 채용 정보