Flux와 WebClient로 비동기 처리

hongo·2025년 4월 17일
0

상황 : 동기로 동작하는 로비폰 업데이트 로직 개선 필요

  • 업데이트 해야하는 로비폰들의 ip들을 받아온다.
  • Ip하나당 단지서버에 RestAPI 요청을 보내 로비폰을 업데이트 한다.(RestAPI 요청은 RestTemplate을 사용해 동기로 요청하고 있다.)

간략한 코드 예시

            for (String ip : ips) {
                param.put("ipAddr", ipData);

				// 단지서버에 REST API 요청
                // 내부적으로 RestTemplate을 사용한 동기 요청 중
                RestResult restResult = module.postToSiteServer(domainUrl, token, param);
public RestResult postToSiteServer(String domainUrl, String token, Object dto) throws Exception {

    	...

        HttpEntity<Object> requestEntity = new HttpEntity<>(dto, headers);

        ResponseEntity<RestResult> responseEntity =
                restTemplate.exchange(domainUrl, HttpMethod.POST, requestEntity, RestResult.class);

        RestResult restResult = responseEntity.getBody();

        if (restResult.isErrorCode()) {
            throw new Exception(SITE_POST_ERROR);
        }

        return restResult;
}

문제

여러개의 로비폰을 업데이트 해야할 경우, API 수행 시간이 오래 걸린다.

  • ips의 사이즈가 100일 경우, 정직하게 100번의 REST API 요청을 동기로 보내고 있다. ips 100개로 요청했을 때 평균 소요 시간 약 2분

RestTemplate 대신 WebClient를 사용한 비동기 처리로 변경

    public Mono<RestResult> siteServerAsyncPostData(String domainUrl, String token, Object dto) {
       
        return WebClient.builder()
                .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(15 * 1024 * 1024))
                .build()
                .post()
                .uri(domainUrl)
                .contentType(MediaType.APPLICATION_JSON)
                .header(Constants.tokenApi.TOKEN_KEY, token)
                .body(BodyInserters.fromValue(dto))
                .exchangeToMono(response -> response.bodyToMono(SSPRestResult.class))
                // 타임아웃등 네트워크 I/O 오류 발생 시 처리
                .doOnError(WebClientRequestException.class, ex -> 
                    log.error(
                          "단지서버로 비동기 POST 요청을 보내던 중 네트워크 I/O 에러 발생 : domainUrl={}, token={}, dto={}", 
                          domainUrl, token, dto
                    )
                )
                .onErrorMap(WebClientRequestException.class,
                        e -> new ExceptionBuilder()
                                .id(SITE_CONNECT_ERROR.getErrId())
                                .build());
    }

WebClient의 결과를 Flux로 모은다.

