[Redis] Redission을 이용한 DistributedLockTemplate

0

현재 서비스 중인 관광 여행 플랫폼의 도메인 중 예약과 주문, 쿠폰이 있다.

서버는 여러대여도 바라보는 DB는 한대였기에 레코드 락으로 처리를 하였지만 관광 여행 플랫폼이다 보니 조회 비중이 많은 부분을 차지하고 있다.

엉망인 조회 쿼리도 개선해나가고 있지만 캐싱이 필요해졌고 Redis를 도입하기로 한 김에 DB I/O를 좀 덜고자 동시성 제어도 레디스로 변경하기로 마음 먹었다.

자료를 찾던 중 Spring AOP와 애너테이션으로 동작하게 만드는 기법이 있었는데 정말 훌륭하고 멋있었다.

락을 사용하는 주요 목적은 데이터의 일관성을 보장하기 위함

동시에 여러 스레드나 프로세스가 동일한 데이터에 접근하려고 할 때, 락을 통해 해당 데이터에 대한 접근을 동기화하여 데이터의 무결성을 보장하기 위해 사용한다.

락은 동시성을 제한하기 때문에, 너무 많은 락이나 긴 시간 동안의 락을 점유하고 있으면 서로 다른 스레드나 프로세스가 다른 순서로 락을 요청하면 데드락(Deadlock)이 발생할 수 있다.

    @DistributedLock(prefix = LOCK_NAME_PREFIX, key = "#productId")
    public void consumeStock(final Long productId, final int quantity) {
        Product product = findProductById(productId);
        product.decreaseStock(quantity);
    }

락은 최소한으로 잡아야한다. 위의 코드는 정상적으로 재고를 차감한다. 정말 간결하고 원하는 동시성 문제를 간편하게 해결했다.

하지만 락을 최소한으로 잡는건 좋았지만 실제 상황(?)에서는 요청 1개, 1개에 대한 처리만 하지 않는다.

예를 들면, 주문이던 예약이던 옵션이 하나가 아니다.

상품이라면 상품 옵션, 예약이라면 예약 기간, 객실, 옵션 등 처리해야할 요소들이 많다.

이 말은 즉슨 데이터베이스에 조회 쿼리가 N번만큼 날라간다는 뜻이다.

베타적 잠금을 획득하는건 좋았지만 DB I/O를 괴롭히는 것은 아니지 않나..?

적절한 트레이드오프인가? 일단 아니라고 생각했다.

    public void consumeStock(final List<OrderProduct> orderProducts) {

        Map<Long, Integer> productConsumeQuantities = orderProducts
                .stream()
                .collect(Collectors.toMap(OrderProduct::getProductId, OrderProduct::getQuantity, Integer::sum));

        List<Product> products = productRepository.findByIdIn(productConsumeQuantities.keySet());

        validateProductExistence(products, productConsumeQuantities);

        decreaseToStock(products, productConsumeQuantities);
    }
    
    private void validateProductExistence(final List<Product> products, final Map<Long, Integer> productConsumeQuantities) {
        Set<Long> productIds = products.stream().map(Product::getId).collect(Collectors.toSet());

        Set<Long> missingProductIds = productConsumeQuantities
                .keySet()
                .stream()
                .filter(productId -> !productIds.contains(productId))
                .collect(Collectors.toSet());

        if (!missingProductIds.isEmpty()) {
            throw new ProductNotFoundException("Product not found: " + missingProductIds);
        }
     }
     
	private void decreaseToStock(final List<Product> products, final Map<Long, Integer> productConsumeQuantities) {
        products.forEach(product -> {
            int quantityToDecrease = productConsumeQuantities.get(product.getId());
            product.decreaseStock(quantityToDecrease);
        });
    }  

간단한 주문 상품을 처리하는 코드다. 주문 상품 정보를 리스트로 받아 목록을 조회해오고, 검증 하고, 상태를 변경한다.

나는 여기서 떠오른 생각이 RestTemplate, TransactionTemplate와 같이 스프링에서 제공해주는 구상 클래스들 처럼 만들고 싶었다.

그래서 만들었다. DistributedLockTemplate

public interface DistributedLockOperations {

    <T> T execute(Supplier<T> supplier);

