락 말고 동시성 문제 처리하기

텐저린티·2024년 9월 12일
0

🎯 사건의 발단

지난번 포스팅에서 Spring 의 @Transactional 어노테이션이 JPA 비관적 락에 주는 영향을 파악하고 트랜잭션의 전파속성을 REQUIRES_NEW 로 설정하여 동시성 문제를 해결했다.

하지만 놀랍게도 문제는 해결된 것이 아니었다. (놀라놀라워)
데드락이 발생했기 때문.

이번 포스팅에서는 아래 항목을 위주로 코드를 변경시켰다.

  • 서평픽 등록 시 포인트 지급 동시성 문제 해결
  • 데드락 발생 방지
  • 포인트 로직 동작 방식 변경

지난 포스팅에서 동시성 문제를 직면해 해결하고자 했던 문제는 서평 등록 시 서평 작성자에게 포인트 지급이라는 상황이었다.

이번 포스팅에서도 같은 문제를 처리할 것이다.
다만, 수정을 하면서 출석 체크 시 멤버에게 포인트 지급하는 상황에서 포인트 지급 로직이 트랜잭션 범위 바깥으로 넘어가 의도하지 않은 결과를 내었기에 이를 해결하는 문제도 함께 다뤘다.

🔍 톺아보기

문제는 끝나지 않았다

전파 속성을 수정하여 문제를 해결한 줄 알았으나 그렇지 않았다.
데드락이 발생했다.

데드락 발생 서순

  1. 서평픽 등록 로직에서 review 를 비관적 쓰기락으로 가져옴
  2. review.member().memberId() (리시버) 를 호출하면서 프록시 통해 락이 전파됨
  3. 포인트 로직에서 리시버에 다시 비관적 쓰기락 시도
  4. 이전 트랜잭션과 다른 트랜잭션으로 시작(REQUIRES_NEW)했기 때문에 비관적 락 수행
  5. 서로 쓰기를 위해서 서로의 읽기 락이 배제될때까지 점유 대기
  6. 점유대기/비선점/순환대기 3박자로 데드락 발생

결과적으로 전파속성을 활용해서 문제를 해결하는 것은 불가능했다.
문제를 해결할 다른 방법이 필요했다.

메시지 스트림을 활용한다면?

카프카나 레디스 스트림 같은 메시지 스트림 시스템은 Consumer 측에 버퍼라는 개념이 존재한다고 한다.
Producer 가 발송한 메시지들은 그 순서대로 Queue 처럼 Buffer 에 쌓이는 것이다.

순서보장!

딱 나에게 필요하다고 생각했다.
심지어 현재 서재 서비스에서 서점 서비스로 넘어갈때도 포인트 로직은 두 모듈에서 함께 사용할텐데,
이참에 모듈 의존성 줄일 수 있도록 스트림 시스템을 적용하는 것도 나쁠것 없어보였다.

그렇다면 문제는 Kafka 냐, Redis Streams 냐.

짜장이냐 짬뽕이냐

![[Pasted image 20240912112335.png]]
표를 보면 알 수 있듯이, 두 시스템은 목적과 지향점이 다르고, 그에따라 데이터가 저장되는 위치도 다른 것을 볼 수 있다.

우리는 대규모의 데이터를 다루는 것이 아니라, 그저 포인트를 지급할 트리거가 필요한 것 뿐이니 레디스 스트림이 조금 더 적합하다고 할 수 있다.

또한, 현재 AWS EC2에 서버를 배포하고 있는데 돈이 없어서 로드 밸런서를 두었음에도 단일 인스턴스만 사용하도록 설정해두었으므로 단일 노드에서의 퍼포먼스가 더욱 필요한 것도 사실이다.
카프카와 달리 레디스는 인메모리로 동작하기 때문에 인스턴스 내에서는 디스크보다 훨씬 빠를 수밖에 없을것이다.

암튼 비교표 읽어보면 레디스 스트림을 두고 카프카를 쓴다면 닭 잡는데 소칼 쓰는 기분이 드는 것 같았다.

