[Teemo-Project] [1] ElasticSearch로 소환사 대전기록 집계처리 초기 성능개선

Hayoon·2024년 5월 28일
0

Teemo-Project

목록 보기
2/2

토이 프로젝트를 진행하면서 발생했던 문제에 대한 본인의 생각과 고민을 기록한 글입니다.
기술한 내용이 공식 문서 내용과 상이할 수 있음을 밝힙니다.

트롤 분석을 위해서는 소환사의 상세한 정보들을 모두 수집해서 집계(aggregation)을 처리해야 한다.

ElasticSearch를 활용하여 대전 기록에 기반한 집계 처리와 데이터 분석 작업을 수행하여 데이터를 모델링하여 의미 있는 인사이트를 도출해야 하는게 나에게 주어진 과제다.

아직 RiotGames에서 ProductionKey 승인을 받지 못한 상태라 1분에 최대 20개의 API를 호출할 수 있다. 클라우드 서버로 이전하지 않고 로컬이라는 제약상황에서 할 수 있는 성능을 개선해보자.

SpringBoot 구조

Spring 프로젝트는 멀티 모듈로 틀을 잡았다. 각 모듈별 맡은 역할을 수행하고, 모듈 간 RPC통신으로 필요한 데이터를 주고받게 설계하였다. (MSA로 전환하기 위한 빌드업)

아키텍처 설계

3개의 Elasticsearch 노드(es01, es02, es03)가 하나의 클러스터를 구성한다. 각 노드에서 Metricbeat가 메트릭을 수집하고 이를 Elasticsearch 클러스터에 전송한다.Kibana는 이 클러스터에서 데이터를 시각화한다.
멀티노드 구성으로 클러스터의 확장성과 내구성을 제공하며, Metricbeat를 통해 수집된 데이터가 중앙에서 관리되고 분석될 수 있도록 설계해보았다.

데이터 모델링

구상은 아래와 같다.
최근 대전 기록 20개를 토대로 아래와 같이 데이터를 집계하려고 한다.

구조

  1. 최상위 집계: username.keyword 기준으로 그룹화
  2. 하위 집계: 각 그룹 내에서 kills, deaths, assists, kda의 평균값 계산
  3. 추가 하위 집계: championId 기준으로 또 다른 그룹화
  4. 해당 챔피언 그룹 내에서 kills, deaths, assists, kda의 평균값 계산
  5. 추가 하위 집계: lane(Position) 기준으로 또 다른 그룹화
  6. 해당 포지션 그룹 내에서 트롤 산정 알고리즘으로 점수 계산

왜 username.keyword를 사용하는가?

username 필드가 text 타입이라면, Elasticsearch는 이 필드를 분석하여 저장한다. 예를 들어, username 필드 값이 "Ha Yoon"이라면, 이는 "Ha"과 "Yoon"이라는 두 개의 토큰으로 분리된다. 이 경우, 전체 username 값을 기준으로 한 집계나 정확한 매칭이 어려워진다.

이를 해결하기 위해, multi-fields 기능을 사용하여 text 필드와 함께 keyword 하위 필드를 생성할 수 있다. 이 경우, username 필드의 원래 값도 keyword 타입으로 저장되어 "Ha Yoon"으로 정확한 매칭과 집계에 사용할 수 있다.

0. 소환사 데이터 수집

RiotGames에서 API를 호출하여 데이터를 record에 매핑하여 ElasticSearch에 저장하였다.
(전체 코드가 궁금하다면 깃허브 클릭)

Controller

@GetMapping("/ids/{gameName}/{tagLine}")
public ResponseEntity<List<SummonerPerformance>> analyzeSummonerPerformance(@PathVariable("gameName") String gameName,
                                                                            @PathVariable("tagLine") String tagLine) throws Exception {
    List<SummonerPerformance> matchIds = matchService.analyzeSummonerPerformanceForPersonalKey(gameName, tagLine);
    return ResponseEntity.ok(matchIds);
}

Record

@JsonIgnoreProperties(ignoreUnknown = true)
public record SummonerPerformanceRecord
	(String puuid, int targetIndex, String username, String lane, 
    double kda, double killParticipation, int kills, int deaths, int assists,
	int physicalDamageDealtToChampions, int magicDamageDealtToChampions,
	int stealthWardsPlaced, int wardTakedowns, int controlWardsPlaced,
	int detectorWardsPlaced, int damageDealtToBuildings, int dragonTakedowns, 
    int baronTakedowns, int teleportTakedowns, int totalMinionsKilled) {
}

Main Code

