우선 RestTemplateWebClient 에 대해 알기 전에 동기와 비동기, 그리고 블럭과 넌블럭에 대해 간략하게 알아보자. 블로그 글에 이해하기 쉽게 설명한 글이 있었다. 블로그 글 출처

동기와 비동기, 그리고 블럭과 넌블럭

Blocking & Synchronous

나 : 대표님, 개발자 좀 더 뽑아주세요..
대표님 : 오케이, 잠깐만 거기 계세요!
나 : …?!!
대표님 : (채용 공고 등록.. 지원자 연락.. 면접 진행.. 연봉 협상..)
나 : (과정 지켜봄.. 궁금함.. 어차피 내 일 하러는 못 가고 계속 서 있음)

Blocking & Asynchronous

나 : 대표님, 개발자 좀 더 뽑아주세요..
대표님 : 오케이, 잠깐만 거기 계세요!
나 : …?!!
대표님 : (채용 공고 등록.. 지원자 연락.. 면접 진행.. 연봉 협상..)
나 : (안 궁금함.. 지나가는 말로 여쭈었는데 붙잡혀버림.. 딴 생각.. 못 가고 계속 서 있음)

Non-blocking & Synchronous

나 : 대표님, 개발자 좀 더 뽑아주세요..
대표님 : 알겠습니다. 가서 볼 일 보세요.
나 : 넵!
대표님 : (채용 공고 등록.. 지원자 연락.. 면접 진행.. 연봉 협상..)
나 : 채용하셨나요?
대표님 : 아직요.
나 : 채용하셨나요?
대표님 : 아직요.
나 : 채용하셨나요?
대표님 : 아직요~!!!!!!

Non-blocking & Asynchronous

나 : 대표님, 개발자 좀 더 뽑아주세요..
대표님 : 알겠습니다. 가서 볼 일 보세요.
나 : 넵!
대표님 : (채용 공고 등록.. 지원자 연락.. 면접 진행.. 연봉 협상..)
나 : (열일중..)
대표님 : 한 분 모시기로 했습니다~!
나 : 😍

그럼 이제 통신 방법들에 대해 알았으니 어떤 HTTP 클라이언트를 사용해 REST API 통신을 하면 좋을지 Spring에서 많이 사용하는 RestTemplateWebClient 를 비교해보자.

RestTemplate

RestTemplateSpring 프레임워크에서 오랫동안 사용되어온 HTTP 클라이언트이다.

  • 특징

    • 동기적인 방식으로 동작하며, 기본적으로 블로킹 호출을 수행한다. 즉, blocking & Synchronous 방식을 사용한다는 것이다.

    • blocking & Synchronous 방식을 사용하므로, 요청과 응답이 완료될 때까지 대기한다.

    • 쓰레드 당 하나의 요청을 처리하는 방식이기 때문에, 쓰레드를 많이 사용하게 되면 확장성이 떨어질 수 있다.

RestTemplate은 기본적으로 Spring Boot에 포함되어 있지만, Spring 5부터는 deprecated 되었다. Spring 5에서는 WebClient 를 권장한다.

WebClient

WebClientSpring 5에서 도입된 비동기적이고 논블로킹으로 동작하는 HTTP 클라이언트이다.

  • 특징

    • 기본적으로 Non-Blocking I/O를 사용하여 요청과 응답을 비동기적으로 처리한다. 즉, Non-blocking & Asynchronous 방식으로 사용한다는 것이다.

    • Reactor와 함께 사용되어 Flux 및 Mono와 같은 리액티브 타입(Mono, Flux)을 반환한다.

    • Reactive Streams 표준을 준수하고, 비동기 및 병렬 처리를 지원하기 때문에 확장성이 좋다.

    • WebClientSpring WebFlux 프레임워크와 함께 사용되어 더 높은 성능과 확장성을 제공한다.

WebClientSpring 5부터 권장되는 HTTP 클라이언트이며, 기존의 RestTemplate보다 더욱 모던하고 유연한 방식으로 REST API를 호출할 수 있습니다.

잘못된 RestTemplate 사용 사례

