[4][아비터(Arbit-er) 프로젝트] 과도한 이벤트 발행 문제해결 (🔥93% 감소)

sonjiseokk·2025년 3월 9일
0
post-thumbnail

📌 개요

아주 치명적인 결함을 발견했다.

바로 업비트 소켓을 통해 들어오는 가격 데이터가 매우 많다는 것.. (해당 이미지는 실제로 10초동안 켜둔 상황이다)
것도 업비트만을 한정했을 때, 10초에 1000개인 것인데 바이낸스까지 들어온다면... 절대 감당을 하지 못할 것이다.

1️⃣ 기존 코드의 문제점

기존의 코드를 살펴보자

@Override  
public void onOpen(WebSocket webSocket, Response response) {  
    log.info("업비트 WebSocket 연결 성공");  
  
    try {  
        // 모든 KRW 마켓 종목 조회  
        List<String> markets = upbitMarketSearchClient.getAllKRWMarkets();  
          
        // 구독 메시지 생성  
        String subscribeMessage = objectMapper.writeValueAsString(Arrays.asList(  
                new TicketField("ARBITER-PRICE"),  
                new TypeField("ticker", markets)  
        ));  
  
        webSocket.send(subscribeMessage);  
        log.info("구독 요청 완료");  
    } catch (Exception e) {  
        log.error("구독 요청 실패", e);  
    }  
}

대략적인 설명은 다음과 같다.
1. 업비트와 소켓이 연결됨과 동시에 REST API로 모든 KRW 마켓을 조회한다.
2. 조회한 KRW 마켓 리스트를 TypeField에 넣고, 해당 마켓을 구독한다.

즉, 2025년 3월 9일 기준 161개의 KRW 마켓을 지원하고 있는데, 이 모든 코인의 시세를 구독하고 있는 것이다.
심지어 그 모든 자산들이 한 틱씩만 움직여도 이벤트가 발행되니, 이게 말도 안될 정도로 많이 발행될 수 밖에 없는 것이다.
그리고 제일 중요한 핵심은 그렇게 많이 발생한 이벤트가 쓰잘데기가 없다는게 문제다. (대부분의 시간은 큰 변동을 갖지 않기 때문에)

그래서 내가 추려낸 해결 방안은 다음과 같다.

2️⃣ 타협점

첫번째, 주요 코인들만을 타겟하자.

김프 거래의 핵심은 국내와 해외의 괴리다. 그러나 잡코인의 경우 이러한 괴리가 실제로 발생하기 보다는 호가 차이로 인해 발생하는 경향이 있고, 실제로 발생한 괴리도 유동성이 부족하기 때문에 충분한 거래를 하지 못한다는 단점이 있다.

따라서 유동성이 풍부한 시가총액 10위권 내의 코인들만을 기준으로 서비스를 제공한다면, 김프 매매의 본질을 잃지 않는 것이라 생각했다.

두번째, 변동성을 가지는 상황만을 이벤트로 만들자

많은 이벤트가 발생하는 주요 원인은 가격이 조금만 움직여도 price_topic 이벤트가 발행됨에 있었다.
특히나, 실제로 김프 매매는 가격의 상승폭이 괴랄할 때 크게 요동치는 편인데, 이때가 중요한거지 평상시의 상황은 중요하지 않다고 생각했다.

따라서 해당 코인의 가격을 redis나 메모리에 저장해두고, 5~10분마다 갱신해가며 유효한 가격 움직임이 발생했을 때만 price_topic 이벤트를 발행한다면, 수많은 이벤트 발생 문제를 해결할 수 있다고 생각했다.

3️⃣ 발생한 문제점

특정 코인 가격을 어떻게 저장할 것인가

일단 최초에는 ConcurrentHashMap을 생각했다.
이유는 첫번째 타협점과도 관련이 있는데, 주요 코인만을 타겟팅하기로 했기에 메모리에 저장해도 충분할 정도라고 생각했다.
또, 카프카 컨슈머는 기본적으로 멀티쓰레드를 통해 이벤트를 컨슘하기에 동시성 문제를 방지하기 위해서다.

그러나 구현 과정에서 Redis에 저장할지 메모리에 저장할지가 아닌 다른 곳에서 문제점이 발생하게되었다.

해시맵을 어떻게 업데이트할 것인가

두번째 타협점에서 말했듯, 5~10분 간격을 두고 해당 해시맵의 데이터를 업데이트 하고자 했다.

최초에는 그냥 업비트로부터 API를 호출하면 바로 받아와 업데이트하면 되겠다 생각했지만, 우리 서비스는 이미 업비트로부터 소켓 통신을 하고 있고, 그 과정에서 REST API로 갱신하는 과정은 말이 안된다고 생각했다. (물론 그게 더 안정적이긴 하겠지만 말이다.)

따라서, 기존의 aggregator 모듈의 redis 캐시를 사용하고자 했다.
어그리게이터 모듈은 이미 이벤트로부터 가격정보를 받아온 후, redis에 지속적으로 sync 해오고있었기 때문이다.
이를 사용하면 REST API 호출 없이 ConcurrentHashMap을 업데이트 할 수 있을 것처럼 보인다.

