Spring WebClient로 비동기 호출을 해보자!

sinryuji·2025년 1월 17일
post-thumbnail

앞선 글에서 WebClient를 이용한 기본적인 API 호출을 알아보았으니 이제 비동기를 처리하는 부분에 대해서 알아보자!

💡 비동기란?

https://velog.io/@sinryuji/CS-%EB%8F%99%EA%B8%B0%EB%B9%84%EB%8F%99%EA%B8%B0-%EB%B8%94%EB%A1%9C%ED%82%B9%EB%85%BC%EB%B8%94%EB%A1%9C%ED%82%B9

왜 비동기인가?

단건으로 외부 API를 호출한다면 동기로 호출을 하여도 괜찮다. 그럴 경우에는 block()을 통해 비교전 간편하게 호출을 처리할 수 있다. 하지만 나의 경우에는 일단 API를 통해 글 리스트를 가져온 다음 그 리스트의 각 글에 대해 자세한 정보를 가져오기 위해 글마다 한 번씩 API를 호출해야 했다.

만약에 글의 개수가 100개라면 총 101번의 API의 호출이 필요한 셈!! 게다가 디테일 API에서 글의 이미지는 제공을 해주지 않아 글의 이미지는 스크래핑을 통해 긁어와야했다. 그럼 총 101번의 API 호출과 100번의 스크래핑이 수행이 되는 것이다. 모든 각 글마다 DB Insert 쿼리는 물론 AWS S3 Bucket으로의 이미지 업로드는 덤이었다.

테스트를 통해 이 모든 작업을 완전히 동기로 작업을 하면 생각보다 시간이 많이 소요됨을 확인하였다. 물론 스케쥴링이 하루에 사용자가 거의 없는 새벽 3시에 한 번만 돌아가기에 큰 부담은 없겠지만 이왕 WebClient를 사용하는김에 완전 비동기로 구현을 하는게 학습적인 측면에서도 좋을 것 같아 완전 비동기로 구현을 하기로 했다.

동기 vs 비동기 소요시간 비교

테스트 당시 S3 Bucket으로의 이미지 업로드는 아직 구현을 하지 않았어서 그 부분은 제외된 테스트이다. 모든 테스트에는 대략 90~100회의 API 호출이 이루어졌다.

동기 테스트

