[프로젝트] 비동기 처리 적용기

dohyun-dev·2023년 8월 2일
1

프로젝트

목록 보기
4/4

기존 시스템의 문제점

기존 휴대폰 인증 기능은 위 FlowChart와 같이 요청을 진행한다.

위 시스템은 대규모 트래픽 요청을 받았을 때 문제점이 있다.

문제점

  1. coolSmsServer 장애시 RTT가 증가한다.
    • 외부 API 지연으로 요청을 받은 Thread는 다른 요청을 처리하지 못하게 된다. -> TPS 감소
    • Client는 요청을 기다린다.

Scale-up, Scale-out 하는 방법으로 해결하면 좋겠지만 외부 API의 문제이기 때문에 해당 방법으로 해결을 하지 못한다.

해결방안

해당 문제를 해결하기 위해서 생각해낸 방안은 2가지가 있다.

  1. Thread 의 개수를 늘린다.
  2. 비동기 처리를 진행한다.

쓰레드의 개수를 늘리면 발생하는 문제점은 자원의 낭비가 발생한다는 점이다. 외부 API의 지연을 이유 하나로 쓰레드를 늘리면 자원 낭비가 발생한다는 점은 필연적일 것이다. 비동기 처리를 하는 것이 맞는 방법으로 보인다.

비동기 처리 방법

비동기 처리를 하는 방법에도 여러 가지가 있다.

  1. 별도의 쓰레드로 비동기 처리를 하는 방법
  2. 메시지 큐를 활용하여 비동기 처리를 하는 방법

나는 메시지 큐를 활용하는 방법의 확장성을 보고 메시지큐를 활용하는 방법을 택했다

적용기

메시지 큐로는 RabbitMQ, Kafka가 있지만 대용량 처리에 더 적합한 Kafka를 이용해서 사용하기로 했다.

Kafka는 RabbitMQ보다 데이터 유실의 위험성이 있지만 대용량 처리에 적합하다.

다양한 휴대폰 인증 서비스를 이용해보면서 재전송이라는 버튼을 두어 데이터 유실에 대비하는 것처럼 보였다.

먼저 카프카 클러스터를 구성하고 브로커의 partition이 2이고 replication이 3으로 구성되게 하였다.

version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - "22181:2181"

  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - "32181:2181"

  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - "42181:2181"
  
  kafka-1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:9092,EXTERNAL://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 2
      
  kafka-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:9092,EXTERNAL://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 2

  kafka-3:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - "39092:39092"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:9092,EXTERNAL://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 2

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "9000:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

SpringKafka를 연동하였다.

spring:
  kafka:
    bootstrap-servers: kafka-1:9092, kafka-2:9092, kafka-3:9092
    consumer:
      group-id: spring-app
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer
---
spring:
  config:
    activate:
      on-profile: kafka-test
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers:localhost:9092}
  • SpringBootAutoConfiguration 기능을 적극적으로 활용하였다. 😁
  • KafkaTransaction 설정이 있던데 나중에 공부해봐야겠다. 😥
@Service
@RequiredArgsConstructor
public class MessageSendKafkaProducer {

    private static final String topicName = KafkaProperties.SEND_MESSAGE;
    private final ObjectMapper objectMapper;
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(PhoneAuthSendRequest request) {
        String json = "";
        try {
             json = objectMapper.writeValueAsString(request);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        kafkaTemplate.send(topicName, json);
    }
}

카프카로 Message를 전송할 Producer를 구현하였다.

@Slf4j
@Service
@RequiredArgsConstructor
public class MessageSendKafkaConsumer {

    private final PhoneAuthService phoneAuthService;
    private final ObjectMapper objectMapper;

    @KafkaListener(
            topics = KafkaProperties.SEND_MESSAGE,
            groupId = KafkaProperties.CONSUMER_GROUP_ID,
            concurrency = KafkaProperties.DEFAULT_THREAD_COUNT
    )
    public void sendSms(String message) {
        PhoneAuthSendRequest request = null;
        try {
            request = objectMapper.readValue(message, PhoneAuthSendRequest.class);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        phoneAuthService.sendMessage(request.getRecipientNumber());
    }
}

해당 기능을 구현하면서 ObjectMapperCheckedException 의 처리에 대해서 고민을 많이 했다.

해당 Exception을 Throws하지 않고 해당 클래스에서 처리하는 방법으로 결정했다 ㅎㅎ

package com.ssafy.aejimeongji.global.util;

public interface KafkaProperties {
    String SEND_MESSAGE = "send_message";
    String CONSUMER_GROUP_ID = "spring-app";
    String DEFAULT_THREAD_COUNT = "2";
}

그리고 상수와 같은 값들은 하드코딩해서 박아두지 않고 interface 를 사용한 방법으로 결합도를 낮췄다. 유지 보수성을 높였다. Enum Type을 사용하려고 하였으나. Enum Type을 사용하면 KafkaListener 어노테이션에 해당 값들을 사용하지를 못한다 ㅠ

결과

기존 동기 처리를 Kafka를 통해 비동기 처리로 바꾸면서 서버 TPS를 증가시킬 수 있었다.

또한 Kafka Topic을 사용해 다른 서비스를 개발할 수 있는 장점도 있다.

1개의 댓글

comment-user-thumbnail
2023년 8월 2일

개발자로서 성장하는 데 큰 도움이 된 글이었습니다. 감사합니다.

답글 달기