[async] 외부 API 호출에 논블로킹 적용

maxxyoung·2025년 5월 29일

Async API

목록 보기
3/5

문제 상황

가격비교를 위해서는 외부 연동 API를 호출해야한다. 만약 N개의 업체가 있다면 하나의 product의 price를 얻기 위해서 외부 연동 API가 총 N번이 호출되어야한다. 만약 블로킹 동작한다면 유저는 외부 API 연동이 모두 끝날 때 까지 기다려야한다.

GitHub

관련 코드

해결 방법

현재 동작하는 FLOW

우선 나의 외부 API호출 가정을 이해해야한다.

1. 유저가 가격비교 요청 시 기본적으로 캐시에서 그 결과를 찾는다.
2. 캐시 미스가 났다면 가격 비교 요청 API를 호출한다. 이 때 리턴 값은 API 요청 실행 여부 상태값 정도만 받는다.
3. 가격비교 사이트에서 나의 가격 비교 결과 전달 API를 호출한다.

[Kafka producer server] -------------> [Kafka consume server]
1번 요청은 producer 서버에서 2번 가격비교는 consume 서버에서 동작한다.

가격 정보를 받기 위해 총 2번의 API가 실행된다. 비교 요청 API와 결과 전달 API 호출로 2번의 실행이 있다. 현재 이야기하고 있는 곳은 비교 요청 API이다.

요청의 경우 각 가격비교 사이트 별로 API 연동 방식이 다를 수 있다. 따라서 연동 자체를 interface로 추상화 하고 그에 맞는 구현체를 만들었다.
-> 최종적으로 호출하는 서비스에서 adapter와 factory 패턴을 가지고 그에 맞는 adapter 구현체를 찾아주었다.

또한 Kafaka consum 서버에서 효율적으로 가격비교 API 실행을 위해 병렬 실행 + 논블로킹으로 구현했다.
-> 코루틴을 활용하여 구현했다.

adapter + factory 패턴 적용

Adapter 인터페이스

먼저, 각 외부 가격비교 사이트와의 API 연동은 모두 구조가 다르기 때문에, 연동 동작을 하나의 인터페이스로 추상화했다.

interface IntegrationSiteAdapter {
    suspend fun compareProductPrice(priceComparisonRequest: PriceComparisonRequest)
}

구현체 예시 – ZmarketAdapter

구현체 중 하나인 Zmarket이다. WebClient를 사용해 외부 API를 호출하였다. WebClient를 사용한 이유는 아래서 더 자세히 설명하겠다.

@Component("ZMARKET")
class ZmarketAdapter: IntegrationSiteAdapter{
    override suspend fun compareProductPrice(priceComparisonRequest: PriceComparisonRequest) {
        val product = priceComparisonRequest.product
        val integrationSite = priceComparisonRequest.integrationSite
        try {
            WebClient.builder()
                .baseUrl(integrationSite.baseUrl)
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build()
                .post()
                .uri(integrationSite.apiEndPoint, product.id) //협력사용 productId 변환 필요
                .retrieve()
                .toBodilessEntity()
                .awaitSingle()

        } catch (e: WebClientResponseException) {
            throw ExternalPriceApiException(e)
        }
    }
}

Factory – 코드로 Adapter 선택

사이트마다 구현체가 다르기 때문에, IntegrationSiteCode(예: ZMARKET, COUPUNG 등)를 key로 삼고, Spring이 주입한 Map<String, IntegrationSiteAdapter>에서 구현체를 동적으로 찾는다.

@Component
class IntegrationSiteAdapterFactory(
    private val adapters: Map<String, IntegrationSiteAdapter>
) {
    fun getAdapter(code: IntegrationSiteCode): IntegrationSiteAdapter =
        adapters[code.name] ?: throw IllegalArgumentException("지원하지 않는 사이트입니다: $code")
}

실제 사용 예 – 서비스 계층

서비스 로직에서는 사이트 코드에 따라 적절한 Adapter를 주입받아 가격 비교 요청을 수행한다.

val adapter = integrationSiteAdapterFactory.getAdapter(site.code)
val comparisonRequest = PriceComparisonRequest(product = product, integrationSite = site)
adapter.compareProductPrice(comparisonRequest)

코루틴 의존성 추가

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
//WebClient를 사용하기 위해 추가
implementation("org.springframework.boot:spring-boot-starter-webflux")
//코루틴 테스트를 위한 의존성
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test")

코루틴 사용을 위해 다음과 같이 의존성을 추가해주었다.

코루틴의 scope 설정

개발 환경과 스택 선택 이유

내가 개발하고 있는 환경은 SpringMVC + kotlin이다.

"논블로킹을 말하면서 왜 WebFlux를 안 쓰냐?"는 질문이 있을 수 있지만,
DB는 JPA로 블로킹 I/O, 외부 API만 논블로킹으로 처리할 예정이기 때문에
굳이 WebFlux로 전체를 구성할 필요는 없다고 판단했다.

코루틴 스코프 설정 전략

코루틴 scope의 경우 스프링에서 관리하는 scope을 사용했다. controller에서 부터 suspend fun을 사용하여 스프링이 요청마다 코루틴 컨텍스트와 job을 생성한다.
참고: Spring 공식문서의 코루틴 지원 설명

즉, Spring이 생성한 부모 Job 안에서 코루틴이 실행되고, 요청이 끝나면 그 자식 Job들도 자동으로 cancel된다.

예제 코드 – supervisorScope + launch

suspend fun startComparison(productId: Long) = supervisorScope {
	launch(Dispatchers.IO) {
    	...
    }
}

코루틴의 가장 큰 특징 중 하나는 구조적 동시성(Structured Concurrency)이다.
→ 자식 코루틴에서 예외가 발생하면, 부모 + 다른 형제 코루틴도 모두 중단된다.

