따릉이 네비게이션 서비스를 만들어보는 과정에서 겪었던 내용을 정리합니다.
초기 코드이다. 기존에 외부 api 결과 값을 가져다 사용하는 기능을 작성했을 때의 코드를 베이스로 아래와 같은 코드를 작성했었다.
외부 api는 따릉이 대여소 정보를 가져와서 대여소ID, 위도경도, 간단한 주소 정보를 저장하는 로직이다.
대여소는 약 3200개 정도이고 1회 요청마다 1000개의 값만을 가져올 수 있다.url path의 1/1000, 1001/2000
부분을 보면 해당 정보가 어떻게 pagination이 되어있는 지 알 수 있다.
for문을 통하여 순차적으로 api 요청을 날리는 것을 볼 수 있다.
jackon 라이브러리를 통하여 JSON 결과를 파싱하여 Station 객체를 만들고, 이를 JPA save메서드를 통하여 DB에 저장하고 있다.
이 때 소요되던 시간은 약 1분30초~ 2분 정도였다.
@Service
public class StationService {
public ResponseEntity<?> initStation() throws IOException {
String BASE_URL = "http://openapi.seoul.go.kr:8088";
String API_KEY = key;
String MASTER_PATH = "/json/bikeStationMaster";
String[] TARGET_LIST = {"/1/1000", "/1001/2000", "/2001/3000", "/3001/4000"};
for (String target : TARGET_LIST) {
URL MASTER_URL = new URL(BASE_URL + API_KEY + MASTER_PATH + target);
try {
BufferedReader br = new BufferedReader(new InputStreamReader(MASTER_URL.openStream(), StandardCharsets.UTF_8));
String result = br.readLine();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.readTree(result).get("bikeStationMaster").get("row").forEach(station -> {
try {
Station newStation = Station.builder()
.stationId(station.get("LENDPLACE_ID").asText())
.stationAddress1(station.get("STATN_ADDR1").asText())
.stationAddress2(station.get("STATN_ADDR2").asText())
.stationLatitude(station.get("STATN_LAT").asDouble())
.stationLongitude(station.get("STATN_LNT").asDouble())
.build();
stationRepository.save(newStation);
} catch (Exception e) {
e.printStackTrace();
}
});
// latitude, longitude "" 거르기
} catch (Exception e) {
return response.fail(String.valueOf(e), HttpStatus.BAD_REQUEST);
}
}
return response.success();
}
}
일단 이렇게나 오래 걸리는 것 보다는 성공했다는 것에 의의가 있었지만 해당 작업이 이렇게나 오랜 시간을 요구하는 것이 잘 이해가 되지 않아서 개선해보자 하게 되었다.
비동기, 병렬 작업을 할 수 있게 끔 개선해보자.
@Service
public class StationService {
private final Response response;
private final StationRepository stationRepository;
private final ExecutorService executorService;
public StationService(Response response, StationRepository stationRepository) {
this.response = response;
this.stationRepository = stationRepository;
this.executorService = Executors.newFixedThreadPool(4);
}
public ResponseEntity<?> initStation() {
String BASE_URL = "http://openapi.seoul.go.kr:8088";
String API_KEY = key;
String MASTER_PATH = "/json/tbCycleStationInfo";
String[] TARGET_LIST = {"/1/1000", "/1001/2000", "/2001/3000", "/3001/4000"};
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String target : TARGET_LIST) {
URL MASTER_URL;
try {
MASTER_URL = new URL(BASE_URL + API_KEY + MASTER_PATH + target);
} catch (MalformedURLException e) {
return response.fail("Invalid URL", HttpStatus.BAD_REQUEST);
}
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try (BufferedReader br = new BufferedReader(
new InputStreamReader(MASTER_URL.openStream(), StandardCharsets.UTF_8))) {
String result = br.readLine();
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(result).get("stationInfo").get("row");
for (JsonNode station : jsonNode) {
if (station.get("STA_LAT").asText().startsWith("0") || station.get("STA_LONG").asText()
.startsWith("0")) {
continue;
}
Station newStation = Station.builder()
.stationNumber(station.get("RENT_NO").asText())
.stationName(station.get("RENT_NM").asText())
.stationNumberName(station.get("RENT_ID_NM").asText())
.stationAddress1(station.get("STA_ADD1").asText())
.stationAddress2(station.get("STA_ADD2").asText())
.stationLatitude(station.get("STA_LAT").asDouble())
.stationLongitude(station.get("STA_LONG").asDouble())
.build();
stationRepository.save(newStation);
}
} catch (Exception e) {
e.printStackTrace();
}
}, executorService);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return response.success();
}
}
기존의 코드에서 달라진 점은, ExecutorService
와 CompletableFuture
를 이용한 점이다.
CompletableFuture를 이용하게 되면, 비동기 작업을 병렬로 실행하고, 그 결과를 조합하거나 가공할 수 있게 된다.
사실 CompletableFuture에 대한 이해도가 부족한 상태이지만 무작정 코드를 작성해본 결과이다.
생성자에서 api 콜의 개수인 4개의 쓰레드를 생성하여 기존의 작업을 async로 동작하게 만들었다.
이 시점에서 동작 시간이 약 37초 정도로 즉, 1/3 ~ 1/4정도 시간을 단축시킬 수 있었다.
하지만 여기서 더 성능을 개선할 수 있지 않을까 하는 고민을 해보게 되었다.
우리의 코드에서는 raw Node를 탐색하면서 Station 객체를 생성하고 이를 매번 save를 하는 구조이다. 즉 Station의 객체 수 만큼 트랜잭션을 생성하고 있던 것이다.
station 객체를 담는 자료구조를 하나 생성하고, 이를 bulk로 save하게 된다면 트랜잭션 수를 1/1000배 또는 1/3000배 할 수 있지 않을까하는 고민을 하게 되었다.
JPA의 saveAll 메서드를 찾을 수 있었고 이를 코드를 통해 보게되면,
@Service
public class StationService {
@Value("${SEOUL_API_KEY}")
private String SEOUL_API_KEY;
private final Response response;
private final StationRepository stationRepository;
private final ExecutorService executorService;
public StationService(Response response, StationRepository stationRepository) {
this.response = response;
this.stationRepository = stationRepository;
this.executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
public ResponseEntity<?> initStation() {
String BASE_URL = "http://openapi.seoul.go.kr:8088";
String API_KEY = SEOUL_API_KEY;
String MASTER_PATH = "/json/tbCycleStationInfo";
String[] TARGET_LIST = {"/1/1000", "/1001/2000", "/2001/3000", "/3001/4000"};
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String target : TARGET_LIST) {
URL MASTER_URL;
try {
MASTER_URL = new URL(BASE_URL + API_KEY + MASTER_PATH + target);
} catch (MalformedURLException e) {
return response.fail("Invalid URL", HttpStatus.BAD_REQUEST);
}
futures.add(CompletableFuture.runAsync(() -> {
try (BufferedReader br = new BufferedReader(
new InputStreamReader(MASTER_URL.openStream(), StandardCharsets.UTF_8))) {
String result = br.readLine();
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(result).get("stationInfo").get("row");
List<Station> stationList = new ArrayList<>();
for (JsonNode station : jsonNode) {
if (station.get("STA_LAT").asText().startsWith("0") || station.get("STA_LONG").asText()
.startsWith("0")) {
continue;
}
Station newStation = Station.builder()
.stationNumber(station.get("RENT_NO").asText())
.stationName(station.get("RENT_NM").asText())
.stationNumberName(station.get("RENT_ID_NM").asText())
.stationAddress1(station.get("STA_ADD1").asText())
.stationAddress2(station.get("STA_ADD2").asText())
.stationLatitude(station.get("STA_LAT").asDouble())
.stationLongitude(station.get("STA_LONG").asDouble())
.build();
stationList.add(newStation);
}
stationRepository.saveAll(stationList);
} catch (Exception e) {
e.printStackTrace();
}
}, executorService));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return response.success();
}
}
위와 같은데, 말 그대로 Station 객체를 생성 후 이를 List에 담은 후, 각 aync 작업이 끝날 때마다 saveAll 메서드를 통하여 1000개씩 벌크로 저장하게 하였다.
또한, 로컬과는 다르게 서버는 2core로 구성되어 있어서 기존 로직에서는 4개의 쓰레드를 만들어 동작하던 것을 서버의 상황에 맞게 dynamic하게 동작할 수 있도록 런타임 시점에서 availableProcessors
개수에 맞게 동작하게 바꾸었다.
단순하게 1000개가 아니라 모든 Station 객체를 담는 List를 만들고, 모든 비동기 작업이 끝난후 3천개에 대해 saveAll을 통하여 저장하면 트랜잭션 단 1번에 해결할 수 있지 않을까? 하는 생각에 코드로 작성해보았지만 1000개씩 3~4번에 나누어 저장하는 것보다 그 속도는 빠르지 않았다.
추측하건데
위와 같은 원인이 사료된다. 아마 추가적으로 비교분석과 JPA의 내부 동작을 찾아보아야 할 것 같다.
2가 맞았다!
또한 서버의 상황에 맞게 availableProcessors를 사용하였지만, 이게 정확한 분석을 통해 사용되지 않고 단순히 그렇지 않을까?
하는 추측으로 구성된 코드이기에 조금 더 근거가 필요하다.
결과적으로 아직까지 문제가 많은 코드이지만 약 3초내로 응답을 받을 수 있다.
즉 맨 처음 1분 30초~ 2분이 걸리던 로직을 엄청나게 단축시킬 수 있었다.
CompletableFuture
를 제대로 사용하고 있지 않은 가도 생각을 해봐야 한다.현재의 코드는 CompletableFuture 작업이 끝나고 단순히 이를 join하는 것으로 끝난다. 즉, 비동기 작업에 대한 결과물을 조작하지 않고 단순하게 병렬 작업용도로만 사용중인 것 같다.
FeignClient
나 WebClient
를 이용해보는 것을 생각중이다.restTemplate
의 경우 Spring 5이후 WebClient
를 권장한다고 하여, 제외하였다.max_allowed_packet
과 배치 작업의 상관 관계에 대해서 공부가 필요하다. bulk 작업과의 연관성을 찾아야한다.