카프카 이외에도 RabbitMQ 같은 다른 도구들이 많았고, 심지어 경량/순서보장/인메모리 등등 내가 레디스를 선택한 목적에 더욱 부합하는 도구들이 있었다.

하지만, 나의 프리티어 EC2는 배포할때마다 메모리를 70%나 먹어버리는 괴물이기 때문에 추가적인 메모리 소비를 줄이고 싶었다.
레디스는 이미 JWT 토큰 관리에 사용하고 있어서 선택했다.

🏗️ 구조

Redis Stream 구조 큰틀

Redis Stream 도 여느 메시지 스트림 시스템과 마찬가지로 Producer, Consumer 개념이 있다.

프로듀서는 그저 메시지를 만들고, 이를 수신할 컨슈머 혹은 컨슈머 그룹을 지정해서 보내면 된다.
그 이후로는 신경쓰지 않아도 된다.

컨슈머 혹은 컨슈머 그룹은 XADD 된 메시지를 XREAD/XREADGROUP 으로 읽고 작업을 완료하면 XACK 을 보내어 메시지의 완료 여부를 알린다.

메시지들은 큐처럼 컨슈머(그룹)에 순서대로 쌓이게 되고, 컨슈머를 이를 순차적(FIFO) 처리하므로 나는 이를 활용해 동시성 문제를 해결하고자 한다.

함정이 하나 있는데, 컨슈머 그룹의 경우 그룹 내 여러 컨슈머가 있다면 해당 그룹으로 포워딩되는 메시지를 컨슈머가 각각 중복없이 나눠 갖는다.
이러한 경우에는 순서보장이 되지 않는다.
따라서 레디스 스트림을 활용한 동시성 처리에서는 컨슈머 그룹 당 하나의 컨슈머만 등록하여 사용해야 한다.
병렬성을 갖는 순간 동시성은 무너진다.(엄밀히 말하면 동시성 확보를 위한 추가 처리가 필요하다.)

Redis Stream 동작 구조

출처: https://jybaek.tistory.com/935

  • 조금 더 추상화한 버전
    출처: https://jybaek.tistory.com/935
    기본적인 동작 구조다.
  1. 프로듀서가 XADD 로 Stream Key, ConsumerGroup, Consumer, Message 를 보냄
  2. 각 메시지는 위의 정보 + offset(record Id) 를 활용해 식별
  3. XGROUPREAD / XREAD 로 컨슈머(그룹)이 메시지 수신
  4. 작업을 완료한 경우 ACK / 그렇지 않은 경우 PENDING으로 메시지 이동
  5. (추가사항) ACK된 메시지가 더이상 필요치 않거나, PENDING 된 메시지가 필요하지 않은 경우 XDEL 로 메시지 제거
    • XDEL 로 제거되지 않은 메시지는 계속 메모리에 잔류하기 때문에 명시적으로 지워줘야 함

🧳 준비물

build.gradle

dependencies {
	// REDIS  
	implementation('org.springframework.boot:spring-boot-starter-data-redis')  
	testImplementation('com.redis.testcontainers:testcontainers-redis-junit:1.6.4')
}

Config

@Configuration  
public class RedisConfig {  
  
  @Value("${spring.data.redis.host}")  
  private String host;  
  
  @Value("${spring.data.redis.port}")  
  private int port;  
  
  @Bean  
  public RedisConnectionFactory redisConnectionFactory() {  
    return new LettuceConnectionFactory(host, port);  
  }  
  
  @Bean  
  public RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory redisConnectionFactory) {  
    RedisTemplate<?, ?> redisTemplate = new RedisTemplate<>();  
    redisTemplate.setConnectionFactory(redisConnectionFactory);  
    redisTemplate.setKeySerializer(new StringRedisSerializer());  
    redisTemplate.setValueSerializer(new StringRedisSerializer());  
    return redisTemplate;  
  }  
}

