간략한 코드 예시
for (String ip : ips) {
param.put("ipAddr", ipData);
// 단지서버에 REST API 요청
// 내부적으로 RestTemplate을 사용한 동기 요청 중
RestResult restResult = module.postToSiteServer(domainUrl, token, param);
public RestResult postToSiteServer(String domainUrl, String token, Object dto) throws Exception {
...
HttpEntity<Object> requestEntity = new HttpEntity<>(dto, headers);
ResponseEntity<RestResult> responseEntity =
restTemplate.exchange(domainUrl, HttpMethod.POST, requestEntity, RestResult.class);
RestResult restResult = responseEntity.getBody();
if (restResult.isErrorCode()) {
throw new Exception(SITE_POST_ERROR);
}
return restResult;
}
여러개의 로비폰을 업데이트 해야할 경우, API 수행 시간이 오래 걸린다.
public Mono<RestResult> siteServerAsyncPostData(String domainUrl, String token, Object dto) {
return WebClient.builder()
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(15 * 1024 * 1024))
.build()
.post()
.uri(domainUrl)
.contentType(MediaType.APPLICATION_JSON)
.header(Constants.tokenApi.TOKEN_KEY, token)
.body(BodyInserters.fromValue(dto))
.exchangeToMono(response -> response.bodyToMono(SSPRestResult.class))
// 타임아웃등 네트워크 I/O 오류 발생 시 처리
.doOnError(WebClientRequestException.class, ex ->
log.error(
"단지서버로 비동기 POST 요청을 보내던 중 네트워크 I/O 에러 발생 : domainUrl={}, token={}, dto={}",
domainUrl, token, dto
)
)
.onErrorMap(WebClientRequestException.class,
e -> new ExceptionBuilder()
.id(SITE_CONNECT_ERROR.getErrId())
.build());
}
Flux란? 비동기/논블로킹으로 데이터를 방출할 수 있는 스트림. 소비자가 해당 데이터를 소비한 후의 결과도 모아올 수 있다.
(으아악Mono랑Flux뭐가이렇게많아공부할거천지다진짜)
private List<String> sendDeviceUpdateRequestToSiteServer(DeviceUpdateReqDTO reqDTO) {
ConcurrentLinkedQueue<String> failedIps = new ConcurrentLinkedQueue<>();
List<String> ipList = reqDTO.getIpList();
List<SSPRestResult> result = Flux.fromIterable(ipList) // 파라미터안의 원소들을 차례대로 방출
.flatMap(ip -> {
// param 복사 (같은 map 사용시 동시성 이슈가 생길 수 있음)
Map<String, Object> paramPerIp = new HashMap<>(reqDTO.getParam());
paramPerIp.put("ipAddr", ip);
return module.siteServerAsyncPostData(
reqDTO.getDomainUrl() + Constants.ModuleValleyHNSINBASEUrlWebApi.HNS_INBASE_12,
reqDTO.getToken(),
paramPerIp
)
.flatMap(restResult -> {
if (restResult.isSuccessCode()) {
return Mono.just(restResult);
}
return Mono.error(new RuntimeException("[ERROR] FailToUpdateTarInSiteServer 단지서버로부터 실패 응답을 받았습니다. code=" + restResult.getCode() + ", message= " + restResult.getMessage()));
}, 20) // 한 번에 보낼 수 있는 최대 개수 제한 지정 가능
.onErrorResume(e -> { // 에러가 발생하면 Flux흐름 끊김. 중간에 에러가 생기더라도 모든 ip들에 대해 비동기 처리를 하고싶다면, 에러를 받아서 Mono로 반환해주셈.
log.error("[ERROR] FailToUpdateTarInSiteServer 단지서버에 대상장치 업데이트 요청 중 예외 발생 ip : \"{}\", message: \"{}\"", ip, e.getMessage());
failedIps.add(ip);
return Mono.empty();
});
})
.collectList()
.block(); // 각 ip에 대한 REST API 요청은 동시에 보내되, 각 API에 대한 결과는 한 번에 받아 응답해야했기에 block 사용함
List<String> failIps = new ArrayList<>(failedIps);
return failIps;
}
ConcurrentLinkedQueue<String> failIpList = new ConcurrentLinkedQueue<>();
List<String> ipList = infmgrSmartManageUpTarReqDTO.getIpList();
List<RestResult> result = Flux.fromIterable(ipList)
.parallel(20) // Flux 병렬 처리 가능
.runOn(Schedulers.parallel())
.flatMap(ip -> {
// param을 복사하지 않고 그대로 사용 - 동시성이슈발생
param.put("ipAddr", ip);
try {
// 의도적으로 딜레이 (타이밍 충돌 유도)
Thread.sleep(100);
} catch (InterruptedException e) {
}
log.info("[INFO] ip 업데이트 요청중");
return module.siteServerAsyncPostData(
domainUrl + Constants.ModuleValleyHNSINBASEUrlWebApi.HNS_INBASE_12,
token,
paramPerIp
)
.onErrorResume(e -> {
log.error("[ERROR] FailToUpdateTarInSiteServer 단지서버에 대상장치 업데이트 요청 중 예외 발생 ip {}, message: {}", ip, e.getMessage());
failIpList.add(ip);
return Mono.empty();
});
})
.sequential()
.collectList()
.block();
List<String> failIps = new ArrayList<>(failIpList);
바꾸기 전 : http-nio-9000-exec 스레드 1개, reactor-http-nio 스레드 11개.
11개는 챗지피티가 cpu코어개수 * 2 라는데 맞는 거 같음.
parallel로 바꿨을 때 : parallel스레드 11개, reactor-http-nio 스레드 여러개
굳이 parallel로 안바꿔도 되는 이유 : io작업이 긴 애이기 때문에 멀티스레드로 돌릴 이유가 없음. 어차피 io대기해야함. ip 900개로 테스트했을 때도 수행시간 3~4초로 비슷함.
위 케이스에서 동시성 이슈 발생 : 당연함. 멀티스레드라서 ㅋㅋ. parallel옵션 빼고도 그냥 비동기 상태에서 동시성 이슈 생기는지 궁금했는데, ip1000개로 10번정도 돌렸을 때 한 번도 동시성 이슈 생기지는 않음. 그러나 챗지피티말로는 동시성 이슈가 발생가능한 상황이라함. cpu 성능이 좋아서 동시성 케이스가 발견이 안됐을지도...