코드

    @Operation(summary = "1365 API로 플로깅 리스트 정보 가져오기")  
    @GetMapping("/1365Api/list")  
    fun fetchPloggingEventList() {  
		val startTime = System.currentTimeMillis()  
		  
		this.ploggingEventService.fetchPloggingEventList("플로깅")  
		this.ploggingEventService.fetchPloggingEventList("줍깅")  
		  
		val stopTime = System.currentTimeMillis()  
		println("running time: ${stopTime - startTime}")
    }

	@Transactional
    override fun fetchPloggingEventList(keyword: String) {
        val oneYearAgoStart = this.getOneYearAgoStart()
        val oneYearLaterEnd = this.getOneYearLaterEnd()

        val response =
            this.webClient.get()
                .uri { uriBuilder ->
                    uriBuilder
                        .scheme("http")
                        .host(API_HOST)
                        .path(LIST_API_PATH)
                        .queryParam("serviceKey", apiKey)
                        .queryParam("progrmBgnde", oneYearAgoStart)
                        .queryParam("progrmEndde", oneYearLaterEnd)
                        .queryParam("adultPosblAt", "Y")
                        .queryParam("yngbgsPosblAt", "Y")
                        .queryParam("numOfRows", "100")
                        .queryParam("pageNo", "1")
                        .queryParam("keyword", keyword)
                        .queryParam("schCateGu", "all")
                        .queryParam("actBeginTm", "00")
                        .queryParam("actEndTm", "24")
                        .queryParam("noticeBgnde", oneYearAgoStart)
                        .queryParam("noticeEndde", oneYearLaterEnd)
                        .build()
                }
                .retrieve()
                .bodyToMono(VolunteeringListApiResponse::class.java)
                .onErrorResume {
                    throw GlobalException(GlobalErrorCode.PLOGGING_EVENT_FETCH_ERROR)
                }
                .block()

        if (response == null) {
            throw GlobalException(GlobalErrorCode.PLOGGING_EVENT_FETCH_ERROR)
        }

        val totalCount = response.body!!.totalCount!!
        if (totalCount > 0) {
            println(totalCount)
            val items = response.body.items!!.item!!
            items.forEach { println(it) }
            this.saveFetchedPloggingEventList(items)
        }
	}


	@Transactional  
	override fun saveFetchedPloggingEventList(itemList: List<VolunteeringListApiResponseItem>) {  
	    val savedNumberList = this.ploggingEventRepository.findAllProgramNumber()  
	    val newItemList =  
	        itemList.filterNot {  
	            it.programRegistrationNumber in savedNumberList  
	        }  
	  
	    newItemList.forEach { item ->  
	        val detailItem = this.fetchPloggingEvent(item.programRegistrationNumber!!)  
	        if (detailItem != null && detailItem.body!!.totalCount == 1) {  
	            this.saveFetchedPloggingEvent(detailItem.body.items!!.item!![0], item.url!!)  
	        }
	    }
	}

    override fun fetchPloggingEvent(programNumber: String): VolunteeringDetailApiResponse? {
        return this.webClient.get()
            .uri { uriBuilder ->
                uriBuilder
                    .scheme("http")
                    .host(API_HOST)
                    .path(DETAIL_API_PATH)
                    .queryParam("serviceKey", apiKey)
                    .queryParam("progrmRegistNo", programNumber)
                    .build()
            }
            .retrieve()
            .bodyToMono(VolunteeringDetailApiResponse::class.java)
            .onErrorResume {
                throw GlobalException(GlobalErrorCode.PLOGGING_EVENT_FETCH_ERROR)
            }
            .block()
    }


    @Transactional
    override fun saveFetchedPloggingEvent(
        item: VolunteeringDetailApiResponseItem,
        url: String,
    ): PloggingEvent {
        val ploggingEvent = PloggingEventConverter.toEntity(item, url)
        return this.ploggingEventRepository.save(ploggingEvent)
    }

소요 시간

  • 1차: 13243ms
  • 2차: 7735ms
  • 3차: 7547ms
  • 4차: 7367ms
  • 5차: 7599ms
  • 평균: 8698.2ms

API 서버에서 캐싱을 수행하는지 모든 테스트에서 1차 응답 시간이 가장 길었고 그 후로는 응답 시간이 단축되었다. 1차의 경우 13초, 평균 8.6초나 소요되었다.

코드를 보면 우선 리스트 API에서 가져온 결과를 순회하며 글들의 페이지 넘버를 통해 각각 상세 정보를 가져오는 API를 호출한다. 그 후 DB에 Insert를 하는 작업까지 마친 후에 다음 글로 넘어가 다시 상세 정보 API를 호출하는 식이다.

완전 동기로 작업하기 때문에 많은 시간이 소요되었다.

비동기 테스트

코드

    @Transactional  
    override fun saveFetchedPloggingEventList(itemList: List<VolunteeringListApiResponseItem>) {  
        val savedNumberList = this.ploggingEventRepository.findAllProgramNumber()  
        val newItemList =  
            itemList.filterNot {  
                it.programRegistrationNumber in savedNumberList  
            }  
  
        newItemList.forEach { item ->  
            this.fetchPloggingEvent(item.programRegistrationNumber!!).subscribe { res ->  
                if (res.body!!.totalCount == 1) {  
                    this.saveFetchedPloggingEvent(res.body.items!!.item!![0], item.url!!)  
                }  
            }  
        }    
    }

	override fun fetchPloggingEvent(programNumber: String): Mono<VolunteeringDetailApiResponse> {  
	    return this.webClient.get()  
	        .uri { uriBuilder ->  
	            uriBuilder  
	                .scheme("http")  
	                .host(API_HOST)  
	                .path(DETAIL_API_PATH)  
	                .queryParam("serviceKey", apiKey)  
	                .queryParam("progrmRegistNo", programNumber)  
	                .build()  
	        }  
	        .retrieve()  
	        .bodyToMono(VolunteeringDetailApiResponse::class.java)  
	        .onErrorResume {  
	            throw GlobalException(GlobalErrorCode.PLOGGING_EVENT_FETCH_ERROR)  
	        }  
	}