MessageStreamer 클래스 구현

레디스 스트림의 동작에 매핑되는 메소드를 미리 구현해두었다.

@Slf4j  
@Component  
public class MessageStreamer {  
  
  public static final String ERROR_COUNT_KEY = "error_count";  
  private static final ObjectMapper objectMapper = new ObjectMapper();  
  
  private final RedisTemplate<String, String> redisTemplate;  
  
  public MessageStreamer(RedisTemplate<String, String> redisTemplate) {  
    this.redisTemplate = redisTemplate;  
  }  
  
  public Object getRedisValue(String key, String field) {  
    return this.redisTemplate.opsForHash().get(key, field);  
  }  
  
  public void increaseRedisValue(String key, String field) {  
    this.redisTemplate.opsForHash().increment(key, field, 1);  
  }  

  // XADD : 스트림에 메시지 추가
  public void addToStream(String streamKey, MessageEvent message) {  
    try {  
      this.redisTemplate.opsForStream()  
          .add(StreamRecords.newRecord()  
              .ofObject(objectMapper.writeValueAsString(message))  // String 으로 객체를 보내기 위해 JSON으로 변환해줬습니다.
              .withStreamKey(streamKey));  
  
      log.info("Message added to stream: key: {} / mss: {}", streamKey, message);  
    } catch (JsonProcessingException e) {  
      log.error("Failed to serialize message to Json - key:{} / mss: {}", streamKey, message);  
    }  
  }  

  // XACK : 메시지 처리 완료
  public void ackStream(  
    String streamKey, String consumerGroupName, ObjectRecord<String, String> message) {  
	this.redisTemplate.opsForStream().acknowledge(streamKey, consumerGroupName, message.getId());  
    log.info("Message responses ack: group: {} / mss: {}", consumerGroupName, message);  
  }

  // XCLAIM : Pending 메시지를 다시 수행
  public void claimStream(PendingMessage pendingMessage, String consumerName) {  
    RedisAsyncCommands commands =  
        (RedisAsyncCommands) Objects.requireNonNull(this.redisTemplate.getConnectionFactory())  
            .getConnection()  
            .getNativeConnection();  
  
    CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)  
        .add(pendingMessage.getIdAsString())  
        .add(pendingMessage.getGroupName())  
        .add(consumerName)  
        .add("20")  
        .add(pendingMessage.getIdAsString());  
  
    commands.dispatch(CommandType.XCLAIM, new StatusOutput<>(StringCodec.UTF8), args);  
  