MSA 아키텍처의 프로젝트를 유지 보수하는 과정 속에서 RestTemplate 를 사용하는 코드를 보고 의문점이 생겼다.

  • OAuthService

    • OAuth 서버에 HTTP 통신을 보내 AccessToken 을 가져오는 로직과 KakaoHTTP 통신을 보내 프로필 정보를 가져오는 로직이며 이때 HTTP 클라이언트로 RestTemplate 를 사용하고 있었다.
@Slf4j
@Service
public class OAuthService {
    private RestTemplate rt = new RestTemplate();

    public void setRestTemplate(RestTemplate rt) {
        this.rt = rt;
    }
    
    ...
    
    public OauthToken getAccessToken(String code, String provider) {
        HttpEntity<MultiValueMap<String, String>> tokenRequest = getTokenRequest(code, provider);

        ResponseEntity<String> tokenResponse = rt.exchange(
                getProviderTokenUrl(provider),
                HttpMethod.POST,
                tokenRequest,
                String.class
        );

        return getOAuthToken(provider, tokenResponse);
    }
    
    ...
    
    private KakaoProfile findKakaoProfile(HttpHeaders headers) {
    	
        ...
        
        // Http 요청 (POST 방식) 후, response 변수에 응답을 받음
        ResponseEntity<String> kakaoProfileResponse = rt.exchange(
                "https://kapi.kakao.com/v2/user/me",
                HttpMethod.POST,
                kakaoProfileRequest,
                String.class
        );
        
        ...
    
}

private RestTemplate rt = new RestTemplate();RestTemplate 인스턴스를 생성하고 이를 rt.exchange를 통해 HTTP 요청을 보내고 있다.

위의 코드를 보면 여러가지 문제점이 있다.

  • 스레드 안정성 : RestTemplate 는 스레드 안전하지 않다. 이는 여러 스레드에서 동시에 RestTemplate를 사용하면 예기치 않은 동작이 발생할 수 있다는 것을 의미한다.

    OAuthServiceSingleton 으로 관리되므로 여러 스레드에서 동시에 이 인스턴스를 사용하게 된다. 따라서, RestTemplate 의 스레드 안전하지 않은 특성 때문에 예상치 못한 문제가 발생할 수 있다.

    • 스레드 안전하지 않은 문제를 해결하는 방법이 있다.

      • 메소드 내부에서 RestTemplate 생성 : 필요한 메소드 내부에서 RestTemplate 인스터스를 생성하여 사용하는 방식이다. 이 방식을 사용하면, 각 메소드 호출마다 새로운 RestTemplate를 생성하여 사용하므로 스레드 안전 문제를 해결할 수 있다.
    • 그러나 이러한 방법들은 RestTemplatedeprecated 되었다는 문제를 해결하지 못한다. 또한 메소드 내부마다 RestTemplate 생성하는 것은 메모리 사용량과 GC(Garbage Collector) 부하를 증가 시킬 수 있다.

  • Blocking I/O : RestTemplate블로킹 방식을 사용한다고 하였다. 이는 요청을 보낸 후 응답이 올 때까지 스레드가 다른 일을 못하고 대기 하는 방식이다. 이러한 방식은 높은 트래픽의 웹 애플리케이션에서는 비효율적일 수 있다.

    • 블로킹 방식을 회피하는 방법이 있다.

      • JAVACompletableFuture 를 사용하면 된다. 하지만 이는 RestTemplate블로킹 특성을 완전히 제거하는 것은 아니며, 단지 비동기 방식으로 요청을 처리하여 특정 스레드가 블로킹 되는 것을 최소화 한다.

Spring Cloud 에서는 RestTemplate 대신 WebClient를 사용하는 것을 권장하고 있다. WebClient비동기 및 논블로킹 방식으로 HTTP 요청을 처리하며, 동시성 문제를 피할 수 있다. 또한 WebClientMSA 아키텍처에 적합한 기능과 확장성을 제공한다.

WebClient 설정

  • build.gradle 의존성 추가

    // WebClient
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    
  • WebClientConfig 생성

@Configuration
public class WebClientConfig {
    private final ObjectMapper objectMapper;
    private final String httpScheme = "https://";