근데 이러면 다음과 같은 문제가 생긴다.

현재 구조는 다음과 같은데, 만약 여기서 ConcurrentHashMap을 업데이트하기 위해 저 레디스 캐시를 사용하려고 한다고 가정해보자.

정리하자면, price_topic 이벤트에 필터링을 걸기 위해 실시간 시세를 이전 시세와 비교하는 용도로 사용하는 ConcurrentHashMap 을 주기적으로 업데이트하기 위해서 레디스 캐시에서 데이터를 꺼내 오려는 상황이다.

이렇게 구현하게 되면 사실, 잘못된 아키텍처가 된다.

  1. CQRS 패턴에 의하면 가격 데이터 수집과 이를 활용한 기능은 분리되어야한다.
  2. 그러나 어그리게이터의 레디스 캐시를 사용하게 되면, Price-collector는 "데이터 수집" 만이 아니라 어그리게이터와 상호작용하는 모듈이 되어버린다.

즉, 이러한 그림의 상황이 되어버린다. 필터링을 하려다보니 서로 의존성을 갖게 된 모습이다.

이에 따라, ConcurrentHashMap을 업데이트하기 위한 수단으로 어그리게이터의 레디스 캐시를 사용하는 것은 좋지 못한 선택이라는 결론에 이르렀다.


🎿 발상의 전환

그렇게 생각을 하던 중, 갑자기 좋은 아이디어가 떠올랐다.

어처피 발행되는 이벤트가 낮은 변동성을 가져 중복 발행되는 경우가 더 많잖아?
Price-collector에서 업데이트하자

그렇다. 만약 업비트로부터 발행되는 이벤트 수가 적다면, 이러한 점에 대해 더 깊게 고민해봐야겠지만, 아키텍처 상으로 데이터는 실시간으로 들어오고 있었다.

따라서, 그냥 해시맵을 초기화하면 자동으로 현재 시점의 가격 데이터를 해시맵에 업데이트할 수 있었다. (가격 데이터가 없을 시에는 해시맵에 데이터를 put하기 때문)

그렇게 만들어진 코드는 다음과 같다. 해당 로직대로면 10분전보다 0.5% 이상 변동성을 가지지 못한다면 이벤트로 발행되지 않기 때문에 최초의 수많은 이벤트 발행 문제를 해결할 수 있다.

public class VolatilityService {  
    // 시장 상황에 따라 조절
    private static final double CHANGE_THRESHOLD = 0.1;
  
    private final Map<String, Double> localPriceCache = new ConcurrentHashMap<>();  
  
    public boolean isVolatilityChanged(UpbitMessageDto upbitMessageDto) {  
        String ticker = upbitMessageDto.getTicker();  
        Double oldPrice = localPriceCache.get(ticker);  
        Double newPrice = upbitMessageDto.getPrice();  
  
        // 처음 들어온 가격 정보인 경우  
        if (oldPrice == null) {  
            log.info("[VolatilityService] Old price is null. Update Cache");  
            // 로컬 캐시에 해당 정보 기입  
            localPriceCache.put(ticker, newPrice);  
            return false;  
        }  
  
        // 현재 가격과 이전 가격 비교 (변동성 측정)  
        double priceChange = Math.abs((newPrice - oldPrice) / oldPrice * 100);  
  
        // 변동성이 임계치보다 높게 발생했다면  
        if (priceChange >= CHANGE_THRESHOLD) {  
            log.info("[VolatilityService] Old price change is {}", priceChange);  
            return true;  
        } else {  
            return false;  
        }  
    }  
  
    // 10분마다 해시맵을 강제로 비움.  
    @Scheduled(fixedRate = 600000)  
    public void resetCache() {  
        log.info("[VolatilityService] Reset cache");  
        localPriceCache.clear();  
    }  
}

UpbitWebSocketListener.java

@Override  
public void onMessage(WebSocket webSocket, okio.ByteString bytes) {  
    try {  
        String text = bytes.utf8();  
        UpbitMessageDto upbitMessageDto = objectMapper.readValue(text, UpbitMessageDto.class);  
  
        if (volatilityService.isVolatilityChanged(upbitMessageDto)) {  
            PriceMessageDto priceMessageDto = new PriceMessageDto();  
            priceMessageDto.setPrice(upbitMessageDto.getPrice());  
            priceMessageDto.setTicker(upbitMessageDto.getTicker());  
            priceMessageDto.setTimestamp(upbitMessageDto.getTimestamp());  
            priceMessageDto.setMarket("UPBIT");  
  
            kafkaProducerService.sendPrice(priceMessageDto);  
        }  
    } catch (Exception e) {  
        log.error("메시지 처리 중 오류", e);  
    }  
}

🔚 결론

해당 필터링을 적용한 이후, 10초에 70개 정도로 확 줄어든 모습을 볼 수 있음.. (1000개에서 70개면 🔥 무려 93% 줄였다.)

바이낸스 콜렉터도 비슷한 방식으로 필터링을 적용해야겠음. (다음 편에)

profile
Software Engineer

0개의 댓글