하지만 이 프로젝트에서는, 하나의 외부 API가 실패하더라도, 다른 API 호출은 계속 진행되어야하기 때문에 supervisorScope를 사용했다.

supervisorScope는 자식 코루틴 간 예외 전파를 막아, 하나의 실패가 다른 자식들에게 영향을 주지 않게 한다. 부모는 자식의 실패를 감지하지만, 자식들의 실행에는 개입하지 않는다.

코루틴은 내부적으로 별도의 스레드 풀(Dispatcher)을 사용한다. 내가 사용하는 Dispatchers.IO는 입출력 작업에 최적화된 스레드 풀로, 많은 코루틴을 처리할 수 있도록 설계돼 있다. 스프링 MVC에서는 컨트롤러에 suspend 함수가 붙으면 스프링에서 corutineContext와 job을 만들고 이 job이 부모가 되며, 코루틴 내부 로직(async, launch, withContext(Dispatchers.IO))은 명시된 Dispatcher가 있다면 별도 스레드 풀에서 실행된다. 이때 생성된 코루틴들은 모두 부모 Job을 상속받아 구조적 동시성을 유지한다.

코루틴 적용

코루틴은 다음과 같이 적용하였다.

suspend fun startComparison(productId: Long) = supervisorScope {
        val product = productFinder.getProductById(productId)
        val integrationSites = integrationSiteRepository.findAll()

        integrationSites.forEach { site ->
            alarmLaunch(site, Dispatchers.IO, log) {
                val adapter = integrationSiteAdapterFactory.getAdapter(site.code)
                val comparisonRequest = PriceComparisonRequest(product = product, integrationSite = site)
                adapter.compareProductPrice(comparisonRequest)

            }
        }
    }
fun CoroutineScope.alarmLaunch(
    site: IntegrationSite,
    context: CoroutineContext = Dispatchers.Default,
    log: org.slf4j.Logger = LoggerFactory.getLogger("CoroutineExtensions"),
    block: suspend CoroutineScope.() -> Unit
): Job {
    return launch(context + CoroutineExceptionHandler { _, e ->
        log.error("가격 요청 실패 [site=${site.code}] : ${e.message}", e)
        //TODO: 모니터링 시스템 연동
    }) {
        block()
    }
}

supervisorScope는 내부적으로 모든 launch가 끝날 때까지 join()과 유사하게 대기한다. 따라서 launch의 작업이 모두 끝나야 startCompare함수가 끝이 난다.

코루틴 빌더 함수 중에 launch를 사용하였다. 이유는 외부 연동 API를 호출해서 받는 값으로 처리하는 형태가 없고 그냥 요청만 하면 되기 때문에 launch를 사용하였다.

예외처리는 launch에서 사용할 수 있는 CoroutineExceptionHandler를 사용해 처리했다. 확장함수를 사용해 깔끔하게 처리하도록 해보았다.

WebClient를 사용한 이유

외부 API를 호출하는데 사용할 수 있는 라이브러리 중 많이 사용하는 것은 RestTemplate과 WebClient가 있다. 흔이 RestTemplate은 블로킹 방식으로 동작하고 WebClient는 논블로킹 방식으로 동작한다고 알고 있다. 이번 프로젝트를 진행하면서 왜 논블로킹에선는 WebClient를 사용해야하는지 알 수 있었다.

코루틴의 경우 논블로킹 방식으로 동작하는데 만약 내가 호출하는 suspend 함수가 블로킹 방식으로 동작한다면 내가 열심히 설계하고 만들어 놓은 함수가 내 의도대로 움직이지 않는다. 따라서 논블로킹 방식으로 동작하게 하려면 라이브러리 또한 논블로킹으로 구현된 라이브러리를 써야한다.

테스트 코드 작성

테스트코드는 총 2가지의 시나리오로 유닛 테스트로 구성했다.

  • 외부연동 API에서 예외가 발생했을 때 로그를 잘 짝는지
  • 외부연동 API에서 N번이 호출 됐을 때 그 중 하나라도 실패했을 경우 나머지 API에 영향이 없는지

테스트 중 하나를 가져와봤다. 코드의 작성은 이러하다.

@Test
    fun `여러 개의 사이트들에서 가격을 가져오는 도중 하나의 사이트에서 예외가 발생해도 나머지 사이트는 정상작동한다`() = runTest {
        // given
        val fakeProduct = ProductFixtures.createProduct()
        val fakeSites = IntegrationSiteFixtures.createAllFakeSites()

        stubRepositories(fakeProduct, fakeSites)

        val adapterMap = AdapterStubs.withFirstAdapterFailing(fakeSites, fakeProduct)
        stubAdapterFactory(adapterMap)

        val sut = createService()

        // when
        sut.startComparison(1L)
        advanceUntilIdle()

        // then
        verifyEachAdapterWasCalled(adapterMap)
    }

여기서 중요하게 볼 표현은 advanceUntilIdle()이다 코루틴을 testScope에서 진행하기 때문에 테스트 로직이 테스트하려는 로직보다 먼저 끝나버릴 수 있다. 이것을 미연에 방지하기위해 다음 함수를 꼭 호출해주어야한다.

coEvery { adapter.compareProductPrice(request) } throws ExternalPriceApiException(RuntimeException("외부 api 호출 실패"))

coVerify(exactly = 1) { adapter.compareProductPrice(match {it.integrationSite.code == code } ) }

mock객체를 stub을 해줄 때 suspend함수는 coEvery, coVerify라는 코루틴용 함수를 써야한다.

profile
오직 나만을 위한 글. 틀린 부분 말씀해 주시면 감사드립니다.

0개의 댓글