소프트웨어 시스템 간의 데이터를 비동기적으로 전송하는데 사용되는 중간 매개체이다.
일반적으로 분산 시스템에서 사용되며, 시스템 간의 결합도를 낮추고 확장성을 높이는데 도움을 준다.
메시지 큐를 통해 전송되는 데이터 단위이다. 메시지는 텍스트, JSON, 바이너리 등 다양한 형식으로 구성 가능하다.
메시지가 저장되는 공간이다. 일반적으로 FIFO 방식으로 동작하며, 먼저 도착한 메시지가 먼저 처리된다.
메시지를 생성하고 큐에 전송하는 역할을 한다. 예를 들어, 데이터베이스에서 새로운 레코드를 생성하고 이를 메시지로 큐에 보낼 수 있다.
큐에서 메시지를 가져와서 처리하는 역할을 한다. 예를 들어, 소비자는 메시지를 읽고 데이터베이스에 저장하거나, 특정 작업을 수행할 수 있다.
메시지 큐 시스템을 관리하고 메시지를 중계하는 중간 매개체이다. 브로커는 다양한 프로토콜을 통해 생산자와 소비자 간의 통신을 도와준다.
대표적인 메시지 큐 시스템
RabbitMQ, Apache Kafka, ActiveMQ, Redis 등이 있다.
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq
설치 후 레빗엠큐 플러그인을 활성화 시켜주는 코드를 추가해준다.
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management
기본으로 아이디/비번은 guest이다.
위계정은 최초 admin 계정에 해당하고, 추가로 계정을 생성하고 싶으면, 상단 nav 바 우측에 admin 이라는 페이지가 있어 새로운 계정을 만들 수 있다.
Producer는 메시지를 큐에 publish 해주는 역할을 한다.
대시보드에서는 publish를 통해 메시지 큐에 메시지를 적재하여 확인이 가능하다.
그리고 Consumer는 큐에 적재된 메시지를 꺼내서 소비하는 역할을 한다.
대시보드에서는 getMessage 명령을 통해 꺼낼 수 있다.
implementation 'org.springframework.boot:spring-boot-starter-amqp'
spring:
rabbitmq:
host: ip주소(VM_외부 ip / localhost)
username: 계정명
password: 비밀번호
port: 포트번호(관리자 페이지용 말고)
public class Producer{
private RabbitTemplate rabbitTemplate;
// objectMapper로 객체 직렬화(객체 → String)
rabbitTemplate.convertAndSend("큐 이름", 전송할 데이터);
}
public class Consumer{
@RabbitListener(queues = "큐 이름")
public void 메서드명(String message){
// 보통 objectMessage를 이용해 역직렬화함
}
}
단일 서버에서 여러 개의 컨테이너를 하나의 서비스로 정의해 컨테이너의 묶음으로 관리할 수 있는 작업 환경을 제공하는 관리 도구이다.
여러 개의 컨테이너가 하나의 어플리케이션으로 동작 시 도커 컴포즈
를 사용하지 않는다면, 이를 테스트하려면 각 컨테이너를 하나씩 생성해야 한다.
예를 들면, 웹 애플리케이션을 테스트하려면 웹 서버 컨테이너, 데이터베이스 컨테이너 두 개의 컨테이너를 각각 생성해야 한다.
데이터 피드의 분산 스트리밍, 파이프 라이닝 및 재생을 위한 실시간 스트리밍 데이터 처리를 위한 목적으로 설계된 오픈 소스 분산형 게시/구독/메시징 플랫폼.
서버 클러스터 내에서 데이터 스트림을 레코드로 유지하는 방식으로 작동하는 브로커 기반 솔루션.
Kafka 서버는 여러 데이터 센터에 분산되어 있을 수 있으며 여러 서버 인스턴스에 걸쳐 레코드 스트림을 토픽으로 저장하여 데이터 지속성을 제공할 수 있다.
게시/구독 메시징에서 보편적인 개념으로, 토픽은 지정된 데이터 스트림에 대한 관심을 표시하는 데 사용되는 주소 지정 가능한 추상화 개념이다.(메시징 큐처럼 사용)
파티션은 일련의 순서 대기열로 토픽의 세분화된 단위이다. (토픽 > 파티션)
레코드/메시지가 게시될 때 지속적으로 유지하는 서버 클러스터를 유지 관리하여 작동한다.
데이터 프로듀서는 주어진 레코드/메시지가 게시되어야 하는 토픽을 정의한다.
레코드/메시지를 처리하는 엔티티이다. 개별 워크로드에서 독립적으로 작업하거나 지정된 워크로드에서 다른 컨슈머와 협력하여 작업하도록 구성할 수 있다.(로드 밸런싱)
version: '3' # docker-compose 버전 지정
services: # docker-compose의 경우 docker 컨테이너로 수행될 서비스들은 services 하위에 기술
zookeeper: # 서비스 이름. service 하위에 작성하면 해당 이름으로 동작
image: wurstmeister/zookeeper # 도커 이미지
container_name: zookeeper
ports: # 외부포트:컨테이너내부포트
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports: # 외부포트:컨테이너내부포트
- "9092:9092"
environment: # kafka 브로터를 위한 환경 변수 지정
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # kafka가 zookeeper에 커넥션하기 위한 대상을 지정
volumes:
- /var/run/docker.sock:/var/run/docker.sock
위에 생성한 docker-compose.yml 파일 위치로 이동한다.
docker-compose 실행이 안 될 경우 윈도우는 WSL을 통해 우분투를 열고 아래 명령어를 입력해 설치한다.
# 우분투, 데비안 계열
$ sudo apt-get update
$ sudo apt-get install docker-compose-plugin
docker-compose up -d
생성 후 Docker Desktop을 확인한다.
*# kafka 접속*
docker exec -it kafka bash
*# kafka 버전확인*
kafka-topics.sh --version
*# 토픽 생성*
kafka-topics.sh --create --topic sample_topic_1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
*# 토픽 목록 조회*
kafka-topics.sh --list --bootstrap-server localhost:9092
프로듀서는 토픽으로 발송하고, 컨슈머는 구독중인 토픽이 갱신되었는지 기다렸다가 갱신 후 등록된 메시지를 가져온다.
# 프로듀스 실행(토픽명: test_topic)
kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
> message1
> message2
> message3
# 컨슈머 실행
# --from-beginning 옵션은 해당 토픽의 맨 처음 메시지부터 확인 가능
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic
# 토픽 정보 조회
kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --describe
gradle에서 kafka 의존성 설정을 진행한다. kafka 버전은 위에서 테스트한 버전과 맞춰준다.
# kafka 버전 맞춰서 의존성 설정
implementation ('org.springframework.kafka:spring-kafka:2.8.1')
application.yml에서 kafka 설정을 추가한다. 여기서는 접속정보만 작성했다.
spring:
kafka:
bootstrap-servers: localhost:9092
JavaConfig 설정을 작성한다. application.yml을 통해 설정할수도 있지만 명식적 선언을 위해서 javaconfig로 설정한다.
@EnableKafka// @KafkaListener 사용을 위한 설정
@Configuration
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
// ------------------------ Publish 설정 ------------------------------------
// 테스트 Topic 생성 1
@Bean
public NewTopic myTopic1() {
return new NewTopic("my_topic_1", 1, (short) 1);
}
// 테스트 Topic 생성 2
@Bean
public NewTopic myTopic2() {
return new NewTopic("my_topic_2", 1, (short) 1);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// ------------------------ Consumer 설정 -------------------------------------
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
다음으로 Producer 테스트 클래스를 작성한다. API로 호출로 테스트하기 위해 Controller를 작성해준다. 위에서 설정한 2개의 토픽을 테스트하기 위해 메소드를 2개로 나누어 작성했지만 구조는 동일하다.
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("item-service/kafka")
@AllArgsConstructor
@Slf4j
public class ProducerController {
private final KafkaTemplate<String, String> kafkaTemplate;
private final NewTopic myTopic1;
private final NewTopic myTopic2;
@GetMapping("/publish/mytopic1")
public String publishSpringTopic1() {
String message = "publish message to my_topic_1 " + UUID.randomUUID();
CompletableFuture<CompletableFuture<SendResult<String,String>>> future
= CompletableFuture.supplyAsync(() ->
kafkaTemplate.send(myTopic1.name(), message).completable());
future.thenAccept(result -> {
log.info("Sent message=[" + message + "]");
}).exceptionally(ex -> {
log.error("Unable to send message=[" + message + "] due to: " + ex.getMessage());
return null;
});
return "done";
}
@GetMapping("/publish/mytopic2")
public String publish() {
String message = "publish message to my_topic_1 " + UUID.randomUUID();
CompletableFuture<CompletableFuture<SendResult<String,String>>> future
= CompletableFuture.supplyAsync(() ->
kafkaTemplate.send(myTopic2.name(), message).completable());
future.thenAccept(result -> {
log.info("Sent message=[" + message + "]");
}).exceptionally(ex -> {
log.error("Unable to send message=[" + message + "] due to: " + ex.getMessage());
return null;
});
return "done";
}
}
메시지 처리를 위해 Consumer 테스트 클래스를 Service로 작성하고, @KafkaListener를 적용한다.
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class KafkaConsumerService {
@KafkaListener(topics = "#{myTopic1.name}", groupId = "group1")
public void consumeMyopic1(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){
log.info("[consume message]: {} from partition: {}", message, partition);
}
@KafkaListener(topics = "#{myTopic2.name}", groupId = "group1")
public void consumeMyopic2(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition){
log.info("[consume message - topic2]: {} from partition: {}", message, partition);
}
}
이제 API 테스트를 해주면 된다.
우선, 애플리케이션을 실행하면 Config 파일에 설정한 Topic 들이 생성되어 있는 것을 확인한다.
*# 토픽 목록 조회*
kafka-topics.sh --list --bootstrap-server localhost:9092
다음으로 콘솔에서 토픽 내부 레코드/메시지 확인을 위해 kafka 컨테이너에 접속한 뒤 consumer 설정을 해준다.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_1
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_2
이제 publish API를 호출해보면(to ‘mytopic1’) 각각 Consumer 로그들을 확인해 볼 수 있다.
http://localhost:8000/(개별마이크로서비스명)/kafka/publish/mytopic1