Event-driven 아키텍처을 알아보고, MSA, Spring에서 어떻게 적용할 수 있을지 학습해보자
유연성
(단지 변경을 감지하고, 각자의 일에 집중하기 때문)유지보수성
(마이크로 서비스끼리는 서로 무슨일을 하는지 알 필요가 없기 때문)가용성
(서로 강한 결헙이 아니기 때문에, 하나의 서비스가 죽어도 다른 서비스에 영향을 미치지 않는다.)확장성
(개별 서비스는 필요시 규모를 늘리거나 줄이기 용이하다.)메시지 복제 및 보존
(수신자가 메시지를 꺼낼 때 까지 안전하게 보관된다.)확장 용이
(브로커를 여러 개 만들거나, 수평적으로 확장하여 트래픽에 대응하기 용이하다.)서비스간의 의존성 제거
(서비스끼리 직접 연결이 없기 때문에, 수신자가 가동 여부와 상관없이 송신자는 데이터를 보낼 수 있다. 반대의 경우도 마찬가지)전달 보장 메커니즘 존재
(송신 및 수신 측 서버에서 이를 따로 구현하지 않아도 된다.)메시지 처리 시점 제어
(브로커의 상태에 따라 프로그램을 제어할 수 있다.)병목현상
지점이 될 수 있다. (Bottleneck Point)단일 장애 지점
이 될 수 있다. (SPOF)복잡성 및 운영비용 증가
(별도의 메시지 브로커를 구축하고 운영해야 하기 때문)많은 메시지 브로커가 있지만, 필자는 Apache Kafka
를 아래와 같은 이유로 사용을 결정했다.
무료
참고 자료가 많음
많은 기업에서 사용중
메시지 비휘발성
(Kafka는 topic으로 메시지를 분류하고, 수신자가 메시지를 가져가더라도 topic을 유지한다.)무엇보다도, 메시지 브로커들이 비슷해서 하나를 알면 다른 서비스도 활용할 수 있다고 판단했다.
Kafka의 특징은 다른 글에서 다루고, 이 글에서는 Spring에서 Kafka를 이용하고 Event-driven 아키텍처를 적용하는 것을 실습해보겠다.
구성 : Spring 서버 A
, Spring 서버 B
, Kafka클러스터(zookeeper 1개, broker 3개)
환경 : Mac OS m1
, docker
목표 : Spring 서버 A
-> 메시지 produce
-> kafka
-> 메시지 consume
-> spring 서버 B
---
version: "3"
services:
zookeeper-1:
image: confluentinc/cp-zookeeper:5.5.1
ports:
- "32181:32181"
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 3000
kafka-1:
image: confluentinc/cp-kafka:5.5.1
ports:
- "9092:9092" # 호스트 포트 : 컨테이너 내부 포트
depends_on:
- zookeeper-1 # 현재 정의된 서비스가 시작되기 전에 zookeeper-1 서비스가 시작되어야 함
environment:
KAFKA_BROKER_ID: 1 # 브로커 고유 ID
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181 # 카프카 브로커가 사용할 zookeeper 연결 정보
# 브로커의 리스너들 간의 보안 프로토콜 매핑 (PLAINTEXT, SSL, SASL)
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
# 브로커 간 통신에 사용될 리스너 이름 설정
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
# 클라이언트가 브로커에 접속할 때 사용할 주소 설정
# 내부 : kafka-1:29092
# 외부 : localhost:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
# 복제본 개수 정의 (default : 1)
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
# 파티션 개수 정의 (default : 1)
KAFKA_NUM_PARTITIONS: 3
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
kafka-2:
image: confluentinc/cp-kafka:5.5.1
ports:
- "9093:9093"
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
kafka-3:
image: confluentinc/cp-kafka:5.5.1
ports:
- "9094:9094"
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
version: "2"
services:
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8989:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29092,kafka-2:29093,kafka-3:29094
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-1:22181
application.yml
kafka:
bootstrap-servers: localhost:9092, localhost:9093, localhost:9094 # kafka 브로커 주소
KafkaProducerConfig.java : 프로듀서 환경설정
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 재시도
configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000); // 타임아웃
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaProducerService.java : 전송 서비스
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message)
.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("전송 성공, {}", result);
}
@Override
public void onFailure(Throwable ex) {
if (ex instanceof KafkaProducerException) {
log.info("전송 실패");
}
}
});
}
}
KafkaController.java : 전송 컨트롤러
@RestController
@RequiredArgsConstructor
public class KafkaController {
private final KafkaProducerService kafkaProducerService;
@GetMapping("/kafka/health")
public String sendMessage() {
kafkaProducerService.sendMessage("health", "ok");
return "send message to health topic!";
}
}
application.yml
kafka:
bootstrap-servers: localhost:9092, localhost:9093, localhost:9094 # kafka 브로커 주소
KafkaConsumerConfig.java : 컨슈머 환경설정
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "health_group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
KafkaConsumerService.java : 수신 서비스
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumeService {
@KafkaListener(topics = "health", groupId = "health_group", containerFactory = "kafkaListenerContainerFactory")
public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
String message = record.value();
long offset = record.offset();
log.info("Consumed message: {}, offset : {}", message, offset);
acknowledgment.acknowledge();
}
}
참고 자료