Annotation 방식으로 구현하기

kiyoung·2024년 1월 18일

spring cloud stream

목록 보기
4/8

앞서 구성도에서 확인하였던 Producer와 Receiver 서비스 간 메시지 전송 기능을 구현하도록 하겠습니다.


Spring initializer로 프로젝트 생성하기

spring initializer로 Producer와 Receiver 서비스를 생성합니다.

https://start.spring.io

항목
ProjectGradle-Groovy
LanguageJava
Spring Boot3.1.x
Group적절한 그룹
Artifactproducer
PackagingJar
Java17
Dependencies- Lombok
- Spring Web

Spring Boot 버전은 항상 업데이트 되기 때문에 원하는 버전을 찾지 못할 수 있습니다. 가장 낮은 버전을 선택합니다.

Generate 버튼을 클릭하여 압축 파일을 받은 다음 원하는 경로에 압축을 해제합니다.

마찬가지로 Receiver 서비스도 생성합니다.

항목
ProjectGradle-Groovy
LanguageJava
Spring Boot3.1.x
Group적절한 그룹
Artifactreceiver
PackagingJar
Java17
Dependencies- Lombok

Generate 버튼을 클릭하여 압축 파일을 받은 다음 원하는 경로에 압축을 해제합니다.


Producer 앱 구현하기

build.gradle에서 spring boot의 버전을 수정하고 spring cloud stream dependencies를 추가합니다.

build.gradle

plugins {
	id 'java'
    // 버전 변경
	id 'org.springframework.boot' version '2.7.6'
    // 버전 변경
	id 'io.spring.dependency-management' version '1.0.15.RELEASE'
}

group = 'com.github.questcollector'
version = '0.0.1-SNAPSHOT'

java {
	sourceCompatibility = '17'
}

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
    
    // 추가
    // spring cloud stream rabbitmq
	implementation 'org.springframework.cloud:spring-cloud-starter-stream-rabbit:3.2.6'
}

// 제거
//tasks.named('bootBuildImage') {
//	builder = 'paketobuildpacks/builder-jammy-//base:latest'
//}

tasks.named('test') {
	useJUnitPlatform()
}

binding을 구성하기 위해서는 application.properties에 속성을 정의합니다.

src/main/resourcesapplication.properties에서 rabbitmq의 속성과 binding의 속성을 정의합니다.

src/main/resources/application/properties

server.port=18081

# RabbitMQ
spring.rabbitmq.addresses=${RABBITMQ_HOST:localhost}:${RABBITMQ_PORT:5672}
spring.rabbitmq.username=${RABBITMQ_USER:guest}
spring.rabbitmq.password=${RABBITMQ_PASSWORD:guest}

# spring cloud stream
#spring.cloud.stream.bindings.producerPublish.binder=rabbit
spring.cloud.stream.bindings.producerPublish.destination=test1

#spring.cloud.stream.bindings.producerConsume.binder=rabbit
spring.cloud.stream.bindings.producerConsume.destination=test2
spring.cloud.stream.bindings.producerConsume.group=test2

binding의 구성은 spring.cloud.stream.bindings 하위에 정의됩니다.
해당 속성 하위에 binding의 이름을 적고 그 하위에 여러 속성들을 정의할 수 있습니다.

binder 속성은 어떤 바인더를 사용할 것인지에 대한 것으로 spring-cloud-starter-stream-rabbit dependency를 이용하면 기본적으로 rabbit이라는 이름의 바인더가 등록됩니다.

destination 속성은 메시지의 목적지를 정의하는 속성으로 rabbitmq에서는 exchange, kafka에는 topic에 연계됩니다.

group 속성은 메시지를 소비할 때 바라보는 그룹을 정의하는 속성입니다. rabbitmq에서는 queue, kafka에서는 consumer group과 연계됩니다.

메시지의 자료 구조를 먼저 정의합니다.

src/main/java/
com.github.questcollector.producer
MyMessage.java

package com.github.questcollector.producer;

public record MyMessage(
        long id,
        String content
) {}

메시지는 spring-messaging의 MessageChannel을 통해 주고받게 됩니다.

MessageChannel을 구성하기 위해서 interface를 생성하고 MessageChannel을 리턴하는 메소드를 생성합니다.

@Ouput 애노테이션에는 메시지를 전송하는 binding의 이름을 입력하고,
@Input 애노테이션에는 메시지를 받는 binding의 이름을 입력합니다.

src/main/java/
com.github.questcollector.producer
ProducerEventMessageChannels.java

package com.github.questcollector.producer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface ProducerEventMessageChannels {
    String PRODUCER_PUBLISH = "producerPublish";
    String PRODUCER_CONSUME = "producerConsume";

    @Output(PRODUCER_PUBLISH)
    MessageChannel producerPublish();

    @Input(PRODUCER_CONSUME)
    SubscribableChannel producerConsume();
}