Flux란? 비동기/논블로킹으로 데이터를 방출할 수 있는 스트림. 소비자가 해당 데이터를 소비한 후의 결과도 모아올 수 있다.
(으아악Mono랑Flux뭐가이렇게많아공부할거천지다진짜)

    private List<String> sendDeviceUpdateRequestToSiteServer(DeviceUpdateReqDTO reqDTO) {
        ConcurrentLinkedQueue<String> failedIps = new ConcurrentLinkedQueue<>();
        List<String> ipList = reqDTO.getIpList();
        List<SSPRestResult> result = Flux.fromIterable(ipList) // 파라미터안의 원소들을 차례대로 방출
                .flatMap(ip -> {
                    // param 복사 (같은 map 사용시 동시성 이슈가 생길 수 있음)
                    Map<String, Object> paramPerIp = new HashMap<>(reqDTO.getParam());
                    paramPerIp.put("ipAddr", ip);
                    return module.siteServerAsyncPostData(
                                    reqDTO.getDomainUrl() + Constants.ModuleValleyHNSINBASEUrlWebApi.HNS_INBASE_12,
                                    reqDTO.getToken(),
                                    paramPerIp
                            )
                            .flatMap(restResult -> {
                                if (restResult.isSuccessCode()) {
                                    return Mono.just(restResult);
                                }
                                return Mono.error(new RuntimeException("[ERROR] FailToUpdateTarInSiteServer 단지서버로부터 실패 응답을 받았습니다. code=" + restResult.getCode() + ", message= " + restResult.getMessage()));
                            }, 20) // 한 번에 보낼 수 있는 최대 개수 제한 지정 가능
                            .onErrorResume(e -> { // 에러가 발생하면 Flux흐름 끊김. 중간에 에러가 생기더라도 모든 ip들에 대해 비동기 처리를 하고싶다면, 에러를 받아서 Mono로 반환해주셈.
                                log.error("[ERROR] FailToUpdateTarInSiteServer 단지서버에 대상장치 업데이트 요청 중 예외 발생 ip : \"{}\", message: \"{}\"", ip, e.getMessage());
                                failedIps.add(ip);
                                return Mono.empty();
                            });
                })
                .collectList()
                .block(); // 각 ip에 대한 REST API 요청은 동시에 보내되, 각 API에 대한 결과는 한 번에 받아 응답해야했기에 block 사용함 

        List<String> failIps = new ArrayList<>(failedIps);
        return failIps;
    }
  • flatMap( ... , 20) 과 같이 동시 요청 최대 제한 개수를 정해줄 수 있다. 이 경우 Flux에서 최대 20개까지 요청을 한 번에 보내고 onComplete()이벤트가 발생하는 요청이 생기면 그 이후에 요청을 이어서 보낸다.
    • API 요청을 받는 서버측의 부하를 고려하고 싶다면 위 옵션 활용 가능.

부록- 이것저것테스트

        ConcurrentLinkedQueue<String> failIpList = new ConcurrentLinkedQueue<>();
        List<String> ipList = infmgrSmartManageUpTarReqDTO.getIpList();
        List<RestResult> result = Flux.fromIterable(ipList)
                .parallel(20) // Flux 병렬 처리 가능
                .runOn(Schedulers.parallel())
                .flatMap(ip -> {
                    // param을 복사하지 않고 그대로 사용 - 동시성이슈발생
                    param.put("ipAddr", ip);
                    try {
                        // 의도적으로 딜레이 (타이밍 충돌 유도)
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                    }
                    log.info("[INFO] ip 업데이트 요청중");
                    return module.siteServerAsyncPostData(
                                    domainUrl + Constants.ModuleValleyHNSINBASEUrlWebApi.HNS_INBASE_12,
                                    token,
                                    paramPerIp
                            )
                            .onErrorResume(e -> {
                                log.error("[ERROR] FailToUpdateTarInSiteServer 단지서버에 대상장치 업데이트 요청 중 예외 발생 ip {}, message: {}", ip, e.getMessage());
                                failIpList.add(ip);
                                return Mono.empty();
                            });
                })
                .sequential()
                .collectList()
                .block();
        List<String> failIps = new ArrayList<>(failIpList);

1. parallel로 바꿨을 때,

바꾸기 전 : http-nio-9000-exec 스레드 1개, reactor-http-nio 스레드 11개.
11개는 챗지피티가 cpu코어개수 * 2 라는데 맞는 거 같음.

parallel로 바꿨을 때 : parallel스레드 11개, reactor-http-nio 스레드 여러개

굳이 parallel로 안바꿔도 되는 이유 : io작업이 긴 애이기 때문에 멀티스레드로 돌릴 이유가 없음. 어차피 io대기해야함. ip 900개로 테스트했을 때도 수행시간 3~4초로 비슷함.

2. param복사하지 않고 바로 사용했을 때,

위 케이스에서 동시성 이슈 발생 : 당연함. 멀티스레드라서 ㅋㅋ. parallel옵션 빼고도 그냥 비동기 상태에서 동시성 이슈 생기는지 궁금했는데, ip1000개로 10번정도 돌렸을 때 한 번도 동시성 이슈 생기지는 않음. 그러나 챗지피티말로는 동시성 이슈가 발생가능한 상황이라함. cpu 성능이 좋아서 동시성 케이스가 발견이 안됐을지도...

profile
https://github.com/hgo641

0개의 댓글