private final RestClient restClient;
    private final ConfigProperties configProperties;
    private final SummonerPerformanceRepository summonerPerformanceRepository;

    private final int PAGE_SIZE = 20;

    public List<SummonerPerformance> getItemByUsername(String username) {
        return summonerPerformanceRepository.findByUsername(username);
    }

    public List<SummonerPerformance> analyzeSummonerPerformance(String gameName, String tagLine) throws Exception {
        try {
            Account account = getPuuid(gameName, tagLine);
            String username = String.format("%s#%s", gameName, tagLine);

            List<String> matchIds = getMatchIds(account.puuid());

            List<SummonerPerformance> matchDetails = new ArrayList<>();
            for (String matchId : matchIds) {
                String matchDetail = getMatchDetail(matchId);

                matchDetails.add(jsonToPlayerPerformance(matchDetail, username, account.puuid()));
            }

            log.info(String.valueOf(account));
            log.info(String.valueOf(matchIds));

            storeToElasticsearch(matchDetails);

            return matchDetails;

        } catch (Exception e) {
            e.printStackTrace();
        }
        throw new Exception("error");
    }

1. 소환사명으로 PUUID 가져오기

  private Account getPuuid(String gameName, String tagLine) {
  	return this.restClient.get()
              .uri(uriBuilder -> uriBuilder
                      .path("/riot/account/v1/accounts/by-riot-id/{gameName}/{tagLine}")
                      .queryParam("api_key", configProperties.riot().apikey())
                  .build(gameName, tagLine))
              .retrieve()
              .body(Account.class);
  }

2. PUUID로 매칭ID 가져오기

private List<String> getMatchIds(String puuid) {
    return this.restClient.get()
            .uri(uriBuilder -> uriBuilder
                    .path("/lol/match/v5/matches/by-puuid/{puuid}/ids")
                    .queryParam("start", 0)
                    .queryParam("count", PAGE_SIZE)
                    .queryParam("api_key", configProperties.riot().apikey())
                    .build(puuid))
            .retrieve()
            .body(new ParameterizedTypeReference<>() {
            });
}

3. 매칭ID로 상세 매칭 데이터 가져오기

private String getMatchDetail(String matchId) {
    return this.restClient.get()
            .uri(uriBuilder -> uriBuilder
                    .path("/lol/match/v5/matches/{matchId}")
                    .queryParam("api_key", configProperties.riot().apikey())
                    .build(matchId))
           .retrieve()
           .body(String.class);
}

문제점

현재 외부 API를 총 22번을 호출해야한다. (소환사명으로 PUUID 가져오기 호출 1번 + PUUID로 매칭ID 가져오기 호출 1번 + 매칭ID로 상세 매칭 데이터 가져오기 호출 20번 = 22번)

  1. 만약 100개의 대전 기록을 가져오고 인게임 내 10명의 사용자의 모든 정보를 집계처리한다면?
    → (1 + 1 + 100) * 100 = 10200번 호출이 필요하다.

  2. 동시에 100명의 사용자가 트롤 검색 요청을 한다면?
    → 10200 * 100 = 약 10^6 = 100만번

API 호출이 모두 동기식 처리라 작업이 종료될때까지 다른 작업을 수행하지 못하고 대기해야 한다.

  • 순차적 API 호출: 현재 코드는 각 매치 ID에 대해 getMatchDetail 메서드를 호출하여 매치 세부 정보를 가져온다. 호출은 순차적으로 이루어지기 때문에, 각 호출이 완료되기 전까지 다음 호출을 시작하지 않는다.
  • I/O 블로킹: 동기식 호출에서는 네트워크 I/O 작업이 완료될 때까지 현재 스레드가 블로킹된다. 여러 개의 호출을 동시에 처리하지 못하고, 하나의 호출이 완료될 때까지 기다려야 한다.

따라서, 단건 트롤 검색 로직을 최우선으로 개선시켜야 한다고 판단했다. 성능을 개선한다면 이후 10명의 소환사를 검색하는데에도 latency가 유의미하게 단축될거라고 보여진다.

해결방안

문제를 해결하기 위해 비동기 처리를 도입하면, 외부 API 호출을 병렬로 수행하여 응답 시간을 줄일 수 있을거라 판단했다. 비동기 처리는 I/O 작업이 완료될 때까지 기다리지 않고, 다른 작업을 계속 진행할 수 있다.


비동기 및 비블로킹 I/O vs 비동기 및 블로킹 I/O

CompletableFuture + RestClient

블로킹 I/O: RestClient는 블로킹 I/O를 사용한다. 즉, HTTP 요청을 보내고 응답을 받을 때까지 해당 쓰레드는 블로킹 상태가 된다.

