
RabbitMQ, Kafka를 사용해 실습을 진행해보며 대규모 스트림 처리 방법에 대해 학습하였다.

**docker run -d --name rabbitmq -p5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq:management**
spring.application.name=order
message.exchange=market
message.queue.product=market.product
message.queue.payment=market.payment
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guestimport org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class OrderApplicationQueueConfig {
@Value("${message.exchange}")
private String exchange;
@Value("${message.queue.product}")
private String queueProduct;
@Value("${message.queue.payment}")
private String queuePayment;
@Bean public TopicExchange exchange() { return new TopicExchange(exchange); }
@Bean public Queue queueProduct() { return new Queue(queueProduct); }
@Bean public Queue queuePayment() { return new Queue(queuePayment); }
@Bean public Binding bindingProduct() { return BindingBuilder.bind(queueProduct()).to(exchange()).with(queueProduct); }
@Bean public Binding bindingPayment() { return BindingBuilder.bind(queuePayment()).to(exchange()).with(queuePayment); }
}
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class OrderController {
private final OrderService orderService;
@GetMapping("/order/{id}")
public String order(@PathVariable String id) {
orderService.createOrder(id);
return "Order complete";
}
}
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class OrderService {
@Value("${message.queue.product}")
private String productQueue;
@Value("${message.queue.payment}")
private String paymentQueue;
private final RabbitTemplate rabbitTemplate;
public void createOrder(String orderId) {
rabbitTemplate.convertAndSend(productQueue, orderId);
rabbitTemplate.convertAndSend(paymentQueue, orderId);
}
}





spring.application.name=payment
message.queue.payment=market.payment
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class PaymentEndpoint {
@Value("${spring.application.name}")
private String appName;
@RabbitListener(queues = "${message.queue.payment}")
public void receiveMessage(String orderId) {
log.info("receive orderId:{}, appName : {}", orderId, appName);
}
}



spring.application.name=product
message.queue.product=market.product
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ProductEndpoint {
@Value("${spring.application.name}")
private String appName;
@RabbitListener(queues = "${message.queue.product}")
public void receiveMessage(String orderId) {
log.info("receive orderId:{}, appName : {}", orderId, appName);
}
}



Kafka는 분산 스트리밍 플랫폼으로, 주로 실시간 데이터 피드의 빅 데이터 처리를 목적으로 사용된다.
Kafka는 메시지 큐와 유사하지만, 대용량 데이터 스트림을 저장하고 실시간으로 분석하거나 처리하는데 중점을 둔다.
RabbitMQ는 전통적인 메시지 브로커로, 메시지의 안정적 전달과 큐잉에 중점을 두는 반면 Kafka는 분산 스트리밍 플랫폼으로, 대규모 실시간 데이터 스트림의 저장과 분석에 중점을 둔다.
메시지는 Kafka를 통해 전달되는 데이터 단위이다. 예를 들어, 로그 데이터나 이벤트 데이터가 메시지가 될 수 있다.
메시지는 키(key), 값(value), 타임스탬프(timestamp), 그리고 몇 가지 메타데이터로 구성된다.
메시지를 생성하고 Kafka에 보내는 역할을 한다. 예를 들어, 웹 애플리케이션이 로그 데이터를 Kafka에 보내는 경우 프로듀서가 된다.
프로듀서는 특정 토픽(topic)에 메시지를 보낸다.
메시지를 저장하는 장소이다. 메시지는 토픽에 저장되었다가 소비자에게 전달된다.
토픽은 여러 파티션(partition)으로 나누어질 수 있으며, 파티션은 메시지를 순서대로 저장한다. 파티션을 통해 병렬 처리가 가능하다.
예: “user-activity”라는 토픽에 사용자의 활동 로그를 저장할 수 있다.
파티션은 토픽을 물리적으로 나눈 단위로, 각 파티션은 독립적으로 메시지를 저장하고 관리한다.
각 파티션은 메시지를 순서대로 저장하며, 파티션 내의 메시지는 고유한 오프셋(offset)으로 식별된다.
파티션을 통해 데이터를 병렬로 처리할 수 있으며, 클러스터 내의 여러 브로커에 분산시켜 저장할 수 있다.
키는 메시지를 특정 파티션에 할당하는 데 사용되는 값이다.
동일한 키를 가진 메시지는 항상 동일한 파티션에 저장된다.
예를 들어, 특정 사용자 ID를 키로 사용하여 해당 사용자의 모든 이벤트가 동일한 파티션에 저장되도록 할 수 있다.
토픽에서 메시지를 가져와 처리하는 역할을 한다.
컨슈머는 특정 컨슈머 그룹(consumer group)에 속하며, 같은 그룹에 속한 컨슈머들은 토픽의 파티션을 분산 처리한다.
기본적으로 컨슈머는 스티키 파티셔닝(Sticky Partitioning)을 사용한다. 이는 특정 컨슈머가 특정 파티션에 붙어서 계속해서 데이터를 처리하는 방식으로, 이는 데이터 지역성을 높여 캐시 히트율을 증가시키고 전반적인 처리 성능을 향상시킨다.
Kafka 클러스터의 각 서버를 의미하며, 메시지를 저장하고 전송하는 역할을 한다.
하나의 Kafka 클러스터는 여러 브로커로 구성될 수 있으며, 각 브로커는 하나 이상의 토픽 파티션을 관리한다.
Kafka 클러스터를 관리하고 조정하는 데 사용되는 분산 코디네이션 서비스이다.
주키퍼는 브로커의 메타데이터를 저장하고, 브로커 간의 상호작용을 조정한다.
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
platform: linux/amd64
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: wurstmeister/kafka:latest
platform: linux/amd64
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
kafka-ui:
image: provectuslabs/kafka-ui:latest
platform: linux/amd64
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_READONLY: "false"
버전 관련으로 에러가 발생할 경우 zookeeper의 image를 wurstmeister/zookeeper:latest로 변경
docker compose up -d


