[MSA] - Event-driven 아키텍처

chancehee·2023년 11월 11일
0

MSA

목록 보기
1/2
post-thumbnail

[ 개요 ]

Event-driven 아키텍처을 알아보고, MSA, Spring에서 어떻게 적용할 수 있을지 학습해보자

[ Event-driven 아키텍처 ]

  • 소프트웨어 디자인 패턴 중 하나로, 분리된 애플리케이션이 이벤트 브로커/메시지 브로커를 통해 비동기적으로 발행/게시할 수 있는 소프트웨어 설계 패턴이다.
  • 이벤트 기반 앱은 언어가 아닌 프로그래밍 접근 방식이므로, 모든 프로그래밍 언어로 만들 수 있다.

[ EDA 장점 ]

  • 유연성 (단지 변경을 감지하고, 각자의 일에 집중하기 때문)
  • 유지보수성 (마이크로 서비스끼리는 서로 무슨일을 하는지 알 필요가 없기 때문)
  • 가용성 (서로 강한 결헙이 아니기 때문에, 하나의 서비스가 죽어도 다른 서비스에 영향을 미치지 않는다.)
  • 확장성 (개별 서비스는 필요시 규모를 늘리거나 줄이기 용이하다.)

[ 메시지 브로커 ]

  • MSA에서는 주로 메시지 브로커를 사용한다.
  • 직접 서버끼리 비동기로 통신하는 것도 가능하지만, 메시지 브로커를 통해 아래와 같은 이점을 얻을 수 있다.

[ 메시지 브로커 장점 ]

  • 메시지 복제 및 보존 (수신자가 메시지를 꺼낼 때 까지 안전하게 보관된다.)
  • 확장 용이 (브로커를 여러 개 만들거나, 수평적으로 확장하여 트래픽에 대응하기 용이하다.)
  • 서비스간의 의존성 제거 (서비스끼리 직접 연결이 없기 때문에, 수신자가 가동 여부와 상관없이 송신자는 데이터를 보낼 수 있다. 반대의 경우도 마찬가지)
  • 전달 보장 메커니즘 존재 (송신 및 수신 측 서버에서 이를 따로 구현하지 않아도 된다.)
  • 메시지 처리 시점 제어 (브로커의 상태에 따라 프로그램을 제어할 수 있다.)
    ex1) 메시지가 큐에 도착하는 즉시, 수신
    ex2) 큐의 메시지가 10개가 될 때까지 수신을 하지 않음

[ 메시지 브로커 단점 ]

  • 병목현상 지점이 될 수 있다. (Bottleneck Point)
  • 단일 장애 지점이 될 수 있다. (SPOF)
  • 복잡성 및 운영비용 증가 (별도의 메시지 브로커를 구축하고 운영해야 하기 때문)

[ 메시지 브로커 종류 ]

  • Apache Kafka
  • RabbitMQ
  • Apache ActiveMQ
  • Amazon SQS
  • Google Cloud pub/sub
  • Redis pub/sub

[ Apache Kafka ]

많은 메시지 브로커가 있지만, 필자는 Apache Kafka를 아래와 같은 이유로 사용을 결정했다.

  • 무료
  • 참고 자료가 많음
  • 많은 기업에서 사용중
  • 메시지 비휘발성 (Kafka는 topic으로 메시지를 분류하고, 수신자가 메시지를 가져가더라도 topic을 유지한다.)

무엇보다도, 메시지 브로커들이 비슷해서 하나를 알면 다른 서비스도 활용할 수 있다고 판단했다.
Kafka의 특징은 다른 글에서 다루고, 이 글에서는 Spring에서 Kafka를 이용하고 Event-driven 아키텍처를 적용하는 것을 실습해보겠다.

[ 실습 : MSA + Kafka ]

구성 : Spring 서버 A, Spring 서버 B, Kafka클러스터(zookeeper 1개, broker 3개)
환경 : Mac OS m1, docker
목표 : Spring 서버 A -> 메시지 produce -> kafka -> 메시지 consume -> spring 서버 B

[ docker-compose.yml ]

---
version: "3"
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:5.5.1
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 3000

  kafka-1:
    image: confluentinc/cp-kafka:5.5.1
    ports:
      - "9092:9092" # 호스트 포트 : 컨테이너 내부 포트
    depends_on:
      - zookeeper-1 # 현재 정의된 서비스가 시작되기 전에 zookeeper-1 서비스가 시작되어야 함
    environment:
      KAFKA_BROKER_ID: 1 # 브로커 고유 ID
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181 # 카프카 브로커가 사용할 zookeeper 연결 정보
			# 브로커의 리스너들 간의 보안 프로토콜 매핑 (PLAINTEXT, SSL, SASL)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
			# 브로커 간 통신에 사용될 리스너 이름 설정
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
			# 클라이언트가 브로커에 접속할 때 사용할 주소 설정
			# 내부 : kafka-1:29092
			# 외부 : localhost:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      # 복제본 개수 정의 (default : 1)
			KAFKA_DEFAULT_REPLICATION_FACTOR: 3
			# 파티션 개수 정의 (default : 1)
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3

  kafka-2:
    image: confluentinc/cp-kafka:5.5.1
    ports:
      - "9093:9093"
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3

  kafka-3:
    image: confluentinc/cp-kafka:5.5.1
    ports:
      - "9094:9094"
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3

[ Kafka UI ]

version: "2"
services:
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8989:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29092,kafka-2:29093,kafka-3:29094
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-1:22181

[ Spring Server A : Producer ]

application.yml

kafka:
  bootstrap-servers: localhost:9092, localhost:9093, localhost:9094 # kafka 브로커 주소

KafkaProducerConfig.java : 프로듀서 환경설정

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 재시도
        configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000); // 타임아웃
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

KafkaProducerService.java : 전송 서비스

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message)
                .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                    @Override
                    public void onSuccess(SendResult<String, String> result) {
                        log.info("전송 성공, {}", result);
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        if (ex instanceof KafkaProducerException) {
                            log.info("전송 실패");
                        }
                    }
                });
    }
}

KafkaController.java : 전송 컨트롤러

@RestController
@RequiredArgsConstructor
public class KafkaController {

    private final KafkaProducerService kafkaProducerService;

    @GetMapping("/kafka/health")
    public String sendMessage() {

        kafkaProducerService.sendMessage("health", "ok");
        return "send message to health topic!";
    }
}

[ Spring Server B : Consumer ]

application.yml

kafka:
  bootstrap-servers: localhost:9092, localhost:9093, localhost:9094 # kafka 브로커 주소

KafkaConsumerConfig.java : 컨슈머 환경설정

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "health_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

KafkaConsumerService.java : 수신 서비스

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumeService {

    @KafkaListener(topics = "health", groupId = "health_group", containerFactory = "kafkaListenerContainerFactory")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        String message = record.value();
        long offset = record.offset();

        log.info("Consumed message: {}, offset : {}", message, offset);

        acknowledgment.acknowledge();
    }
}

참고 자료

0개의 댓글