쓰레드 풀: CompletableFuture를 사용하면 작업이 비동기로 처리되지만, 여전히 각 요청은 별도의 쓰레드를 차지하며, 블로킹 I/O 특성상 응답을 기다리는 동안 쓰레드가 waiting 상태가 된다. 따라서 많은 동시 요청이 있을 경우 쓰레드 풀이 모두 사용 중일 수 있으며, 이로 인해 작업 처리량이 제한된다.

private CompletableFuture<Account> getPuuidAsync(String gameName, String tagLine) {
    return CompletableFuture.supplyAsync(() -> restClient.get()
            .uri(uriBuilder -> uriBuilder
                    .path("/riot/account/v1/accounts/by-riot-id/{gameName}/{tagLine}")
                    .queryParam("api_key", configProperties.riot().apikey())
                    .build(gameName, tagLine))
            .retrieve()
            .body(Account.class));
}

WebClient

비블로킹 I/O: WebClient는 비블로킹 I/O를 사용한다. 즉, HTTP 요청을 보낼 때 호출 쓰레드는 즉시 반환되고, 응답이 도착하면 콜백 메커니즘을 통해 처리된다.

Reactor 기반: WebClient는 Netty, Jetty 등 비블로킹 I/O 라이브러리를 기반으로 하여, 적은 수의 쓰레드로 많은 동시 요청을 처리할 수 있다. 이는 특히 네트워크 지연이 큰 외부 API 호출을 처리할 때 효율적이다.

private Mono<Account> getPuuid(String gameName, String tagLine) {
    return webClient.get()
            .uri(uriBuilder -> uriBuilder
                    .path("/riot/account/v1/accounts/by-riot-id/{gameName}/{tagLine}")
                    .queryParam("api_key", configProperties.riot().apikey())
                    .build(gameName, tagLine))
            .retrieve()
            .bodyToMono(Account.class);
}

결론

WebClient를 사용하면 비블로킹 I/O의 특성 덕분에 동시 요청 처리 성능이 크게 향상되며, 네트워크 요청 시 발생하는 블로킹 문제를 해결할 수 있을 것 같다. 고성능의 비동기 HTTP 클라이언트를 필요로 하는 경우 WebClient가 더 적합하다고 판단하여 이를 적용하였다.

코드에 적용해보자

public List<SummonerPerformance> analyzeSummonerPerformance(String gameName, String tagLine) throws Exception {
    try {
        Account account = getPuuid(gameName, tagLine).block();
        String username = String.format("%s#%s", gameName, tagLine);

        List<String> matchIds = getMatchIds(account.puuid()).block();

        List<CompletableFuture<SummonerPerformance>> futures = matchIds.stream()
                .map(matchId -> getMatchDetail(matchId)
                        .map(matchDetail -> jsonToPlayerPerformance(matchDetail, username, account.puuid()))
                        .toFuture()).toList();

        List<SummonerPerformance> matchDetails = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(toList()))
                .join();

        log.info(String.valueOf(account));
        log.info(String.valueOf(matchIds));

        storePerformancesToElasticsearch(matchDetails);

        return matchDetails;

    } catch (Exception e) {
        e.printStackTrace();
    }
    throw new Exception("error");
}

private Mono<Account> getPuuid(String gameName, String tagLine) {
    return this.webClient.get()
            .uri(uriBuilder -> uriBuilder
                    .path("/riot/account/v1/accounts/by-riot-id/{gameName}/{tagLine}")
                    .queryParam("api_key", configProperties.riot().apikey())
                    .build(gameName, tagLine))
            .retrieve()
            .bodyToMono(Account.class);
}

private Mono<List<String>> getMatchIds(String puuid) {
    return this.webClient.get()
            .uri(uriBuilder -> uriBuilder
                    .path("/lol/match/v5/matches/by-puuid/{puuid}/ids")
                    .queryParam("start", 0)
                    .queryParam("count", PAGE_SIZE)
                    .queryParam("api_key", configProperties.riot().apikey())
                    .build(puuid))
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<List<String>>() {
            });
}

private Mono<String> getMatchDetailForPersonalKey(String matchId) {
    return Mono.defer(() -> {
        CheckedFunction0<String> decoratedSupplier = RateLimiter.decorateCheckedSupplier(rateLimiter, () ->
                this.webClient.get()
                        .uri(uriBuilder -> uriBuilder
                                .path("/lol/match/v5/matches/{matchId}")
                                .queryParam("api_key", configProperties.riot().apikey())
                                .build(matchId))
                        .retrieve()
                        .bodyToMono(String.class)
                        .block()
        );
        try {
            return Mono.just(decoratedSupplier.apply());
        } catch (Throwable throwable) {
            return Mono.error(throwable);
        }
    });
}
  1. Stream을 통해 비동기 작업 생성: matchIds.stream().map(...)를 사용하여 각 matchId에 대해 비동기 작업(CompletableFuture)을 생성한다.

  2. 비동기 작업을 리스트로 수집: 생성된 CompletableFuture 객체들을 리스트로 수집하여 futures 리스트에 저장한다.

  3. 모든 비동기 작업 완료 대기: CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))를 사용하여 모든 비동기 작업이 완료될 때까지 대기한다. thenApply를 사용하여 모든 작업이 완료된 후 futures 리스트의 각 CompletableFuture의 결과를 join하여 수집한다. join()은 각 CompletableFuture의 결과를 반환한다.