    log.info("Message claimed: id: {} / group: {} / consumer: {}",  
        pendingMessage.getIdAsString(),  
        pendingMessage.getGroupName(),  
        consumerName);  
  }  

  // XREAD one : 스트림에 있는 메시지 하나를 읽음
  public ObjectRecord<String, String> findStreamMessageById(String streamKey, String id) {  
    List<ObjectRecord<String, String>> objectRecords =  
        this.findStreamMessageByRange(streamKey, id, id);  // XREAD의 기본방식인 range 탐색 방식을 활용해서 단건 조회 구현
    if (objectRecords.isEmpty()) {  
      return null;  
    }  
    return objectRecords.getFirst();  
  }  

  // XREAD range : 스트림에 있는 메시지를 아이디를 기준으로 범위로 읽음
  public List<ObjectRecord<String, String>> findStreamMessageByRange(  
      String streamKey, String startId, String endId) {  
    return this.redisTemplate.opsForStream()  
        .range(String.class, streamKey, Range.closed(startId, endId));  
  }  

  // XDEL
  public void deleteFromStream(String streamKey, RecordId id) {  
    this.redisTemplate.opsForStream().delete(streamKey, id);  
    log.info("Message deleted: key: {} / id: {}", streamKey, id);  
  }  

  // XPENDING
  public PendingMessages findStreamPendingMessages(  
      String streamKey, String consumerGroupName, String consumerName) {  
    return this.redisTemplate.opsForStream()  
        .pending(  
            streamKey,  
            Consumer.from(consumerGroupName, consumerName),  
            Range.unbounded(),  
            100L);  
  }  
  
  public boolean isStreamConsumerGroupExist(String streamKey, String consumerGroupName) {  
    Iterator<StreamInfo.XInfoGroup> iterator = this.redisTemplate  
        .opsForStream().groups(streamKey).stream().iterator();  
  
    while (iterator.hasNext()) {  
      StreamInfo.XInfoGroup xInfoGroup = iterator.next();  
      if (xInfoGroup.groupName().equals(consumerGroupName)) {  
        return true;  
      }  
    }  
  
    return false;  
  }  

  // 컨슈머 그룹 생성
  public void createStreamConsumerGroup(String streamKey, String consumerGroupName) {  
    if (Boolean.TRUE.equals(this.redisTemplate.hasKey(streamKey))) {  
      if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) {  
        this.redisTemplate.opsForStream()  
            .createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);  
        log.info("Consumer group created: key: {} / group: {}", streamKey, consumerGroupName);  
      }  
      log.info("Consumer group already exists: key: {} / group: {}", streamKey, consumerGroupName);  
      return;  
    }  
  
    RedisAsyncCommands commands =  
        (RedisAsyncCommands) Objects.requireNonNull(this.redisTemplate.getConnectionFactory())  
            .getConnection()  
            .getNativeConnection();  
  
    CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)  
        .add(CommandKeyword.CREATE)  
        .add(streamKey)  
        .add(consumerGroupName)  
        .add("0")  
        .add("MKSTREAM");  
  
    commands.dispatch(CommandType.XGROUP, new StatusOutput<>(StringCodec.UTF8), args);  
  
    log.info("Consumer group created: key: {} / group: {}", streamKey, consumerGroupName);  
  }  

  // 컨슈머 리스너 컨테이너 등록
  public StreamMessageListenerContainer createStreamMessageListenerContainer() {  
    log.info("StreamMessageListenerContainer created");  
    return StreamMessageListenerContainer.create(  
        Objects.requireNonNull(this.redisTemplate.getConnectionFactory()),  
        StreamMessageListenerContainer  
            .StreamMessageListenerContainerOptions.builder()  
            .targetType(String.class)  
            .pollTimeout(Duration.ofMillis(20))  
            .build()  
    );  
  }  
}

📺 진행과정

Producer

컨슈머가 소비할 메시지를 만들어내는 역할을 한다.

@Service  
public class PointEventProducer {  
  
  private final MessageStreamer streamer;  
  
  public PointEventProducer(MessageStreamer streamer) {  
    this.streamer = streamer;  
  }  
  
  public void creditPointForReviewPick(long pickerMemberId, long receiverMemberId) {  
  streamer.addToStream(  
      StreamKey.POINT_EVENT.getKey(),  
      new PointEvent("CREDIT", "REVIEW_PICK_PICKER", pickerMemberId));  
  streamer.addToStream(  
      StreamKey.POINT_EVENT.getKey(),  
      new PointEvent("CREDIT", "REVIEW_PICK_RECEIVER", receiverMemberId));  
  }
  
  public void creditPointForAttendance(long loginId, int attendedCount) {  
    streamer.addToStream(StreamKey.POINT_EVENT.getKey(), new PointEvent("CREDIT", "ATTENDANCE_DAILY", loginId));  
  }   
}

Consumer

Consumer 추상 클래스

컨슈머 빈을 생성하고, 제거하는 과정에서 리스너, 컨슈머 그룹, 구독을 설정하고 종료해야 한다.
이러한 작업들이 모든 컨슈머에게 해당되므로, 추상 클래스를 만들어서 상속하도록 했다.