    <T> T execute(String prefix, Set<Long> keys, Supplier<T> supplier);

    default void execute(String prefix, Set<Long> keys, Runnable runnable) {
        execute(prefix, keys, () -> {
            runnable.run();
            return null;
        });
    }

    default void execute(Runnable runnable) {
        execute(() -> {
            runnable.run();
            return null;
        });
    }
}

위와 같은 인터페이스를 지원한다.

사용방법

1. 환경설정

기본 설정을 프로퍼티로 설정할 수 있다.

PropertySource 지원

@ConfigurationProperties(prefix = "distributed-lock")
public record DistributedLockProperties(
        long leaseTime,
        long waitTime,
        TimeUnit timeUnit
) {}

application.yml

distributed-lock:
  lease-time: 3
  wait-time: 5
  time-unit: seconds

Application Bean Configuration

아래와 같이 Bean을 등록하면 사용할 준비는 끝이났다.

@Configuration(proxyBeanMethods = false)
public class ApplicationBeanConfiguration {

    @Bean
    public DistributedLockDefinition distributedLockDefinition(DistributedLockProperties distributedLockProperties) {
        return new DefaultDistributedLockDefinition(
                distributedLockProperties.waitTime(),
                distributedLockProperties.leaseTime(),
                distributedLockProperties.timeUnit()
        );
    }

    @Bean
    public DistributedLockTemplate distributedLockTemplate(
            RedissonClient redissonClient,
            DistributedLockDefinition distributedLockDefinition,
            TransactionTemplate transactionTemplate
    ) {
        return new DistributedLockTemplate(redissonClient, distributedLockDefinition, transactionTemplate);
    }
}

API

// 단건 처리
 distributedLockTemplate
	.key("LOCK_KEY" + "-" + Thread.currentThread().getName())
    .waitTime(2)
    .leaseTime(3)
    .withInTransaction(true)
    .execute(() -> {});

// 리스트 처리
 distributedLockTemplate 
    .execute(PREFIX, keySet(), () -> {});    


@Service
@RequiredArgsConstructor
public class OrderProductService {

    private final OrderProductOperations orderProductOperations;

    public void orderProcessing(OrderForm orderForm) {

        List<Product> products = orderProductOperations.consumeStock(orderForm.getOrderProducts());

        try {
            OrderSheet orderSheet = new OrderSheet(products);
            
            // ...
        } catch (OrderProcessingException e) {
            orderProductOperations.restock(orderForm.getOrderProducts());
            throw e;
        }
    }
}

@Service
public class ProductService implements OrderProductOperations {

    @Override
    public List<Product> consumeStock(final List<OrderProduct> orderProducts) {
        return executeStockOperation(orderProducts, this::decreaseToStock);
    }

    @Override
    public List<Product> restock(final List<OrderProduct> orderProducts) {
        return executeStockOperation(orderProducts, this::increaseToStock);
    }

    private List<Product> executeStockOperation(List<OrderProduct> orderProducts, StockOperations stockOperations) {
        Map<Long, Integer> productConsumeQuantities = calculateProductConsumeQuantities(orderProducts);

        return distributedLockTemplate.execute(LOCK_NAME_PREFIX, productConsumeQuantities.keySet(), () -> {
            List<Product> products = findProductByIds(productConsumeQuantities);
            validateProductExistence(products, productConsumeQuantities);
            stockOperations.apply(products, productConsumeQuantities);
            return products;
        });
    }
    // ...
}

Distributed Lock Template에 예제코드와 테스트 코드가 있습니다. 궁금하신 분들은 참고하세요.

위에서도 말했지만 락은 최소한으로 잡아야한다. 주문 프로세스를 처리하는데 주문 처리 메서드가 시작되고 거대한 하나의 트랜잭션으로 시작해야할까?

락 뿐 아니라 트랜잭션도 마찬가지다. 작업단위를 최소로 가져가려고 하는 습관을 들여야된다고 생각한다.

재고 바꾸다 터지면 바로 주문 프로세스 종료, 재고를 먼저 선처리하고 잠금을 반환하며 포스트 프로세스가 돌다 터지면 보상 트랜잭션을 실행한다.

레디스에 재고를 올려놓고 처리해도 될 것 같다. 나중에 해봐야지..

0개의 댓글