RateLimiter

@Configuration
public class RateLimitConfig {

    @Bean
    public RateLimiter rateLimiter() {
        RateLimiterConfig config = custom()
            .limitRefreshPeriod(Duration.ofSeconds(1)) // 속도 제한을 1초 단위로 갱신
            .limitForPeriod(20) // 1초당 최대 20개의 요청을 허용
            .timeoutDuration(Duration.ofMillis(500)) // 제한에 걸릴 경우 500밀리초 동안 대기
            .build();
        RateLimiterRegistry registry = RateLimiterRegistry.of(config);
        return registry.rateLimiter("matchServiceLimiter");
    }
}

테스트

  • 동기식 소환사 검색
  • 비동기식 소환사 검색

5357ms 3614ms로 약 1.7초 개선되었다. 순차적 API 요청이 동시에 여러 요청을 병렬로 처리하면서 대기 시간이 중첩되지 않아 총 응답 시간이 줄어든 것 같다.

요약

  • RestClient 기반 코드: 비동기적으로 각 요청을 수행하지만, 기본적으로 블로킹 I/O를 사용하므로 많은 동시 요청 시 쓰레드 풀이 제한적

  • WebClient와 RateLimiter: WebClient는 비블로킹 I/O를 사용하여 더 많은 동시 요청을 효율적으로 처리할 수 있으며, RateLimiter를 사용하여 요청 속도를 제한

  • RateLimiterConfig: RateLimiter 설정을 통해 요청 속도를 제어하여, 과도한 요청으로 인한 성능 저하를 방지

추가)

1주일만에 드디어 RiotGames에서 우리 토이프로젝트가 승인이 되어서 ProductionKey를 받았다!
기존까지는 1분에 최대 20개 API 호출이 가능했었는데, 이제는 최대 1만개까지 API 호출이 가능해졌다. 따라서 RateLimit의 제한이 해제되었다.

기존 코드

List<CompletableFuture<SummonerPerformance>> futures = matchIds.stream()
  .map(matchId -> getMatchDetail(matchId)
      .map(matchDetail -> jsonToPlayerPerformance(matchDetail, username, account.puuid()))
      .toFuture()).toList();

List<SummonerPerformance> matchDetails = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
  .thenApply(v -> futures.stream()
      .map(CompletableFuture::join)
      .collect(toList()))
  .join();

storePerformancesToElasticsearch(matchDetails);

개선 코드

List<CompletableFuture<SummonerPerformance>> futures = matchIds.stream()
  .map(matchId -> getMatchDetail(matchId)
      .map(matchDetail -> {
          SummonerPerformance performance = jsonToPlayerPerformance(matchDetail, username, account.puuid());
          return storePerformanceToElasticsearch(performance);
      })
      .toFuture()).toList();

List<SummonerPerformance> matchDetails = futures.stream()
  .map(CompletableFuture::join)
  .collect(toList());

처음보다 3초 이상 줄었다. (막 0.1초대 될 줄)

성능 향상의 주요 이유

I/O 작업의 병렬 처리
원래 코드에서는 모든 match detail을 처리한 후에 Elasticsearch에 저장한다. 변경된 코드에서는 각 matchDetail을 처리할 때마다 Elasticsearch에 바로 저장한다. 이는 I/O 작업이 병렬로 실행됨에 따라 전체 처리 시간이 줄어드는 효과를 가져온다. 즉, 각 matchDetail이 비동기적으로 저장되므로 전체 처리 과정에서 I/O 대기 시간이 줄어들게 되는 것이다.

CompletableFuture의 join 사용
원래 코드에서는 CompletableFuture.allOf를 사용하여 모든 future가 완료되기를 기다린 후 결과를 수집했다.
변경된 코드에서는 각 future의 join을 사용하여 개별적으로 결과를 즉시 수집한다. 이는 각 future가 완료되면 바로 결과를 수집하고 저장할 수 있으므로, 전체적인 대기 시간을 줄인다.

profile
Junior Developer

0개의 댓글

관련 채용 정보