    public WebClientConfig() {
        this.objectMapper = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                .registerModule(new JavaTimeModule());
    }

    @Bean
    public WebClient webClient() {
        final int bufferSize = 16 * 1024 * 1024;  // 16MB
        final ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                .codecs(configurer -> {
                    configurer.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON));
                    configurer.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON));
                    configurer.defaultCodecs().maxInMemorySize(bufferSize);
                })
                .build();

        return WebClient.builder()
                .baseUrl(httpScheme)
                .exchangeStrategies(exchangeStrategies)
                .filter(logRequest())
                .filter(errorHandlingFilter())
                .build();
    }

    public static ExchangeFilterFunction logRequest() {
        return (clientRequest, next) -> {
            log.info("Request: {} {}", clientRequest.method(), clientRequest.url());
            clientRequest.headers().forEach((name, values) -> values.forEach(value -> log.info("{}={}", name, value)));
            return next.exchange(clientRequest);
        };
    }

    public static ExchangeFilterFunction errorHandlingFilter() {
        return (clientRequest, next) -> next.exchange(clientRequest)
                .flatMap(clientResponse -> {
                    if (clientResponse.statusCode().isError()) {
                        return clientResponse.bodyToMono(String.class)
                                .flatMap(errorBody -> {
                                    String errorMessage = String.format("Error response from server. Status code: %s, body: %s, headers: %s",
                                            clientResponse.statusCode(), errorBody, clientResponse.headers().asHttpHeaders());
                                    return Mono.error(new RuntimeException(errorMessage));
                                });
                    }
                    return Mono.just(clientResponse);
                })
                .onErrorResume(e -> {
                    String errorMessage = String.format("Error sending request to server. Error: %s", e.getMessage());
                    return Mono.error(new RuntimeException(errorMessage, e));
                });
    }
}

  • 커스텀 ObjectMapper 설정 JSON 변환에 적용
private final ObjectMapper objectMapper;

public WebClientConfig() {
	this.objectMapper = new ObjectMapper()
    	.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        .registerModule(new JavaTimeModule());
}
  • DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES : 역직렬화 과정에서 객체에 없는 프로퍼티가 있을 경우 무시하여 에러를 발생하지 않도록(false)
  • JavaTimeModule() : JAVA 8의 날짜와 시간 타입처리 지원

  • in-memory buffer 값 변경 및
    @Bean
    public WebClient webClient() {
        final int bufferSize = 16 * 1024 * 1024;  // 16MB
        final ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
        		.codecs(configurer -> {
                    configurer.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON));
                    configurer.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON));
                    configurer.defaultCodecs().maxInMemorySize(bufferSize);
                })
                .build();
                
		return WebClient.builder()
                .exchangeStrategies(exchangeStrategies)
                
                ...

Spring WebClient 에서는 애플리케이션 메모리 문제를 피하기 위해 in-memory buffer 값이 256KB로 기본설정 되어 있다.


  • Connection, Response, Read, Write timeout 설정
@Bean
public WebClient webClient() {
	final HttpClient httpClient = HttpClient.create(connectionProvider())
    		.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)  // Connection timeout
            .responseTimeout(Duration.ofMillis(5000))  // Response timeout
            .doOnConnected(conn ->
            		conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))  // Read timeout
                    	.addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))  // Write timeout
			);

	        return WebClient.builder()
            		.exchangeStrategies(exchangeStrategies)
                	.clientConnector(new ReactorClientHttpConnector(httpClient)) // Custom HttpClient
                    
                    ...
  • .clientConnector(new ReactorClientHttpConnector(httpClient))WebClient.builder() 뒤에 추가해 Custom Http 를 사용하도록 설정한다.

  • HTTP 연결 관리 설정 (최대 연결 수, 대기 시간, 유휴시간)