소요 시간

  • 1차: 1265ms
  • 2차: 723ms
  • 3차: 725ms
  • 4차: 721 ms
  • 5차: 768 ms
  • 평균: 1026.2ms

1차의 경우 1.2초, 평균 1초로 굉장히 많은 시간이 단축되었다!

코드를 보면 다른 부분은 모두 똑같지만 상세 정보를 가져오는 API를 비동기로 수정하였다. subscribe()를 통해 호출이 완료되면 DB에 Insert 하기로 했다. 이렇게 되면 앞선 상세 정보 API 호출의 결과를 기다리지 않고 바로 다음 글의 상세 정보 API를 호출하게 되므로 시간을 크게 단축 할 수 있는 것이다!

대략 100건의 API 호출의 경우에도 8배나 소요 시간의 차이가 발생했다. 그런데 만약 데이터가 1000건이라면? 10000건이라면? 데이터가 많아질수록 비동기 처리는 선택이 아닌 필수가 된다.

Reactor 스타일 비동기 처리

    @Transactional
    override fun fetchPloggingEventList(keyword: String): Mono<Void> {
        val oneYearAgoStart = this.getOneYearAgoStart()
        val oneYearLaterEnd = this.getOneYearLaterEnd()

        return this.webClient.get()
            .uri { uriBuilder ->
                uriBuilder
                    .scheme("http")
                    .host(API_HOST)
                    .path(LIST_API_PATH)
                    .queryParam("serviceKey", apiKey)
                    .queryParam("progrmBgnde", oneYearAgoStart)
                    .queryParam("progrmEndde", oneYearLaterEnd)
                    .queryParam("adultPosblAt", "Y")
                    .queryParam("yngbgsPosblAt", "Y")
                    .queryParam("numOfRows", "1000")
                    .queryParam("pageNo", "1")
                    .queryParam("keyword", keyword)
                    .queryParam("schCateGu", "all")
                    .queryParam("actBeginTm", "00")
                    .queryParam("actEndTm", "24")
                    .queryParam("noticeBgnde", oneYearAgoStart)
                    .queryParam("noticeEndde", oneYearLaterEnd)
                    .build()
            }
            .retrieve()
            .bodyToMono(VolunteeringListApiResponse::class.java)
            .flatMap { response ->
                if ((response.body?.totalCount ?: 0) > 0) {
                    val items = response.body!!.items?.item ?: emptyList()

                    // 새 데이터를 필터링하고 저장
                    saveFetchedPloggingEventList(items)
                } else {
                    Mono.empty<Void>()
                }
            }
            .onErrorResume { error ->
                throw GlobalException(GlobalErrorCode.PLOGGING_EVENT_FETCH_ERROR)
            }
    }

    override fun fetchPloggingEvent(programNumber: String): Mono<VolunteeringDetailApiResponse> {
        return this.webClient.get()
            .uri { uriBuilder ->
                uriBuilder
                    .scheme("http")
                    .host(API_HOST)
                    .path(DETAIL_API_PATH)
                    .queryParam("serviceKey", apiKey)
                    .queryParam("progrmRegistNo", programNumber)
                    .build()
            }
            .retrieve()
            .bodyToMono(VolunteeringDetailApiResponse::class.java)
            .onErrorResume {
                throw GlobalException(GlobalErrorCode.PLOGGING_EVENT_FETCH_ERROR)
            }
    }

    @Transactional
    override fun saveFetchedPloggingEventList(itemList: List<VolunteeringListApiResponseItem>): Mono<Void> {
        return Mono.fromCallable {
            // 블로킹 호출 처리
            this.ploggingEventRepository.findAllProgramNumber()
        }
            .subscribeOn(Schedulers.boundedElastic()) // 블로킹 작업 실행을 위한 스레드 풀 설정
            .map { savedNumberList ->
                itemList.filterNot { it.programRegistrationNumber in savedNumberList }
            }
            .flatMapMany { newItemList -> Flux.fromIterable(newItemList) }
            .flatMap { item ->
                this.fetchPloggingEvent(item.programRegistrationNumber!!)
                    .flatMap { res ->
                        if (res.body!!.totalCount == 1) {
                            this.saveFetchedPloggingEvent(res.body.items!!.item!![0], item.url!!)
                        } else {
                            Mono.empty()
                        }
                    }
            }
            .then()
    }

    @Transactional
    override fun saveFetchedPloggingEvent(
        item: VolunteeringDetailApiResponseItem,
        url: String,
    ): Mono<PloggingEvent> {
        return Mono.fromCallable {
            val imageUrls = this.ploggingEventScrapingService.scrapingPloggingEventImage(url)
            val ploggingEvent = PloggingEventConverter.toEntity(item, url)
            ploggingEvent.imageList =
                imageUrls.mapIndexed { index, s ->
                    ImageConverter.toEntityWithPloggingEvent(
                        s,
                        null,
                        ploggingEvent,
                        index,
                    )
                }
            this.ploggingEventRepository.save(ploggingEvent)
        }
            .subscribeOn(Schedulers.boundedElastic()) // 블로킹 작업 처리
    }