@Getter  
public abstract class StreamConsumer  
    implements StreamListener<String, ObjectRecord<String, String>>,  
    InitializingBean, DisposableBean {  
  
  private StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer;  
  private Subscription subscription;  
  
  private String streamKey;  
  private String consumerGroupName;  
  private String consumerName;  
  
  @Override  
  public void destroy() throws Exception {  
    if (this.subscription != null) {  
      this.subscription.cancel();  
    }  
    if (this.listenerContainer != null) {  
      this.listenerContainer.stop();  
    }  
  }  

  protected void setUpAndStartConsumer(  
      String streamKey, String consumerGroupName, String consumerName,  
      StreamMessageListenerContainer listenerContainer) throws InterruptedException {  
    this.streamKey = streamKey;  
    this.consumerGroupName = consumerGroupName;  
    this.consumerName = consumerName;  
    this.listenerContainer = listenerContainer;  
    this.subscription = getListenerContainer()  
        .receive(  
            Consumer.from(this.getConsumerGroupName(), this.getConsumerName()),  
            StreamOffset.create(this.getStreamKey(), ReadOffset.lastConsumed()),  
            this);  
  
    this.getSubscription().await(Duration.ofSeconds(2));  
    this.getListenerContainer().start();  
  }  
}

컨슈머 구현 클래스

StreamListener 클래스를 상속하면 꼭 onMessage() 메소드를 구현해야 한다.
레디스 스트림을 통해 컨슈머가 메시지를 수신하고 소비하는 메소드다.

@Service  
public class PointEventConsumer extends StreamConsumer {  
  
  private static final Logger log = LoggerFactory.getLogger(PointEventConsumer.class);  
  private static final ObjectMapper objectMapper = new ObjectMapper();  
  
  private final MessageStreamer streamer;  
  private final PointService pointService;  
  
  public PointEventConsumer(MessageStreamer streamer, PointService pointService) {  
    this.streamer = streamer;  
    this.pointService = pointService;  
  }  
  
  @Override  
  public void afterPropertiesSet() throws Exception {  
    streamer.createStreamConsumerGroup(  
        StreamKey.POINT_EVENT.getKey(), ConsumerGroup.POINT_EVENT_GROUP.getGroup());  
    this.setUpAndStartConsumer(  
        StreamKey.POINT_EVENT.getKey(),  
        ConsumerGroup.POINT_EVENT_GROUP.getGroup(),  
        ConsumerGroup.POINT_EVENT_GROUP.getGroup() + "-consumer",  
        streamer.createStreamMessageListenerContainer());  
  }  
  
  @Override  
  public void onMessage(ObjectRecord<String, String> message) {  
    try {  
      PointEvent pointEvent = objectMapper.readValue(message.getValue(), PointEvent.class);  
      // 수행할 로직 (포인트 로직)
      streamer.ackStream(this.getStreamKey(), message);  // XACK
      streamer.deleteFromStream(this.getStreamKey(), message.getId());  // XDEL
    } catch (NullPointerException e) { 
      log.warn("Invalid Point Event Message", e);  
    } catch (Exception e) {  
      log.warn("Failed to process point event", e);  
    }  
  }  
}

MapRecord

public interface MapRecord<S, K, V> extends Record<S, Map<K, V>>, Iterable<Map.Entry<K, V>> {}

위에서 사용한 ObjectRecord 말고 MapRecord 라는게 있다.
일반적인 Map 과는 달리, 세개의 값을 Map 에 넣을 수 있다.
Redis 의 stream 패키지 안에 있으며, Stream 데이터 처리를 위해 만들어졌기 때문에 공식문서에서는 이 객체 사용을 권장한다고 한다.

왜째서 사용하지 않았는가

하지만 나는 이 객체를 사용하면 멀쩡한 key, value 에 갑자기 byte 코드가 붙어버려서 ObjectRecord로 했다.
바이트코드는 변환해봤더니 이런값이 붙어있었다.
괜히 처리하기 어려워서 Json 으로 객체를 변환해서 보냈다.