이렇게 정의한 인터페이스는 @EnableBinding 애노테이션에 등록하면 Bean으로 등록되어 Spring 애플리케이션에서 사용됩니다.

@EnableBinding 애노테이션은 @Configuration 애노테이션과 함께 쓰이는데, 여기에서는 @Configuration 애노테이션이 포함되어 있는 @SpringBootApplication과 함께 사용하도록 하겠습니다.

src/main/java/
com.github.questcollector.producer
ProducerApplication.java

package com.github.questcollector.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({ProducerEventMessageChannels.class})
public class ProducerApplication {

	public static void main(String[] args) {
		SpringApplication.run(ProducerApplication.class, args);
	}

}

이제 애플리케이션 내에서 메시지를 보내는 로직으로 구현하도록 하겠습니다.

GET / API가 호출되면 random한 id와 parameter로 받은 content를 가지는 MyMessage를 생성하고 이것을 MessageChannel을 통해 보냅니다.

response로 ok 코드와 MyMessage를 보내는 로직을 구현합니다.

src/main/java/
com.github.questcollector.producer
ProducerRestController.java

package com.github.questcollector.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
@Slf4j
public class ProducerRestController {
    private final ProducerEventMessageChannels producerEventMessageChannels;

    public ProducerRestController(ProducerEventMessageChannels producerEventMessageChannels) {
        this.producerEventMessageChannels = producerEventMessageChannels;
    }

    @GetMapping("/")
    public ResponseEntity<MyMessage> produceMessage(
            @RequestParam(value = "content", defaultValue = "Hello World") String content) {
        // return 객체 생성
        Random random = new Random(System.currentTimeMillis());
        MyMessage myMessage = new MyMessage(
                Math.abs(random.nextLong()),
                content
        );

        // MessageChannel로 메시지 전송
        MessageChannel messageChannel = producerEventMessageChannels.
                producerPublish();
        messageChannel.send(
                MessageBuilder.withPayload(myMessage).build()
        );
        log.info("published message to producerPublish binding");
        if (log.isDebugEnabled()) {
            log.debug("payload: " + myMessage.toString());
        }

        return ResponseEntity.ok(myMessage);
    }

}

마지막으로 메시지가 들어오게 되면 어떤 작업을 할 지 정의하는 StreamListener를 구현합니다.

src/main/java/
com.github.questcollector.producer
ProducerMessageListener.java

package com.github.questcollector.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ProducerMessageListener {
    
    @StreamListener(ProducerEventMessageChannels.PRODUCER_CONSUME)
    public void consumeEvent(MyMessage myMessage) {
        log.info("received message: " + myMessage.toString());
    }
}

Receiver 앱 구현하기

build.gradle 파일에서 spring boot의 버전을 수정하고 spring cloud stream dependency를 추가합니다.

build.gradle

plugins {
	id 'java'
	// 버전 변경
	id 'org.springframework.boot' version '2.7.6'
	// 버전 변경
	id 'io.spring.dependency-management' version '1.0.15.RELEASE'
}

group = 'com.github.questcollector'
version = '0.0.1-SNAPSHOT'

java {
	sourceCompatibility = '17'
}

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter'
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'

	// 추가
	// spring cloud stream rabbitmq
	implementation 'org.springframework.cloud:spring-cloud-starter-stream-rabbit:3.2.6'
}

// 제거
//tasks.named('bootBuildImage') {
//	builder = 'paketobuildpacks/builder-jammy-base:latest'
//}

tasks.named('test') {
	useJUnitPlatform()
}

src/main/resources 디렉토리 하위의 application.properties에서 binding 정의를 추가합니다.

src/main/resources/application.properties

server.port=18082

# RabbitMQ
spring.rabbitmq.addresses=${RABBITMQ_HOST:localhost}:${RABBITMQ_PORT:5672}
spring.rabbitmq.username=${RABBITMQ_USER:guest}
spring.rabbitmq.password=${RABBITMQ_PASSWORD:guest}

# spring cloud stream
#spring.cloud.stream.bindings.receiverConsume.binder=rabbit
spring.cloud.stream.bindings.receiverConsume.destination=test1
spring.cloud.stream.bindings.receiverConsume.group=test1

#spring.cloud.stream.bindings.receiverPublish.binder=rabbit
spring.cloud.stream.bindings.receiverPublish.destination=test2

메시지의 자료구조를 동일하게 생성합니다.

src/main/java/
com.github.questcollector.receiver
MyMessage.java

package com.github.questcollector.receiver;

import lombok.Data;

public record MyMessage(
        long id,
        String content
) {}

메시지를 송/수신하는 매개인 MessageChannel을 등록하는 interface를 생성합니다.