내가 최종적으로 작성한 코드는 위와 같다. 앞서 테스트한 코드를 좀 더 디벨롭하여 최대한 Reactor스럽게(?) 코드를 작성해보았다. 효율성 측면에선 앞선 비동기 테스트 코드와 별반 차이가 없다. 그래서 코드만 복잡해졌다고 볼 수 있지만 학습적인 측면에서 이렇게 짜게 되었다. 다시 한 번 설명하지만 절대 이렇게 짜지 않아도 된다. Reactor 학습이 목적이 아니라 단순 비동기 호출이 구현이라면 앞선 비동기 테스트 코드로도 충분하다. 하나씩 설명을 해보겠다.

  • .flatMap(): 일반적인 Map과 유사하게 결과를 순회하며 각 요소에 대해 어떠한 작업을 할 수 있는 Operator이다. 다건의 결과에 적합한 Flux와 사용을 할 때 더욱 유용한 녀석이지만 Mono와도 사용할 수 있고 에러 핸들링이 편하고 코드 또한 가독성 좋게 짤 수 있어 사용하게 되었다.

  • .onErrorResume(): 에러 발생시 해당 블록이 실행되게 된다. try-catch에서 catch와 같은 역할을 수행한다.

  • Mono.fromCallable { this.ploggingEventRepository.findAllProgramNumber() }: 이미 DB에 저장되어있는 글들은 필터링하기 위해 DB에 저장된 글들의 페이지 넘버 리스트를 동기로 가져온다. 리액티브 방식의 비동기 DB를 사용하고 있지 않기 때문에 DB 작업의 경우에는 동기로 이루어질 수 밖에 없다.

  • .subscribeOn(Schedulers.boundedElastic()): 위의 동기 작업을 별도의 스레드에서 처리하기 위해 스레드 풀을 제공하는 것이다. 비동기로 동작하는 Reactor의 Event Loop에 영향을 주지 않기 위해 블로킹 작업의 경우에는 별도의 스레드에서 처리하는 것이 안전하다.

  • .flatMapMany { newItemList -> Flux.fromIterable(newItemList) }.flagMap {}: 필터링된 리스트를 flagMap으로 순회하기 위해 Flux로 변환한다. 그 후 flagMap으로 각 요소를 순회하며 saveFetchedPloggingEvent()를 호출한다.

  • fun saveFetchedPloggingEvent(): 각 요소를 DB에 저장하는 함수이다. 역시에 DB를 처리하는 함수이므로 동기로 동작을 하며 그렇기 때문에 역시나 .subscribeOn(Schedulers.boundedElastic())로 별도의 스레드에서 처리를 하도록 했다.

후기

이번에 WebClient의 비동기 처리를 구현하면서 어렴풋이만 알고 있던 동기/비동기 & 블로킹/논블로킹의 개념에 대해서도 다시 한 번 확실하게 공부하게 되었고, 추가로 리액티브 프로그래밍, Reacotr에 대해서도 학습을 할 수 있었다. 다음에 관련된 개발을 할 상황이 온다면 도움이 될 것 같다!

profile
응애 개발자입니다.

0개의 댓글