// 원본
-3 -1 -3 -1 0 0 5 0 117 0 114 0 0 0 2 0 91 0 66 0 -3 -1 -3 -1 23 0 -3 -1 6 0 8 0 84 0 -3 -1 2 0 0 0 0 0 120 0 112 0 0 0 0 0 0 0 17 0 66 0 79 0 79 0 75 0 95 0 82 0 69 0 71 0 73 0 83 0 84 0 82 0 65 0 84 0 73 0 79 0 78 0

// 변환 (String)
In the end, it all comes down to 0 and 1.

PointService

// 서평픽 등록 시 서평 작성자에게 포인트 지급
public void creditPointForReviewReceiver(long receiverMemberId) {  
  pointRepository.creditPoints(receiverMemberId, REVIEW_PICK_RECEIVER);  
}

// 출석체크 시 멤버에게 포인트 지급
public void creditPointForAttendance(long loginId, Activity activity) {  
  if (activity != ATTENDANCE_DAILY) {  
    pointRepository.creditPoints(loginId, activity);  
  }  
  pointRepository.creditPoints(loginId, ATTENDANCE_DAILY);  
}

PointRepository

@Transactional  
public void creditPoints(long memberId, Activity activity) {  
  MemberEntity memberEntity = getMember(memberId);  
  memberEntity.addPoints(activity.getPoints());  
  pointHistoryEntityJpaRepository.save(  
      new PointHistoryEntity(  
          null, memberEntity, activity.getDescription(), activity.getPoints()));  
}

PendingMessageScheduler

@Slf4j  
@EnableScheduling  
@Component  
public class PendingMessageScheduler implements InitializingBean {  
  
  private static final ObjectMapper objectMapper = new ObjectMapper();  
  
  private String streamKey;  
  private String consumerGroupName;  
  private String consumerName;  
  
  private final MessageStreamer streamer;  
  private final PointEventConsumer consumer;  
  
  public PendingMessageScheduler(MessageStreamer streamer, PointEventConsumer consumer) {  
    this.streamer = streamer;  
    this.consumer = consumer;  
  }  
  
  @Override  
  public void afterPropertiesSet() throws Exception {  
    this.streamKey = consumer.getStreamKey();  
    this.consumerGroupName = consumer.getConsumerGroupName();  
    this.consumerName = consumer.getConsumerName();  
  }  
  
  @Scheduled(fixedRate = 10_000)  
  public void processPendingMessage() {  
    PendingMessages pendingMessages =  
        streamer.findStreamPendingMessages(streamKey, consumerGroupName, consumerName);  
  
    for (PendingMessage pendingMessage : pendingMessages) {  
      try {  
        ObjectRecord<String, String> messageToProcess =  
            streamer.findStreamMessageById(this.streamKey, pendingMessage.getIdAsString());  
  
        if (messageToProcess == null) {  
          continue;  
        }  
  
        streamer.claimStream(pendingMessage, consumerName);  
  
        int errorCount =  
            (int) streamer.getRedisValue(ERROR_COUNT_KEY, pendingMessage.getIdAsString());  
        if (errorCount >= 5) {  
          log.warn("재처리 시도 제한 초과 - message: {}", messageToProcess.getValue());  
          streamer.deleteFromStream(streamKey, pendingMessage.getId());  
        } else if (pendingMessage.getTotalDeliveryCount() >= 2) {  
          log.warn("delivery 제한 횟수 초과 - message: {}", messageToProcess.getValue());  
          streamer.deleteFromStream(streamKey, pendingMessage.getId());  
        } else {  
          PointEvent pointEvent =  
              objectMapper.readValue(messageToProcess.getValue(), PointEvent.class);  
          consumer.forwardByCreditType(pointEvent);  
        }  
  
        streamer.ackStream(streamKey, consumerGroupName, messageToProcess);  
        streamer.deleteFromStream(streamKey, pendingMessage.getId());  // 이거 왜 있는지 후술함
      } catch (Exception e) {  
        streamer.increaseRedisValue(ERROR_COUNT_KEY, pendingMessage.getIdAsString());  
        log.warn(  
            "Failed to process pending message. Remained PendingMessages Size: {}",  
            pendingMessages.size());  
      }  
    }  
  }  
}

