Guide_Spring Cloud Stream

Dev.Hammy·2024년 1월 28일
0

Spring Guides

목록 보기
45/46

Getting Started with Spring Cloud Stream

이 가이드는 Spring Cloud Stream의 개요와 이벤트 기반 스트리밍 애플리케이션을 생성하는 프로세스를 안내합니다.

build.gradle

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.2.2'
    id 'io.spring.dependency-management' version '1.1.4'
}

group = 'guide'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '17'
}

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "2023.0.0")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

tasks.named('test') {
    useJUnitPlatform()
}

스프링 클라우드 스트림이란

실시간 스트림 처리를 위한 이벤트 기반 Spring Boot 마이크로서비스를 구축하기 위한 프레임워크입니다.

스트림 처리 애플리케이션 개요

이 가이드는 Spring Cloud Stream의 기능을 보여줍니다. Spring Cloud Stream의 다양한 기능을 보여주기 위해 세 가지 애플리케이션을 만듭니다.

  • name-source: 정기적으로 String을 게시(publish)하여 스트림을 시작합니다. 이 예에서는 이름을 String로 게시합니다.

  • name-processor: name-source로 게시된 String을 사용하고 어떤 방식으로든 데이터를 변환합니다. 결과를 다른 거래소(exchange)에 게시합니다. 이 예에서는 String이라는 이름을 사용하여 레코드를 생성하고 타임스탬프를 추가합니다.

  • name-sink: 네임 프로세서의 결과를 사용하고 작업을 수행(perform the action)합니다. 이 경우 결과를 표준 출력으로 인쇄하십시오.

이 예에서 애플리케이션 이름은 Spring Cloud Stream 개념(Source, Processor, Sink)을 따릅니다. 이러한 개념은 Java 8 함수(각각 Supplier, Function, Consumer)와 논리적으로 동등한(equivalent) 개념에 매핑됩니다. Spring Cloud Stream은 SourceSink(함수 합성을 통해)에서 하나 이상의 Function 인스턴스를 지원할 수 있지만, 각각이 독립적인 애플리케이션으로 작동하는 방법을 보여주는 세 가지 별도의 애플리케이션이 있습니다.

이 가이드에서는 뒤에서 앞으로 작업합니다. 즉, 먼저 Sink 애플리케이션을 빌드한 다음 Processor를 빌드하고 마지막으로 Source를 빌드합니다. 우리는 RabbitMQ 대시보드 UI를 사용하여 진행하면서 각 구성 요소를 테스트합니다.

필수 구성 요소 설치 - RabbitMQ

Spring Cloud Stream 기능을 사용하려면 메시지 브로커에 액세스할 수 있는지 확인해야 합니다. 이 가이드에서는 RabbitMQ를 사용합니다. 로컬 Docker 환경이 발견되면 다음 명령으로 RabbitMQ를 시작할 수 있습니다.

docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

결과적으로 RabbitMQ는 사용자 이름/비밀번호 (guest/guest)를 사용하여 로컬에서 액세스할 수 있어야 합니다.


프로젝트 구조 만들기

Project Structure 모듈 추가

name-source, name-processor, name-sink 등 예제에서 사용할 애플리케이션 모듈을 추가한다.

name-sink 모듈

Person

public record Person(String name, Long processedTimestamp) {
}

NameSinkConfiguration

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NameSinkConfiguration {

	// tag::namesink[]
	@Bean
	public Consumer<Person> nameSink() {
		return person -> {
			System.out.println(person.name());
			System.out.println(person.processedTimestamp());
		};
	}
	// end::namesink[]
}

NameSinkApplication

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class NameSinkApplication {

	public static void main(String[] args) {
		SpringApplication.run(NameSinkApplication.class, args);
	}

}

application.properties

# Spring Boot will automatically assign an unused http port
server.port=0

# tag::sinkexchangeconfig[]
spring.cloud.stream.function.bindings.nameSink-in-0=sinkinput
# end::sinkexchangeconfig[]

name-processor 모듈

Person

public record Person(String name, Long processedTimestamp) {
}

NameProcessorConfiguration

import java.util.Date;
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NameProcessorConfiguration {

	// tag::processname[]
	@Bean
	public Function<String, Person> processName() {
		return name -> new Person(name, new Date().getTime());
	}
	// end::processname[]
}

NameProcessorApplication

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class NameProcessorApplication {

	public static void main(String[] args) {
		SpringApplication.run(NameProcessorApplication.class, args);
	}

}

application.properties

# Spring Boot will automatically assign an unused http port
server.port=0
# tag::processorexchangeconfig[]
spring.cloud.stream.function.bindings.processName-in-0=processorinput
spring.cloud.stream.function.bindings.processName-out-0=sinkinput
# end::processorexchangeconfig[]

name-source 모듈

NameSourceConfiguration

import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NameSourceConfiguration {

	// tag::supplyname[]
	@Bean
	public Supplier<String> supplyName() {
		return () -> "Christopher Pike";
	}
	// end::supplyname[]
}

NameSourceApplication

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class NameSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(NameSourceApplication.class, args);
	}

}

application.properties

# Spring Boot will automatically assign an unused http port
server.port=0

