현재 급하게 Spring Cloud
를 사용하여 MSA
환경을 구성하고 있습니다.
그런데 데이터 동기화를 위한 메시지큐가 필요하다고 느끼게 되었고 도입하는 과정을 그리려고 합니다.
처음에는 설치부터 시작하지만 이후에는 카프카에 대해서 더 자세히 알아보도록 하겠습니다.
version: "3" # docker-compose 버전 지정
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # kafka가 zookeeper에 커넥션하기 위한 대상을 지정
volumes:
- /var/run/docker.sock:/var/run/docker.sock
먼저 로컬 환경에서 테스트 하기 위해서 localhost
로 지정하 두고 작성을 하였습니다. 해당 파일이 있는 경로에서 docker-compose up -d
를 사용하여 실행합니다.
⚠️ Warning
현재 mac M1 에서 진행중이여서 주키퍼 이미지와의 OS가 일치하지 않는 현상이 발생하였습니다.
추후 실제 배포 환경에서는 EC2 환경에서 진행할 예정이므로 내용을 추가하도록 하겠습니다.
docker-compose
로 설치를 하였습니다. 이번에는 카프카에 접속한 후 토픽을 생성하고 조회와 같은 작업들을 해보도록 하겠습니다.
# Kafka 접속
docker exec -it <컨테이너> bash
# Kafka 버전 확인
kafka-topics.sh --version
## Result ---> 2.8.1 (Commit:839b886f9b732b15)
# 토픽 생성
kafka-topics.sh --create \
--topic sample_topic_1 \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1
## Result ---> Created topic sample_topic_1.
# 토픽 목록 조회
kafka-topics.sh --list --bootstrap-server localhost:9092
## Result ---> sample_topic_1
사진으로 보기
이제 위에서 생성한 토픽을 가지고 활용해보도록 하겠습니다.
# Producer
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test_topic
# Consumer
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test_topic
실제로 통신을 하는 내용을 확인할 수 있습니다.
이번에는 Spring Boot 와 연동을 해보도록 하겠습니다.
// build.gradle
dependencies {
...
// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
implementation 'org.springframework.kafka:spring-kafka:3.1.1'
}
먼저 Dependency(Spring Kafka Support) 를 프로젝트에 추가하도록 하겠습니다.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // Kafka 주소
properties.put(GROUP_ID_CONFIG, "consumerGroupId"); // Consumer 들을 그룹으로 묶을 수 있다.
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
위와 같이 KafkaConsumer 설정들을 지정하였습니다.
ConsumerFactory
에서는 "토픽"에 접속하기 위한 설정 파일들을 담아두기 위해 Map
을 사용하여 만듭니다.ConcurrentKafkaListenerContainerFactory
에서는 "토픽"의 변경사항을 파악할 수 있는 리스너를 만듭니다.@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // Kafka 주소
properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
이번에는 Producer 를 위한 설정 파일을 만들어 보았습니다.
producerFactory
에서는 "토픽"에 접속하기 위한 설정 파일들을 담아두기 위해 Map
을 사용하여 만듭니다. consumer
와 많이 비슷합니다.kafkaTemplate
에서는 내용을 보내기 위한 빈을 등록하였습니다.@RequiredArgsConstructor
@Service
@Slf4j
public class KafkaConsumer {
private final CatalogRepository repository;
@Transactional
@KafkaListener(topics = "example-catalog-topic")
public void updateQuantity(String kafkaMessage) {
log.info("Kafka Message : -> [{}]", kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {
});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
CatalogJpaEntity catalogJpaEntity = repository.findByProductId((String) map.get("productId"));
if (catalogJpaEntity != null) {
catalogJpaEntity.setStock(catalogJpaEntity.getStock() - (Integer) map.get("quantity"));
}
}
}
@KafkaListener
를 사용하여 해당 토픽에 구독합니다. 해당 토픽에 메시지가 쌓이면 해당 메서드가 실행됩니다.
@RequiredArgsConstructor
@Service
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderVo send(String topic, OrderVo orderVo) {
ObjectMapper mapper = new ObjectMapper();
String jsonIsString = "";
try {
jsonIsString = mapper.writeValueAsString(orderVo);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonIsString);
log.info("Kafka Producer sent data from the Order microservice: [{}]", orderVo);
return orderVo;
}
}
@Bean
으로 등록한 KafkaTemplate 를 사용하여 메시지를 전송하는 기능입니다. 객체를 문자열로 변환 한 뒤 해당 토픽으로 전송합니다.
Producer
Consumer
정상적으로 메시지를 구독하고 전송하는 것을 확인할 수 있습니다.