@Bean
public ConnectionProvider connectionProvider() {
	return ConnectionProvider.builder("connection-pool")
                .maxConnections(100)                    // 커넥션의 개수
                .pendingAcquireTimeout(Duration.ofMillis(0)) // 커넥션 풀에서 커넥션을 얻기 위해 기다리는 최대 시간
                .pendingAcquireMaxCount(-1)             // 커넥션 풀에서 커넥션을 가져오는 시도 횟수 (-1: no limit)
                .maxIdleTime(Duration.ofMillis(2000L))        // 커넥션 풀에서 idle 상태의 커넥션을 유지하는 시간
                .build();
    }
  • Connection Pool은 클라이언트와 서버간에 연결을 맺어 놓은 상태(3 Way HandShaking 완료 상태)를 여러개 유지하고 필요할 때 하나씩 사용하고 반납하는 형태
  • API 통신 과정시 timeout 되어 해당 서버와의 연결이 끊긴 경우 Connection prematurely closed BEFORE response 에러 방지를 위해 커넥션풀 설정

WebClient로 변경

@Slf4j
@Service
@RequiredArgsConstructor
public class OAuthService {
    private final UserRepository userRepository;
    private final WebClient webClient;
    private final ObjectMapper mapper;

우선 WebClientObjcetMapper를 싱글톤으로 관리하기 위해 lombok@RequiredArgsConstructor로 스프링 빈에 등록한다.

그 후 RestTemplate 로 보냈던 HTTP 요청을 WebClient를 사용하는 코드로 수정한다.