정상적으로 처리되지 못하고 Pending 처리된 메시지를 처리하기 위해 구현했다.
스프링 스케줄러를 이용해서 10초(=10,000ms) 마다 Pending 메시지를 조회해서 존재하면 재시도하는 방식으로 구현했다.

재처리한 XACK을 통해 처리가 완료된 메시지를 왜 굳이 deleteFromStream 메소드로 지워주었다.

레디스 스트림에서 모든 메시지는 XADD 후 Pending 상태가 되어, XACK 까지 메모리에 저장된다.
이후 XACK 을 받으면 자동으로 해당 메시지가 제거되는것이 아니라 상태만 변화한다.
로그처럼 모든 메시지가 누적된다는 것이다.

나의 경우에는 포인트 지급 로직을 실행하는 트리거로서의 역할만 수행하고, 포인트 지급에 대해서는 별도로 DB에 저장하고 있기 때문에 굳이 데이터를 남길 이유가 없었다.

그래서 이렇게 지워주었다.

그래서 동시성 문제는 해결됐나용?

해결했습니다.

JPA 썼을때보다 훨씬 깔끔하게 해결됐다.
심지어 메시지 스트림 시스템 자체가 비동기로 동작하는 것이기 때문에 성능 상에서도 이점이 있었다.

테스트 수행

K6 로 테스트를 수행했다.
체감상 nGrinder 보다 빠르고 가벼운 느낌이다.
다만 nGrinder 는 테스트 수행시 바로 GUI 에서 확인이 가능한다.
K6의 경우 따로 grafana 와 연동해줘야 한다.

K6 스크립트

import http from "k6/http";  
import {check} from "k6";  
  
export let options = {  
  vus: 100,  
  iterations: 100,  
};  
  
// 토큰 발급을 위한 함수  
function getToken() {  
  const authUrl = 'http://localhost:8080/api/test/login';  
  const authRes = http.post(authUrl);  
  return JSON.parse(authRes.body).appToken;  
}  
  
export default function() {  
  const reviewId = 4;  
  
  const url = `http://localhost:8080/api/reviews/${reviewId}/picks`;  
  const params = {  
    headers: {  
      'Content-Type': 'application/json',  
      'Authorization': `Bearer ${getToken()}`,  
    }  
  }  
  
  const res = http.post(url, null, params);  
  check(res, {  
    "status is 201": (r) => r.status === 201  
  });  
}

결과

서평픽 등록 시 건당 5 포인트 지급되야 한다.
위의 스크립트를 보면 가상 유저(vus)가 100명이므로 총 500 포인트가 쌓여있는 것을 볼 수 있다.
그외에 포인트 내역 레코드도 잘 쌓이는 것을 볼 수 있다.
동시성 문제를 해결한 것이다.
락을 사용하지 않았기 때문에 데드락 발생도 원천 차단한 것

BeforeAfter

|

통계 데이터

K6 통계 데이터다.
위의 스크립트에서 설정한 201 상태코드 체크를 100%로 통과한 것을 볼 수 있다.
따라서 모든 요청이 의도대로 동작했음을 알 수 있다.

또한, http_reqs 는 200이나, 이는 스크립트 시나리오가 토큰 발급 -> 테스트 수행이기 때문에 vus 당 두개의 요청을 실행해서 그런것이다.

http_req_duration 은 평균 180ms 으로, 락을 사용했을때의 요청시간인 440ms 보다 빨라졌다.
또한 90%, 95%의 대부분의 요청들에서 동시성 문제 뿐만 아니라 성능 상의 이점을 확보했다.

락 사용메시지 스트림 사용

추가

메시지 스트림을 사용하면서 JPA 비관적 락을 걸면 어떻게 되나 궁금해서 한 번 해봄.

예상대로 동시성 문제는 없었고, 성능에서 락으로만 해결한 것과 비슷한 수치가 나옴.
이로써 유추가능한 것이 메시지 스트림을 활용해서 메시지를 주고 받는데 걸리는 오버헤드는 많아야 100ms 보다 작다는 것.