src/main/java/
com.github.questcollector.receiver
ReceiverEventMessageChannels.java

package com.github.questcollector.receiver;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiverEventMessageChannels {
    String RECEIVER_PUBLISH = "receiverPublish";
    String RECEIVER_CONSUME = "receiverConsume";

    @Output(RECEIVER_PUBLISH)
    MessageChannel receiverPublish();

    @Input(RECEIVER_CONSUME)
    SubscribableChannel receiverConsume();
}

@EnableBinding에 ReceiverEventMessageChannels를 추가하여 Spring에서 Bean으로 등록할 수 있도록 설정합니다.

@Configuration을 함께 가지고 있는 @SpringBootApplication이 적용된 ReceiverApplication에 추가합니다.

src/main/java/
com.github.questcollector.receiver
ReceiverApplication.java

package com.github.questcollector.receiver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({ReceiverEventMessageChannels.class})
public class ReceiverApplication {

	public static void main(String[] args) {
		SpringApplication.run(ReceiverApplication.class, args);
	}

}

메시지를 전달받았을 때 content에 "2" 값을 추가해서 다시 메시지 채널을 통해 전송하는 StreamListener를 구현합니다.

src/main/java/
com.github.questcollector.receiver
ReceiverMessageListener.java

package com.github.questcollector.receiver;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceiverMessageListener {
    private final ReceiverEventMessageChannels receiverEventMessageChannels;

    public ReceiverMessageListener(ReceiverEventMessageChannels receiverEventMessageChannels) {
        this.receiverEventMessageChannels = receiverEventMessageChannels;
    }

    @StreamListener(ReceiverEventMessageChannels.RECEIVER_CONSUME)
    public void consumeEvent(MyMessage myMessage) {
        log.info("received message: " + myMessage.toString());
		
        // content에 "2" 추가
        MyMessage newMessage = new MyMessage(
                    myMessage.id(), myMessage.content() + "2"
            );

		// MessageChannel을 통해 메시지 전송
        MessageChannel messageChannel = receiverEventMessageChannels
                .receiverPublish();
        messageChannel.send(
                MessageBuilder.withPayload(newMessage).build()
        );
        log.info("published message to receiverPublish binding");
        if (log.isDebugEnabled()) {
            log.debug("payload: " + newMessage.toString());
        }
    }
}

실행해보기

Producer 앱과 Receiver 앱을 모두 실행합니다.

localhost:15672로 rabbitmq management에 접속해 보면

Exchange 탭을 확인해 보면 test1과 test2가 생성되어 있는 것을 확인할 수 있습니다.

spring cloud stream에서는 바인딩을 설정하면 destination 이름에 맞추어 exchange를 자동으로 생성합니다. (기존에 있던 exchange라면 그대로 사용합니다.)

그리고 Queues 탭에 보면 test1.test1, test2.test2로 queue가 생성된 것을 확인할 수 있습니다.

spring cloud stream에서는 메시지를 받는 input 바인딩을 설정했을 때 계속 주시하고 있을 queue가 없다면 exchange와의 연결인 bind와 함께 자동으로 생성합니다.

이름 규칙은 destination과 group의 값을 콤마"."로 연결한 값으로 생성되고, 이러한 이름 규칙이 맘에 들지 않는다면 group의 이름만으로 queue를 생성하는 옵션도 있습니다.


그리고 브라우저에서 localhost:18081로 접속하여 Producer의 GET / API를 호출합니다.

랜덤한 id 값과 함께 content에는 기본값인 Hello World가 담겨 있는 데이터를 response로 받았습니다.

Producer 앱의 로그를 확인해 보면 메시지를 잘 전송했다는 것을 확인할 수 있습니다.

...  INFO 4255 --- [io-18081-exec-4] c.g.q.producer.ProducerRestController    : published message to producerPublish binding

이제 Receiver 앱에서의 로그를 확인해 봅니다.
id 값이 동일한 객체를 전송받은 것이 확인됩니다.

...  INFO 4245 --- [ test1.test1-22] c.g.q.receiver.ReceiverMessageListener   : received message: MyMessage(id=4908239329263513229, content=Hello World)

Receiver의 StreamListener에서는 메시지가 들어오면 content 값에 "2"를 추가하여 다시 전송합니다.

...  INFO 4245 --- [ test1.test1-22] c.g.q.receiver.ReceiverMessageListener   : published message to receiverPublish binding

다시 Producer의 로그를 보면 "2"값이 추가된 메시지가 들어온 로그도 확인할 수 있습니다.

...  INFO 4255 --- [ test2.test2-18] c.g.q.producer.ProducerMessageListener   : received message: MyMessage(id=4908239329263513229, content=Hello World2)

0개의 댓글