# tag::supplierexchangeconfig[]
spring.cloud.stream.function.bindings.supplyName-out-0=processorinput
# end::supplierexchangeconfig[]

Sink Application

Sink(java.util.function.Consumer)는 NameSinkConfiguration에서 다음과 같이 정의됩니다.

@Bean
public Consumer<Person> nameSink() {
  return person -> {
    System.out.println(person.name());
    System.out.println(person.processedTimestamp());
  };
}

교환 이름(exchange name)을 구성(configure)하지 않고 이 애플리케이션을 시작하면 RabbitMQ에 nameSink-in-0이라는 교환(exchange)이 자동으로 생성됩니다. 나중에 프로세서를 싱크에 연결(hook)할 수 있도록 이 교환을 사용자 정의(customize)하고 싶습니다.

spring.cloud.stream.function.bindings.nameSink-in-0=sinkinput

싱크를 테스트하기 위해 Person 레코드를 나타내는 JSON 메시지를 새로 생성된 교환(exchange)에 수동(manually)으로 게시합니다.

"message input published but not routed"

publish 된 메시지가 정상적으로 라우트 되어 콘솔 로그에 출력됩니다.


"Hook"이라는 용어는 소프트웨어 개발에서 다양한 의미로 사용됩니다. 여기서 "hook"은 두 개의 시스템 또는 컴포넌트를 연결하거나 상호 작용할 수 있도록 만드는 메커니즘을 의미합니다.

RabbitMQ에서의 "hook"은 두 개의 exchange를 연결하여 메시지를 교환할 수 있도록 만드는 것을 의미합니다. 일반적으로 RabbitMQ에서는 메시지를 producer(데이터를 생성하는 애플리케이션)가 exchange로 보내고, 해당 exchange는 메시지를 consumer(데이터를 소비하는 애플리케이션)에게 라우팅합니다.

exchange 사이의 "hook"은 메시지의 흐름을 조정하고, 원하는 방식으로 메시지를 필터링하거나 라우팅할 수 있게 해줍니다. 예를 들어, 하나의 exchange에서 받은 메시지를 처리한 후 다른 exchange로 보내거나, 여러 개의 exchange로부터 메시지를 수신하여 특정한 비즈니스 규칙에 따라 처리할 수 있습니다.

즉, "hook"을 설정함으로써 데이터의 흐름을 제어하고 다양한 시스템 또는 컴포넌트 간에 메시지를 교환하고 처리할 수 있습니다. 이를 통해 애플리케이션 간의 통합을 더욱 유연하게 구현할 수 있습니다.


Processor Application

프로세서(java.util.function.Function)는 NameProcessorConfiguration에서 다음과 같이 정의됩니다.

@Bean
public Function<String, Person> processName() {
  return name -> new Person(name, new Date().getTime());
}

이 함수는 문자열(String) 값을 입력(input)으로 사용하고 데이터가 처리된 시점의 타임스탬프를 추가하는 새 Person 레코드를 생성합니다. 이 애플리케이션을 실행하면 RabbitMQ에 processName-in-0processName-out-0이라는 두 개의 새로운 교환이 생성됩니다. 싱크 애플리케이션에 적용한 구성과 유사하게 이러한 교환 이름을 변경하여 싱크와 supplier(source)에 연결할 수 있도록 하려고 합니다.

spring.cloud.stream.function.bindings.processName-in-0=processorinput
spring.cloud.stream.function.bindings.processName-out-0=sinkinput

프로세서의 출력은 싱크의 입력과 일치합니다.

이제 RabbitMQ 대시보드를 사용하여 문자열(이름)을 프로세서 입력 교환으로 보내고 연결된 싱크로 흐르는 것을 확인할 수 있습니다.

프로세서와 싱크가 제대로 연결되면 실행 중인 싱크의 출력이 표시됩니다.

2번 보내서 NameSinkApplication 로그에 두번 출력됨.

Source Application

소스(java.util.function.Supplier)는 NameSourceConfiguration에서 다음과 같이 정의됩니다.

@Bean
public Supplier<String> supplyName() {
  return () -> "Christopher Pike";
}

프로세서 출력을 싱크 입력에 연결한 방법과 유사하게 동일한 작업을 수행하고 소스 출력을 프로세서 입력에 연결해야 합니다.

spring.cloud.stream.function.bindings.supplyName-out-0=processorinput

소스의 출력은 프로세서의 입력과 일치해야 합니다.

name-processorname-sink가 이미 실행 중인 경우 name-source를 시작하면 시스템을 통해 메시지 흐름이 즉시 시작됩니다. 메시지가 프로세서를 통과할 때 약간 다른 타임스탬프와 함께 name-sink에 의해 지속적으로 생성된 동일한 이름이 표시되어야 합니다. 더 이상 RabbitMQ 대시보드 테스트가 필요하지 않습니다! 이제 완벽하게 작동하는 스트림 애플리케이션이 생겼습니다.

요약

축하해요! Spring Cloud Stream의 상위 수준 개요를 완료했으며 RabbitMQ와 통신하는 Spring Cloud Stream 애플리케이션을 빌드하고 테스트할 수 있었습니다.

0개의 댓글