spring.application.name=producer
server.port=8090
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class ProducerApplicationKafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class ProducerController {
private final ProducerService producerService;
@GetMapping("/send")
public String sendMessage(@RequestParam("topic") String topic,
@RequestParam("key") String key,
@RequestParam("message") String message) {
producerService.sendMessage(topic, key, message);
return "Message sent to Kafka topic";
}
}import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class ProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic , String key, String message) {
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(topic, key, message + " " + i);
}
}
}
spring.application.name=consumer
server.port=8091
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
// 이 클래스는 Kafka 컨슈머 설정을 위한 Spring 설정 클래스입니다.
@EnableKafka // Kafka 리스너를 활성화하는 어노테이션입니다.
@Configuration // Spring 설정 클래스로 선언하는 어노테이션입니다.
public class ConsumerApplicationKafkaConfig {
// Kafka 컨슈머 팩토리를 생성하는 빈을 정의합니다.
// ConsumerFactory는 Kafka 컨슈머 인스턴스를 생성하는 데 사용됩니다.
// 각 컨슈머는 이 팩토리를 통해 생성된 설정을 기반으로 작동합니다.
@Bean
public ConsumerFactory<String, String> consumerFactory() {
// 컨슈머 팩토리 설정을 위한 맵을 생성합니다.
Map<String, Object> configProps = new HashMap<>();
// Kafka 브로커의 주소를 설정합니다.
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 메시지 키의 디시리얼라이저 클래스를 설정합니다.
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 메시지 값의 디시리얼라이저 클래스를 설정합니다.
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 설정된 프로퍼티로 DefaultKafkaConsumerFactory를 생성하여 반환합니다.
return new DefaultKafkaConsumerFactory<>(configProps);
}
// Kafka 리스너 컨테이너 팩토리를 생성하는 빈을 정의합니다.
// ConcurrentKafkaListenerContainerFactory는 Kafka 메시지를 비동기적으로 수신하는 리스너 컨테이너를 생성하는 데 사용됩니다.
// 이 팩토리는 @KafkaListener 어노테이션이 붙은 메서드들을 실행할 컨테이너를 제공합니다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
// ConcurrentKafkaListenerContainerFactory를 생성합니다.
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 컨슈머 팩토리를 리스너 컨테이너 팩토리에 설정합니다.
factory.setConsumerFactory(consumerFactory());
// 설정된 리스너 컨테이너 팩토리를 반환합니다.
return factory;
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class ConsumerService {
// 이 메서드는 Kafka에서 메시지를 소비하는 리스너 메서드입니다.
// @KafkaListener 어노테이션은 이 메서드를 Kafka 리스너로 설정합니다.
@KafkaListener(groupId = "group_a", topics = "topic1")
// Kafka 토픽 "test-topic"에서 메시지를 수신하면 이 메서드가 호출됩니다.
// groupId는 컨슈머 그룹을 지정하여 동일한 그룹에 속한 다른 컨슈머와 메시지를 분배받습니다.
public void consumeFromGroupA(String message) {
log.info("Group A consumed message from topic1: " + message);
}
// 동일한 토픽을 다른 그룹 ID로 소비하는 또 다른 리스너 메서드입니다.
@KafkaListener(groupId = "group_b", topics = "topic1")
public void consumeFromGroupB(String message) {
log.info("Group B consumed message from topic1: " + message);
}
// 다른 토픽을 다른 그룹 ID로 소비하는 리스너 메서드입니다.
@KafkaListener(groupId = "group_c", topics = "topic2")
public void consumeFromTopicC(String message) {
log.info("Group C consumed message from topic2: " + message);
}
// 다른 토픽을 다른 그룹 ID로 소비하는 리스너 메서드입니다.
@KafkaListener(groupId = "group_c", topics = "topic3")
public void consumeFromTopicD(String message) {
log.info("Group C consumed message from topic3: " + message);
}
@KafkaListener(groupId = "group_d", topics = "topic4")
public void consumeFromPartition0(String message) {
log.info("Group D consumed message from topic4: " + message);
}
}










