Spring cloud stream Kafka

Dev. 로티·2022년 1월 26일
1

Spring boot

목록 보기
5/12
post-thumbnail

Spring에서는 Spring Cloud Stream이라는 메시징 시스템을 추상화한 구현체를 제공합니다.


Spring Cloud Stream

Spring cloud stream을 사용하는 애플리케이션은 미들웨어(Kafka, rabbitMQ 등) 과 직접적인 의존관계에 있지 않고, Spring cloud stream에서 제공하는 Binder라는 구현체를 중간에 두고 통신하기 때문에 어느 하나의 미들웨어에 강결합 되어있지 않은 상태에서 애플리케이션을 개발할 수 있게 됩니다.


Spring cloud stream의 대표적인 아키텍쳐

각각 요소에 대해 설명하자면 이렇습니다.
Binder는 미들웨어와의 통신을 담당하는 컴포넌트를 의미합니다.
Binding(input/output) 은 미들웨어와 통신을 위한 브릿지 역할을 합니다.

Spring boot에서 Spring cloud stream을 사용하는데에 있어 미들웨어인 Kafka와 함께 사용하는 방법에 대해 포스팅하도록 하겠습니다.

가장 먼저 dependency 두가지를 추가해야합니다.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

그 후 @Configuration 클래스에 @EnableBinding(채널 정보가 들어있는 Interface)를 추가해줍니다. 아래 예시에서는 Sink.class를 추가해주었는데 위 인터페이스는 Spring Cloud Stream이 기본적으로 제공하는 채널입니다.

사용자는 Sink.class와 같은 인터페이스에 정의된 채널을 통해 미들웨어와 통신하게 됩니다.

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringCloudStreamKafkaExampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKafkaExampleApplication.class, args);
    }
}

@EnableBinding 클래스에 어떤 인터페이스를 넣어야하는가?

기본적으로 Spring Cloud Stream이 제공하는 인터페이스는 아래의 3가지 입니다.
public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

public interface Processor extends Source, Sink {
}

위 코드를 보면 Sink는 INPUT을 Source는 OUTPUT을 그리고 마지막으로 Processor는 두가지를 모두 포함하고있습니다. 여기서 의미하는 INPUT은 consumer 입장에서 subscribe 받을 TOPIC명을 의미하고, OUTPUT은 producer 입장에서 publish할 TOPIC명을 의미합니다.

사용자는 위와같은 채널들을 아래와 같이 용도에 맞게 정의할 수 있습니다.

public interface ProcessMessage {
    String SEND_MESSAGE = "send-message";
    String RECIVE_MESSAGE = "recive-message";

    @Output(SEND_MESSAGE)
    MessageChannel sendMessage();

    @Input(RECIVE_MESSAGE)
    SubscribableChannel getMessage();
}


@EnableBinding(ProcessMessage.class)
@SpringBootApplication
public class SpringCloudStreamKafkaExampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKafkaExampleApplication.class, args);
    }
}

마지막으로 INPUT, OUTPUT 채널을 세팅한 후 아래와 같이 application.yml을 수정해주면
Spring cloud stream Kafka를 사용하는데에 있어 기본적인 세팅이 끝납니다.
(아래의 application.yml은 INPUT(Consumer) 세팅만 한 상태임)

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost			# 브로커 IP
          defaultBrokerPort: 9092		# 브로커 포트
          consumer-properties:
		…
      bindings:
        recive-message: 				# INPUT
          destination: test				# 토픽명
          group: myTestGroup			# Group Id

0개의 댓글