이 가이드는 Spring Cloud Stream의 개요와 이벤트 기반 스트리밍 애플리케이션을 생성하는 프로세스를 안내합니다.
plugins {
id 'java'
id 'org.springframework.boot' version '3.2.2'
id 'io.spring.dependency-management' version '1.1.4'
}
group = 'guide'
version = '0.0.1-SNAPSHOT'
java {
sourceCompatibility = '17'
}
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "2023.0.0")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
tasks.named('test') {
useJUnitPlatform()
}
실시간 스트림 처리를 위한 이벤트 기반 Spring Boot 마이크로서비스를 구축하기 위한 프레임워크입니다.
이 가이드는 Spring Cloud Stream의 기능을 보여줍니다. Spring Cloud Stream의 다양한 기능을 보여주기 위해 세 가지 애플리케이션을 만듭니다.
name-source
: 정기적으로 String
을 게시(publish)하여 스트림을 시작합니다. 이 예에서는 이름을 String
로 게시합니다.
name-processor
: name-source
로 게시된 String
을 사용하고 어떤 방식으로든 데이터를 변환합니다. 결과를 다른 거래소(exchange
)에 게시합니다. 이 예에서는 String
이라는 이름을 사용하여 레코드를 생성하고 타임스탬프를 추가합니다.
name-sink
: 네임 프로세서의 결과를 사용하고 작업을 수행(perform the action)합니다. 이 경우 결과를 표준 출력으로 인쇄하십시오.
이 예에서 애플리케이션 이름은 Spring Cloud Stream 개념(Source
, Processor
, Sink
)을 따릅니다. 이러한 개념은 Java 8 함수(각각 Supplier
, Function
, Consumer
)와 논리적으로 동등한(equivalent) 개념에 매핑됩니다. Spring Cloud Stream은 Source
와 Sink
(함수 합성을 통해)에서 하나 이상의 Function
인스턴스를 지원할 수 있지만, 각각이 독립적인 애플리케이션으로 작동하는 방법을 보여주는 세 가지 별도의 애플리케이션이 있습니다.
이 가이드에서는 뒤에서 앞으로 작업합니다. 즉, 먼저 Sink
애플리케이션을 빌드한 다음 Processor
를 빌드하고 마지막으로 Source
를 빌드합니다. 우리는 RabbitMQ
대시보드 UI를 사용하여 진행하면서 각 구성 요소를 테스트합니다.
Spring Cloud Stream 기능을 사용하려면 메시지 브로커에 액세스할 수 있는지 확인해야 합니다. 이 가이드에서는 RabbitMQ를 사용합니다. 로컬 Docker 환경이 발견되면 다음 명령으로 RabbitMQ를 시작할 수 있습니다.
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
결과적으로 RabbitMQ는 사용자 이름/비밀번호 (guest
/guest
)를 사용하여 로컬에서 액세스할 수 있어야 합니다.
name-source
, name-processor
, name-sink
등 예제에서 사용할 애플리케이션 모듈을 추가한다.
public record Person(String name, Long processedTimestamp) {
}
import java.util.function.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NameSinkConfiguration {
// tag::namesink[]
@Bean
public Consumer<Person> nameSink() {
return person -> {
System.out.println(person.name());
System.out.println(person.processedTimestamp());
};
}
// end::namesink[]
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class NameSinkApplication {
public static void main(String[] args) {
SpringApplication.run(NameSinkApplication.class, args);
}
}
# Spring Boot will automatically assign an unused http port
server.port=0
# tag::sinkexchangeconfig[]
spring.cloud.stream.function.bindings.nameSink-in-0=sinkinput
# end::sinkexchangeconfig[]
public record Person(String name, Long processedTimestamp) {
}
import java.util.Date;
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NameProcessorConfiguration {
// tag::processname[]
@Bean
public Function<String, Person> processName() {
return name -> new Person(name, new Date().getTime());
}
// end::processname[]
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class NameProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(NameProcessorApplication.class, args);
}
}
# Spring Boot will automatically assign an unused http port
server.port=0
# tag::processorexchangeconfig[]
spring.cloud.stream.function.bindings.processName-in-0=processorinput
spring.cloud.stream.function.bindings.processName-out-0=sinkinput
# end::processorexchangeconfig[]
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NameSourceConfiguration {
// tag::supplyname[]
@Bean
public Supplier<String> supplyName() {
return () -> "Christopher Pike";
}
// end::supplyname[]
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class NameSourceApplication {
public static void main(String[] args) {
SpringApplication.run(NameSourceApplication.class, args);
}
}
# Spring Boot will automatically assign an unused http port
server.port=0
# tag::supplierexchangeconfig[]
spring.cloud.stream.function.bindings.supplyName-out-0=processorinput
# end::supplierexchangeconfig[]
Sink
(java.util.function.Consumer
)는 NameSinkConfiguration
에서 다음과 같이 정의됩니다.
@Bean
public Consumer<Person> nameSink() {
return person -> {
System.out.println(person.name());
System.out.println(person.processedTimestamp());
};
}
교환 이름(exchange name)을 구성(configure)하지 않고 이 애플리케이션을 시작하면 RabbitMQ에 nameSink-in-0
이라는 교환(exchange)이 자동으로 생성됩니다. 나중에 프로세서를 싱크에 연결(hook)할 수 있도록 이 교환을 사용자 정의(customize)하고 싶습니다.
spring.cloud.stream.function.bindings.nameSink-in-0=sinkinput
싱크를 테스트하기 위해 Person
레코드를 나타내는 JSON 메시지를 새로 생성된 교환(exchange)에 수동(manually)으로 게시합니다.
"message input published but not routed"
publish 된 메시지가 정상적으로 라우트 되어 콘솔 로그에 출력됩니다.
"Hook"이라는 용어는 소프트웨어 개발에서 다양한 의미로 사용됩니다. 여기서 "hook"은 두 개의 시스템 또는 컴포넌트를 연결하거나 상호 작용할 수 있도록 만드는 메커니즘을 의미합니다.
RabbitMQ에서의 "hook"은 두 개의 exchange를 연결하여 메시지를 교환할 수 있도록 만드는 것을 의미합니다. 일반적으로 RabbitMQ에서는 메시지를 producer(데이터를 생성하는 애플리케이션)가 exchange로 보내고, 해당 exchange는 메시지를 consumer(데이터를 소비하는 애플리케이션)에게 라우팅합니다.
exchange 사이의 "hook"은 메시지의 흐름을 조정하고, 원하는 방식으로 메시지를 필터링하거나 라우팅할 수 있게 해줍니다. 예를 들어, 하나의 exchange에서 받은 메시지를 처리한 후 다른 exchange로 보내거나, 여러 개의 exchange로부터 메시지를 수신하여 특정한 비즈니스 규칙에 따라 처리할 수 있습니다.
즉, "hook"을 설정함으로써 데이터의 흐름을 제어하고 다양한 시스템 또는 컴포넌트 간에 메시지를 교환하고 처리할 수 있습니다. 이를 통해 애플리케이션 간의 통합을 더욱 유연하게 구현할 수 있습니다.
프로세서(java.util.function.Function
)는 NameProcessorConfiguration
에서 다음과 같이 정의됩니다.
@Bean
public Function<String, Person> processName() {
return name -> new Person(name, new Date().getTime());
}
이 함수는 문자열(String) 값을 입력(input)으로 사용하고 데이터가 처리된 시점의 타임스탬프를 추가하는 새 Person
레코드를 생성합니다. 이 애플리케이션을 실행하면 RabbitMQ에 processName-in-0
및 processName-out-0
이라는 두 개의 새로운 교환이 생성됩니다. 싱크 애플리케이션에 적용한 구성과 유사하게 이러한 교환 이름을 변경하여 싱크와 supplier(source)에 연결할 수 있도록 하려고 합니다.
spring.cloud.stream.function.bindings.processName-in-0=processorinput
spring.cloud.stream.function.bindings.processName-out-0=sinkinput
프로세서의 출력은 싱크의 입력과 일치합니다.
이제 RabbitMQ 대시보드를 사용하여 문자열(이름)을 프로세서 입력 교환으로 보내고 연결된 싱크로 흐르는 것을 확인할 수 있습니다.
프로세서와 싱크가 제대로 연결되면 실행 중인 싱크의 출력이 표시됩니다.
2번 보내서 NameSinkApplication 로그에 두번 출력됨.
소스(java.util.function.Supplier
)는 NameSourceConfiguration
에서 다음과 같이 정의됩니다.
@Bean
public Supplier<String> supplyName() {
return () -> "Christopher Pike";
}
프로세서 출력을 싱크 입력에 연결한 방법과 유사하게 동일한 작업을 수행하고 소스 출력을 프로세서 입력에 연결해야 합니다.
spring.cloud.stream.function.bindings.supplyName-out-0=processorinput
소스의 출력은 프로세서의 입력과 일치해야 합니다.
name-processor
와 name-sink
가 이미 실행 중인 경우 name-source
를 시작하면 시스템을 통해 메시지 흐름이 즉시 시작됩니다. 메시지가 프로세서를 통과할 때 약간 다른 타임스탬프와 함께 name-sink
에 의해 지속적으로 생성된 동일한 이름이 표시되어야 합니다. 더 이상 RabbitMQ 대시보드 테스트가 필요하지 않습니다! 이제 완벽하게 작동하는 스트림 애플리케이션이 생겼습니다.
축하해요! Spring Cloud Stream의 상위 수준 개요를 완료했으며 RabbitMQ와 통신하는 Spring Cloud Stream 애플리케이션을 빌드하고 테스트할 수 있었습니다.