트랜잭션 범위는 어떻게 할 것인가

최상단 사건의 발단 부분에서 언급했던 출석체크 시 멤버에게 포인트 지급 상황에 대한 처리를 하고자 한다.

문제의 코드부터 보고 오자.

@Transactional  
public Attendance register(long loginId, LocalDate date) {  
  if (!date.isEqual(LocalDate.now())) {  
    throw new IllegalArgumentException("오늘 날짜만 출석 체크가 가능합니다.");  
  }  
  
  int attendedCount = attendanceRepository.readThisMonth(loginId);  
  
  Member member =  
      memberRepository.update(  
          memberRepository.read(loginId).creditBadgeForAttendance(attendedCount + 1));  
  pointEventProducer.creditPointForAttendance(loginId, attendedCount + 1);  
  return attendanceRepository.create(new Attendance(null, member, null));  
}

ACID 라고, 트랜잭션이 꼭 가져야할 특성을 말한다.
그중에서 문제가 되는 것은 Atomicity, 원자성이다.

만약에 멤버 조회에 실패하거나, 출석 등록에 실패한 경우 위 트랜잭션의 모든 동작은 롤백된다.
하지만, 레디스 스트림으로 호출된 포인트 지급 로직은 트랜잭션의 영향을 받지 않는다.
원래 메소드가 롤백되건, 커밋되건 상관없이 포인트 지급에 성공해버린다.

나는 출석 체크가 모두 성공한다면 포인트를 지급하고, 그렇지 않다면 포인트를 지급하지 않아야 한다.
다시 말해서 트랜잭션이 커밋된 이후에야 포인트 지급 로직을 수행할 수 있다.

@Transactional  
public Attendance register(long loginId, LocalDate date) {  
  if (!date.isEqual(LocalDate.now())) {  
    throw new IllegalArgumentException("오늘 날짜만 출석 체크가 가능합니다.");  
  }  
  
  int attendedCount = attendanceRepository.readThisMonth(loginId);  
  
  Member member =  
      memberRepository.update(  
          memberRepository.read(loginId).creditBadgeForAttendance(attendedCount + 1));  
  
  Attendance attendance = attendanceRepository.create(new Attendance(null, member, null));  

  // 트랜잭션 동기화 관리 클래스 선언
  TransactionSynchronizationManager.registerSynchronization(  
      new TransactionSynchronization() {  
        @Override  
        public void afterCommit() {  // 트랜잭션이 커밋된 후에 해당 메소드 실행
          pointEventProducer.creditPointForAttendance(loginId, attendedCount + 1);  
        }  
      });  
  
  return attendance;  
}

이러한 방식으로 커밋 이후에 로직을 수행할 수 있다.

🔑 결론

메시지 스트림은 원래 비동기성, 병렬성이 핵심이라고 한다.
카프카 같은 경우에는 링크드인에서 의존성 관리하기가 하도 어려워서 만들었다고 하던데, 어플리케이션 간 의존성 낮추는 용도도 핵심이라고 할 수 있다.

그러한 점에서 동시성 문제를 해결하고자 하는 목적이 최우선으로 적용한것이 과연 옳은가에 대한 고민은 있다.

하지만, 옛날에 동료들이었나 멘토였나가 "동시성 문제가 있다고 꼭 락을 쓸 필요는 없어요." 라는 말을 했는데, 이런 방법을 이야기했던게 아닌가 싶다.

내 결론은

  • 모듈 의존성을 낮추고 싶다
  • 순서보장으로 락 없이 동시성 처리하고 싶다
  • 반환값이 필요 없는 경우 비동기로 성능 올리고 싶다
    이러한 상황인 경우 메시지 스트림을 적용하는 것이 상당한 이점이 된다는 것.

ㄱㅅㄱㅅ

profile
개발하고 말테야

0개의 댓글