    public OauthToken getOAuthToken(String code, String providerString) {
        Provider provider = getProvider(providerString);
        String tokenResponse = webClient.post()
                .uri(uriBuilder -> uriBuilder.path(provider.getRequestTokenUrl()).build())
                .bodyValue(getTokenRequest(code, provider))
                .retrieve()
                .bodyToMono(String.class)
                .block();

        return parseToOAuthToken(provider, tokenResponse);
    }

코드를 리팩토링 하면서 메소드 이름은 변경되었지만, 여기서 볼 부분은 webClient.postHTTP 요청을 보내는 부분이다.

이 코드 수정을 통해 이제 Webclient 를 사용하여 스레드 안전성을 챙길 수 있다.(싱글톤 관리)

현재 코드를 보면 .bodyToMono(String.class) 를 사용하여 응답 본문을 Mono<String>으로 변환하고 있다. 마지막으로 .block() 메서드를 사용하여 응답을 블로킹 방식으로 대기하고 결과를 받아온다. 즉, 아직 Blocking I/O 이라는 비효율적인 방법은 아직 사용되고 있다는 것이다.

리액티브 프로그래밍

block() 메서드를 사용하면 리액티브 프로그래밍의 이점을 잃게 되므로, 가능하다면 이를 사용하지 않고 Mono를 그대로 반환하거나 다른 리액티브 연산자를 사용하는 것이 좋다.

public Mono<OauthToken> getOAuthToken(String code, String providerString) {
    Provider provider = getProvider(providerString);
    return webClient.post()
            .uri(uriBuilder -> uriBuilder.path(provider.getRequestTokenUrl()).build())
            .bodyValue(getTokenRequest(code, provider))
            .retrieve()
            .bodyToMono(String.class)
            .map(tokenResponse -> parseToOAuthToken(provider, tokenResponse));
}

이제 getOAuthToken() 메서드는 Mono<OauthToken>을 반환한다. 이 MonoOauthToken을 포함하거나 에러를 발생시키거나 아무것도 포함하지 않을 수 있다. 이 Mono를 구독하면 OauthToken을 얻을 수 있다.

Monomap 연산자를 사용하여 OauthToken으로 변환된다. map 연산자는 Mono가 포함하는 값을 변환하는데 사용된다. 여기서는 parseToOAuthToken(provider, tokenResponse)를 호출하여 String 응답을 OauthToken으로 변환한다.

이렇게 하면 이 메서드를 호출하는 측에서는 Mono 를 구독하여 비동기적으로 결과를 처리할 수 있다. 이 과정을 통해 이제 리액티브 프로그래밍의 이점을 살릴 수 있다.

subscribe

이제 저 Mono를 구독하여 비동기적으로 결과를 처리하는 코드를 보자.

getOAuthToken(code, providerString)
    .subscribe(oauthToken -> System.out.println("Received token: " + oauthToken));

getOAuthToken(code, providerString) 메소드를 사용하고 있는 다른 서비스의 코드이다.

이 서비스에서 getOAuthToken(code, providerString) 의 호출 값으로 Mono를 구독하여 비동기적으로 결과를 처리한다. 이 코드에서는 Mono 에 담긴 OauthToken의 반환 값이 비동기적으로 처리가 완료되어 도착을 하면 System.out.println("Received token: " + oauthToken) 을 실행한다.

즉, 만약에 저 System.out.println("Received token: " + oauthToken) 메소드 이전에 다른 코드들이 있으며 getOAuthToken(code, providerString) 의 값과 상관 없이 작동하는 코드라면 getOAuthToken(code, providerString) 의 결과 값을 기다리지 않고 비동기적으로 작동할 수 있다는 것이다.

WebClient 에러 핸들러

출처

ExchangeFilterFunction을 사용을 하여서 WebClientHTTP 요청을 보내거나 응답을 받을 때마다 ExchangeFilterFunction 을 호출하여서 요청이나 응답을 조작하거나 에러를 처리하는 등의 작업을 수행할 수 있다.

public static ExchangeFilterFunction errorHandlingFilter() {
    return (clientRequest, next) -> next.exchange(clientRequest).flatMap(clientResponse -> {
        if (clientResponse.statusCode().isError()) {
            return clientResponse.bodyToMono(String.class)
                    .flatMap(errorBody -> {
                        String errorMessage = String.format("Error response from server. Status code: %s, body: %s, headers: %s",
                                clientResponse.statusCode(), errorBody, clientResponse.headers().asHttpHeaders());
                        return Mono.error(new RuntimeException(errorMessage));
                    });
        }
        return Mono.just(clientResponse);
    });
}

위의 코드는 WebClientHTTP 응답을 처리하는 ExchangeFilterFunction을 정의하고 있다. 이 함수는 HTTP 응답의 상태 코드를 검사하여 에러 상태 코드인 경우 에러를 발생시킨다.

현재 RuntimeException을 사용하여 에러를 발생시키고 있는데, 이는 커스텀 에러 클래스를 정의하고 사용하면 에러 처리를 더욱 명확하게 할 수 있다.

이 함수는 WebClien에서 HTTP 요청을 보낼 때마다 호출되며, HTTP 응답을 받은 후에 호출된다.

하지만 이 필터는 응답의 상태코드가 에러를 나타내는 경우에만 에러를 처리하고 있다. 요청 자체가 실패하는 경우에 대한 처리도 해주자.

    
    public static ExchangeFilterFunction errorHandlingFilter() {
        return (clientRequest, next) -> next.exchange(clientRequest)
                .flatMap(clientResponse -> {
                    if (clientResponse.statusCode().isError()) {
                        return clientResponse.bodyToMono(String.class)
                                .flatMap(errorBody -> {
                                    String errorMessage = String.format("Error response from server. Status code: %s, body: %s, headers: %s",
                                            clientResponse.statusCode(), errorBody, clientResponse.headers().asHttpHeaders());
                                    return Mono.error(new RuntimeException(errorMessage));
                                });
                    }
                    return Mono.just(clientResponse);
                })
                .onErrorResume(e -> {
                    String errorMessage = String.format("Error sending request to server. Error: %s", e.getMessage());
                    return Mono.error(new RuntimeException(errorMessage, e));
                });
    }

이제 이 필터를 WebClientConfig 설정 파일에서 WebClient의 필터로 등록해주면 된다.

return WebClient.builder()
	.baseUrl(httpScheme)
    .exchangeStrategies(exchangeStrategies)
    .filter(logRequest())
    .filter(errorHandlingFilter())
    .build();

현재 필터에는 HTTP 요청에 대해 모두 log 를 찍고 있다. 이는 운영환경에서 성능에 영향을 미칠 수 있으니 제거도 염두해야 한다.

profile
가오리의 개발 이야기

1개의 댓글

comment-user-thumbnail
2024년 12월 18일

ㅋㅋㅋㅋㅋㅋㅋ예시가 재밌어서 잘 보고 갑니다!

답글 달기