이번 포스팅에서는 다음과 같은 시스템 구성을 예제로 사용하겠습니다:
Frontend → Order Service → Payment Service → Auth Service
↓ ↓
Inventory Service Notification Service
@Service
@RequiredArgsConstructor
@Slf4j
public class AuthTokenService {
private final WebClient authWebClient;
private final RedisTemplate<String, String> redisTemplate;
private static final String TOKEN_PREFIX = "auth:token:";
private static final Duration TOKEN_CACHE_DURATION = Duration.ofMinutes(50);
/**
* 인증 서버에서 새로운 토큰 발급
*/
public Mono<TokenResponse> issueToken(TokenRequest request) {
return authWebClient.post()
.uri("/auth/token")
.bodyValue(request)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response ->
response.bodyToMono(ErrorResponse.class)
.flatMap(error -> Mono.error(new AuthenticationException(error.getMessage())))
)
.bodyToMono(TokenResponse.class)
.doOnNext(this::cacheToken)
.doOnError(error -> log.error("토큰 발급 실패: {}", error.getMessage()));
}
/**
* 토큰 유효성 검증
*/
public Mono<Boolean> validateToken(String accessToken) {
// 1. 캐시에서 먼저 확인
String cachedToken = redisTemplate.opsForValue().get(TOKEN_PREFIX + accessToken);
if (cachedToken != null) {
return Mono.just(true);
}
// 2. 인증 서버에 검증 요청
return authWebClient.post()
.uri("/auth/validate")
.header("Authorization", "Bearer " + accessToken)
.retrieve()
.onStatus(HttpStatus::isError, response ->
Mono.just(false).cast(Void.class)
)
.bodyToMono(ValidationResponse.class)
.map(ValidationResponse::isValid)
.onErrorReturn(false);
}
/**
* Refresh Token으로 새 Access Token 발급
*/
public Mono<TokenResponse> refreshToken(String refreshToken) {
RefreshTokenRequest request = RefreshTokenRequest.builder()
.refreshToken(refreshToken)
.build();
return authWebClient.post()
.uri("/auth/refresh")
.bodyValue(request)
.retrieve()
.bodyToMono(TokenResponse.class)
.doOnNext(this::cacheToken);
}
private void cacheToken(TokenResponse tokenResponse) {
redisTemplate.opsForValue().set(
TOKEN_PREFIX + tokenResponse.getAccessToken(),
tokenResponse.getUserId(),
TOKEN_CACHE_DURATION
);
}
}
@Component
@RequiredArgsConstructor
@Slf4j
public class JwtAuthenticationFilter implements WebFilter {
private final AuthTokenService authTokenService;
private static final List<String> EXCLUDED_PATHS = Arrays.asList(
"/health", "/metrics", "/auth/login", "/auth/register"
);
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String path = exchange.getRequest().getPath().value();
// 인증이 필요 없는 경로는 통과
if (EXCLUDED_PATHS.stream().anyMatch(path::startsWith)) {
return chain.filter(exchange);
}
return extractToken(exchange.getRequest())
.flatMap(this::validateAndProcessToken)
.flatMap(userContext -> {
// 인증된 사용자 정보를 요청 컨텍스트에 추가
ServerWebExchange mutatedExchange = exchange.mutate()
.request(builder -> builder.header("X-User-Id", userContext.getUserId()))
.build();
return chain.filter(mutatedExchange);
})
.onErrorResume(this::handleAuthenticationError);
}
private Mono<String> extractToken(ServerHttpRequest request) {
String authHeader = request.getHeaders().getFirst("Authorization");
if (authHeader != null && authHeader.startsWith("Bearer ")) {
return Mono.just(authHeader.substring(7));
}
// 쿠키에서도 토큰 확인
MultiValueMap<String, HttpCookie> cookies = request.getCookies();
if (cookies.containsKey("accessToken")) {
return Mono.just(cookies.getFirst("accessToken").getValue());
}
return Mono.error(new AuthenticationException("토큰이 없습니다."));
}
private Mono<UserContext> validateAndProcessToken(String token) {
return authTokenService.validateToken(token)
.flatMap(isValid -> {
if (isValid) {
return Mono.just(UserContext.builder()
.userId(extractUserIdFromToken(token))
.token(token)
.build());
} else {
return Mono.error(new AuthenticationException("유효하지 않은 토큰입니다."));
}
});
}
private Mono<Void> handleAuthenticationError(Throwable error) {
log.warn("인증 실패: {}", error.getMessage());
// 401 Unauthorized 응답 반환
return Mono.error(new ResponseStatusException(HttpStatus.UNAUTHORIZED, "인증이 필요합니다."));
}
}
/**
 * Orchestrates order processing across the inventory, payment and notification services.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderService {
    private final WebClient paymentWebClient;
    private final WebClient inventoryWebClient;
    private final OrderRepository orderRepository;

    // Built once and reused. It is initialised inline, so Lombok leaves it out of the
    // generated constructor and the constructor signature stays unchanged.
    private final WebClient notificationWebClient = WebClient.create("http://notification-service");

    /**
     * Processes an order: inventory check, order creation, payment, status update and,
     * when the payment succeeded, a best-effort confirmation notification.
     *
     * @param request order details
     * @param userId  id of the ordering user
     * @return the resulting order; the status reflects the payment outcome
     */
    @Transactional
    public Mono<OrderResponse> processOrder(OrderRequest request, String userId) {
        // NOTE(review): the inventory response body is not inspected here; only a 4xx from
        // the inventory service aborts the order. Confirm that is the intended contract.
        return validateInventory(request)
            .flatMap(inventory -> createOrder(request, userId))
            .flatMap(order -> processPayment(order)
                .flatMap(paymentResult -> updateOrderStatus(order, paymentResult)
                    // An ORDER_CONFIRMATION must not go out for a failed payment.
                    .flatMap(savedOrder -> paymentResult.isSuccess()
                        ? sendNotification(savedOrder)
                        : Mono.just(OrderResponse.from(savedOrder))))
            )
            .doOnError(error -> log.error("주문 처리 실패: {}", error.getMessage()));
    }

    /**
     * Asks the inventory service whether the requested quantity is available.
     * A 4xx response becomes {@code InsufficientStockException}; the call times out after 5s.
     */
    private Mono<InventoryResponse> validateInventory(OrderRequest request) {
        InventoryCheckRequest inventoryRequest = InventoryCheckRequest.builder()
            .productId(request.getProductId())
            .quantity(request.getQuantity())
            .build();
        return inventoryWebClient.post()
            .uri("/inventory/check")
            .bodyValue(inventoryRequest)
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, response ->
                response.bodyToMono(ErrorResponse.class)
                    .flatMap(error -> Mono.error(new InsufficientStockException(error.getMessage())))
            )
            .bodyToMono(InventoryResponse.class)
            .timeout(Duration.ofSeconds(5));
    }

    /**
     * Submits the payment for an order.
     * 4xx becomes {@code PaymentException}, 5xx becomes {@code PaymentSystemException};
     * the call times out after 10s.
     */
    private Mono<PaymentResponse> processPayment(Order order) {
        PaymentRequest paymentRequest = PaymentRequest.builder()
            .orderId(order.getId())
            .amount(order.getTotalAmount())
            .paymentMethod(order.getPaymentMethod())
            .customerId(order.getCustomerId())
            .build();
        return paymentWebClient.post()
            .uri("/payment/process")
            .bodyValue(paymentRequest)
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, response ->
                response.bodyToMono(ErrorResponse.class)
                    .flatMap(error -> Mono.error(new PaymentException(error.getMessage())))
            )
            .onStatus(HttpStatus::is5xxServerError, response ->
                Mono.error(new PaymentSystemException("결제 서비스 오류"))
            )
            .bodyToMono(PaymentResponse.class)
            .timeout(Duration.ofSeconds(10));
    }

    /** Applies the payment outcome to the order and persists it. */
    private Mono<Order> updateOrderStatus(Order order, PaymentResponse paymentResult) {
        if (paymentResult.isSuccess()) {
            order.setStatus(OrderStatus.PAID);
            order.setPaymentId(paymentResult.getPaymentId());
        } else {
            order.setStatus(OrderStatus.PAYMENT_FAILED);
        }
        return orderRepository.save(order);
    }

    /** Fires the confirmation notification and returns the order response immediately. */
    private Mono<OrderResponse> sendNotification(Order order) {
        NotificationRequest notificationRequest = NotificationRequest.builder()
            .userId(order.getCustomerId())
            .type(NotificationType.ORDER_CONFIRMATION)
            .orderId(order.getId())
            .build();
        return Mono.fromRunnable(() -> {
            // Fire-and-forget: a notification failure must not affect the order result.
            sendNotificationAsync(notificationRequest);
        }).then(Mono.just(OrderResponse.from(order)));
    }

    /** Sends the notification on a detached subscription and logs the outcome. */
    private void sendNotificationAsync(NotificationRequest request) {
        notificationWebClient
            .post()
            .uri("/notifications/send")
            .bodyValue(request)
            .retrieve()
            .bodyToMono(Void.class)
            .subscribe(
                ignored -> { },
                error -> log.warn("알림 발송 실패: {}", error.getMessage()),
                // Mono<Void> never emits a value, so success is logged on completion.
                () -> log.info("알림 발송 성공")
            );
    }
}
/**
 * Assembles the product detail view by calling several services in parallel.
 *
 * <p>Reviews, recommendations and inventory are optional enrichments: each has its own
 * timeout and fallback, so a slow or failing optional service degrades the response
 * instead of failing the whole request.
 */
@Service
@RequiredArgsConstructor
public class ProductService {
    /** Must stay below the overall 3s timeout so the fallbacks can take effect first. */
    private static final Duration OPTIONAL_CALL_TIMEOUT = Duration.ofSeconds(2);

    private final WebClient reviewWebClient;
    private final WebClient recommendationWebClient;
    private final WebClient inventoryWebClient;

    /**
     * Fetches product, reviews, recommendations and inventory concurrently and combines them.
     *
     * @param productId id of the product to load
     * @return the combined detail; errors if the product lookup fails or the overall 3s
     *         timeout elapses
     */
    public Mono<ProductDetailResponse> getProductDetail(Long productId) {
        // Nothing is subscribed here; Mono.zip subscribes to all four sources together.
        Mono<Product> productMono = getProduct(productId);
        Mono<List<Review>> reviewsMono = getProductReviews(productId);
        Mono<List<Product>> recommendationsMono = getRecommendations(productId);
        Mono<InventoryInfo> inventoryMono = getInventoryInfo(productId);
        return Mono.zip(productMono, reviewsMono, recommendationsMono, inventoryMono)
            .map(tuple -> ProductDetailResponse.builder()
                .product(tuple.getT1())
                .reviews(tuple.getT2())
                .recommendations(tuple.getT3())
                .inventory(tuple.getT4())
                .build())
            .timeout(Duration.ofSeconds(3));
    }

    /** Loads the reviews; returns an empty list on error or timeout. */
    private Mono<List<Review>> getProductReviews(Long productId) {
        return reviewWebClient.get()
            .uri("/reviews/product/{productId}", productId)
            .retrieve()
            .bodyToFlux(Review.class)
            .collectList()
            .timeout(OPTIONAL_CALL_TIMEOUT)
            .onErrorReturn(Collections.emptyList());
    }

    /** Loads recommended products; returns an empty list on error or timeout. */
    private Mono<List<Product>> getRecommendations(Long productId) {
        return recommendationWebClient.get()
            .uri("/recommendations/{productId}", productId)
            .retrieve()
            .bodyToFlux(Product.class)
            .collectList()
            .timeout(OPTIONAL_CALL_TIMEOUT)
            .onErrorReturn(Collections.emptyList());
    }

    /** Loads inventory info; returns the "unavailable" placeholder on error, timeout or empty body. */
    private Mono<InventoryInfo> getInventoryInfo(Long productId) {
        return inventoryWebClient.get()
            .uri("/inventory/product/{productId}", productId)
            .retrieve()
            .bodyToMono(InventoryInfo.class)
            .timeout(OPTIONAL_CALL_TIMEOUT)
            // Mono.zip completes empty if any source is empty, so an empty body needs a default too.
            .defaultIfEmpty(InventoryInfo.unavailable())
            .onErrorReturn(InventoryInfo.unavailable());
    }
}
@Configuration
@EnableConfigurationProperties(WebClientProperties.class)
public class WebClientConfig {
@Bean
@Primary
public WebClient.Builder webClientBuilder() {
return WebClient.builder()
.codecs(configurer -> {
configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024); // 2MB
configurer.defaultCodecs().enableLoggingRequestDetails(true);
});
}
@Bean
@Qualifier("authWebClient")
public WebClient authWebClient(WebClient.Builder builder, WebClientProperties properties) {
return builder
.baseUrl(properties.getAuthService().getBaseUrl())
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.clientConnector(createConnector(properties.getAuthService()))
.filter(loggingFilter("AUTH"))
.build();
}
@Bean
@Qualifier("paymentWebClient")
public WebClient paymentWebClient(WebClient.Builder builder, WebClientProperties properties) {
return builder
.baseUrl(properties.getPaymentService().getBaseUrl())
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.clientConnector(createConnector(properties.getPaymentService()))
.filter(loggingFilter("PAYMENT"))
.filter(retryFilter())
.build();
}
private ReactorClientHttpConnector createConnector(ServiceConfig config) {
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout())
.responseTimeout(Duration.ofSeconds(config.getResponseTimeout()))
.doOnConnected(conn ->
conn.addHandlerLast(new ReadTimeoutHandler(config.getReadTimeout()))
.addHandlerLast(new WriteTimeoutHandler(config.getWriteTimeout()))
);
return new ReactorClientHttpConnector(httpClient);
}
/**
* 로깅 필터
*/
private ExchangeFilterFunction loggingFilter(String serviceName) {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
log.info("[{}] 요청: {} {}", serviceName, clientRequest.method(), clientRequest.url());
return Mono.just(clientRequest);
});
}
/**
* 재시도 필터
*/
private ExchangeFilterFunction retryFilter() {
return (request, next) -> {
return next.exchange(request)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(throwable -> throwable instanceof ConnectTimeoutException ||
throwable instanceof ReadTimeoutException)
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) ->
new ServiceUnavailableException("서비스 응답 없음")
)
);
};
}
}
/**
 * Settings bound from the {@code webclient.*} configuration namespace, one
 * {@link ServiceConfig} per downstream service. Accessors are generated by Lombok.
 */
@Data
@ConfigurationProperties("webclient")
public class WebClientProperties {

    /** Settings for the auth service ({@code webclient.auth-service.*}). */
    private ServiceConfig authService = new ServiceConfig();

    /** Settings for the payment service ({@code webclient.payment-service.*}). */
    private ServiceConfig paymentService = new ServiceConfig();

    /** Settings for the inventory service ({@code webclient.inventory-service.*}). */
    private ServiceConfig inventoryService = new ServiceConfig();

    /** Connection and retry settings of a single downstream service. */
    @Data
    public static class ServiceConfig {

        /** Base URL of the service; no default. */
        private String baseUrl;

        /** Connect timeout; defaults to 5000. */
        private int connectTimeout = 5000;

        /** Response timeout; defaults to 10. */
        private int responseTimeout = 10;

        /** Read timeout; defaults to 10. */
        private int readTimeout = 10;

        /** Write timeout; defaults to 10. */
        private int writeTimeout = 10;

        /** Retry switch; defaults to {@code true}. */
        private boolean enableRetry = true;

        /** Maximum retry attempts; defaults to 3. */
        private int maxRetryAttempts = 3;
    }
}
webclient:
  auth-service:
    base-url: ${AUTH_SERVICE_URL:http://auth-service:8080}
    connect-timeout: 3000
    response-timeout: 5
    read-timeout: 5
    write-timeout: 5
  payment-service:
    base-url: ${PAYMENT_SERVICE_URL:http://payment-service:8080}
    connect-timeout: 5000
    response-timeout: 15
    read-timeout: 15
    write-timeout: 15
    enable-retry: true
    max-retry-attempts: 3
  inventory-service:
    base-url: ${INVENTORY_SERVICE_URL:http://inventory-service:8080}
    connect-timeout: 3000
    response-timeout: 8
/**
 * Exchange filter that records a {@code webclient.request} timer for every outgoing call.
 *
 * <p>Success and error recordings use the same tag keys ({@code method}, {@code uri},
 * {@code status}, {@code exception}); some registries reject a meter name that is
 * registered with differing tag key sets.
 */
@Component
@RequiredArgsConstructor
public class WebClientMetricsFilter implements ExchangeFilterFunction {
    /** Request attribute under which WebClient stores the unexpanded URI template. */
    private static final String URI_TEMPLATE_ATTRIBUTE =
        "org.springframework.web.reactive.function.client.WebClient.uriTemplate";

    private final MeterRegistry meterRegistry;

    /** Times the exchange from subscription until a response or an error is signalled. */
    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        // defer: start the clock per subscription, so a re-subscription (e.g. a retry)
        // is measured on its own rather than from the first assembly.
        return Mono.defer(() -> {
            Timer.Sample sample = Timer.start(meterRegistry);
            return next.exchange(request)
                .doOnNext(response -> record(sample, request,
                    String.valueOf(response.statusCode().value()), "none"))
                .doOnError(error -> record(sample, request,
                    "error", error.getClass().getSimpleName()));
        });
    }

    /** Stops the sample against a timer carrying the full, fixed tag set. */
    private void record(Timer.Sample sample, ClientRequest request, String status, String exception) {
        sample.stop(Timer.builder("webclient.request")
            .tag("method", request.method().name())
            .tag("uri", getUriTemplate(request))
            .tag("status", status)
            .tag("exception", exception)
            .register(meterRegistry));
    }

    /**
     * Returns the URI template when the request carries one, so that path variables do not
     * create a new time series per id; falls back to the concrete path otherwise.
     */
    private String getUriTemplate(ClientRequest request) {
        return request.attribute(URI_TEMPLATE_ATTRIBUTE)
            .map(Object::toString)
            .orElseGet(() -> request.url().getPath());
    }
}
/**
 * Exchange filter that logs the start, completion and failure of each WebClient call,
 * tagged with the {@code traceId} read from the MDC when the filter is invoked.
 */
@Slf4j
@Component
public class WebClientLoggingFilter implements ExchangeFilterFunction {

    /** Logs the request, then logs the response or the error together with the elapsed time. */
    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        final long startedAt = System.currentTimeMillis();
        final String traceId = MDC.get("traceId");

        log.info("WebClient 요청 시작 - traceId: {}, method: {}, uri: {}",
            traceId, request.method(), request.url());

        Mono<ClientResponse> exchange = next.exchange(request);
        return exchange
            .doOnNext(response ->
                log.info("WebClient 응답 완료 - traceId: {}, method: {}, uri: {}, status: {}, duration: {}ms",
                    traceId, request.method(), request.url(), response.statusCode(),
                    elapsedMillis(startedAt)))
            .doOnError(error ->
                log.error("WebClient 요청 실패 - traceId: {}, method: {}, uri: {}, error: {}, duration: {}ms",
                    traceId, request.method(), request.url(), error.getMessage(),
                    elapsedMillis(startedAt)));
    }

    /** Milliseconds elapsed since {@code startedAt}. */
    private static long elapsedMillis(long startedAt) {
        return System.currentTimeMillis() - startedAt;
    }
}
/**
 * Exchange filter that guards each outgoing call with a circuit breaker named after the
 * target host.
 *
 * <p>The outcome of the actual HTTP exchange is reported to the breaker. Wrapping only the
 * creation of the Mono records nothing, because creating it performs no I/O.
 */
@Component
@RequiredArgsConstructor
public class CircuitBreakerFilter implements ExchangeFilterFunction {
    private final CircuitBreakerRegistry circuitBreakerRegistry;

    /**
     * Acquires a permission per subscription, runs the exchange and records success or
     * failure. A rejected call surfaces as {@code ServiceUnavailableException}.
     */
    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        String serviceName = extractServiceName(request.url());
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(serviceName);
        return Mono.defer(() -> {
                // Throws CallNotPermittedException while the breaker is open;
                // Mono.defer turns the throw into an error signal.
                circuitBreaker.acquirePermission();
                long startedAt = System.nanoTime();
                // Ensures exactly one of onSuccess / onError / releasePermission is reported.
                java.util.concurrent.atomic.AtomicBoolean settled =
                    new java.util.concurrent.atomic.AtomicBoolean(false);
                return next.exchange(request)
                    .doOnSuccess(response -> {
                        if (settled.compareAndSet(false, true)) {
                            recordResponse(circuitBreaker, startedAt, response);
                        }
                    })
                    .doOnError(error -> {
                        if (settled.compareAndSet(false, true)) {
                            circuitBreaker.onError(System.nanoTime() - startedAt,
                                java.util.concurrent.TimeUnit.NANOSECONDS, error);
                        }
                    })
                    .doOnCancel(() -> {
                        if (settled.compareAndSet(false, true)) {
                            // No outcome was observed: hand the permission back.
                            circuitBreaker.releasePermission();
                        }
                    });
            })
            .onErrorMap(CallNotPermittedException.class,
                ex -> new ServiceUnavailableException("서비스가 일시적으로 사용 불가능합니다."));
    }

    /**
     * Reports a received response to the breaker. 5xx responses count as failures so that a
     * persistently failing upstream can open the breaker; everything else counts as success.
     */
    private void recordResponse(CircuitBreaker circuitBreaker, long startedAt, ClientResponse response) {
        long elapsed = System.nanoTime() - startedAt;
        if (response != null && response.statusCode().is5xxServerError()) {
            circuitBreaker.onError(elapsed, java.util.concurrent.TimeUnit.NANOSECONDS,
                new IllegalStateException("Upstream responded with " + response.statusCode()));
        } else {
            circuitBreaker.onSuccess(elapsed, java.util.concurrent.TimeUnit.NANOSECONDS);
        }
    }

    /** Uses the request host as the circuit breaker name. */
    private String extractServiceName(URI uri) {
        return uri.getHost();
    }
}
@Component
@Slf4j
public class WebClientErrorHandler {
/**
* 표준화된 에러 처리
*/
public <T> Mono<T> handleError(ClientResponse response, Class<T> responseType) {
if (response.statusCode().is4xxClientError()) {
return handle4xxError(response);
} else if (response.statusCode().is5xxServerError()) {
return handle5xxError(response);
}
return response.bodyToMono(responseType);
}
private <T> Mono<T> handle4xxError(ClientResponse response) {
return response.bodyToMono(ErrorResponse.class)
.flatMap(errorResponse -> {
HttpStatus status = response.statusCode();
switch (status) {
case UNAUTHORIZED:
return Mono.error(new AuthenticationException(errorResponse.getMessage()));
case FORBIDDEN:
return Mono.error(new AuthorizationException(errorResponse.getMessage()));
case NOT_FOUND:
return Mono.error(new ResourceNotFoundException(errorResponse.getMessage()));
case BAD_REQUEST:
return Mono.error(new ValidationException(errorResponse.getMessage()));
default:
return Mono.error(new ClientException(errorResponse.getMessage()));
}
});
}
private <T> Mono<T> handle5xxError(ClientResponse response) {
return response.bodyToMono(String.class)
.flatMap(errorBody -> {
log.error("서버 에러 발생: status={}, body={}", response.statusCode(), errorBody);
return Mono.error(new ServerException("서버에서 오류가 발생했습니다."));
});
}
}
@ExtendWith(MockitoExtension.class)
class OrderServiceTest {
@Mock
private WebClient paymentWebClient;
@Mock
private WebClient.RequestBodyUriSpec requestBodyUriSpec;
@Mock
private WebClient.RequestBodySpec requestBodySpec;
@Mock
private WebClient.ResponseSpec responseSpec;
@InjectMocks
private OrderService orderService;
@Test
void 결제_처리_성공_테스트() {
// Given
PaymentRequest paymentRequest = PaymentRequest.builder()
.orderId(1L)
.amount(BigDecimal.valueOf(10000))
.build();
PaymentResponse paymentResponse = PaymentResponse.builder()
.paymentId("PAY123")
.success(true)
.build();
when(paymentWebClient.post()).thenReturn(requestBodyUriSpec);
when(requestBodyUriSpec.uri("/payment/process")).thenReturn(requestBodySpec);
when(requestBodySpec.bodyValue(any())).thenReturn(requestBodySpec);
when(requestBodySpec.retrieve()).thenReturn(responseSpec);
when(responseSpec.bodyToMono(PaymentResponse.class)).thenReturn(Mono.just(paymentResponse));
// When
Mono<PaymentResponse> result = orderService.processPayment(createTestOrder());
// Then
StepVerifier.create(result)
.expectNext(paymentResponse)
.verifyComplete();
}
@Test
void 결제_처리_실패_테스트() {
// Given
when(paymentWebClient.post()).thenReturn(requestBodyUriSpec);
when(requestBodyUriSpec.uri("/payment/process")).thenReturn(requestBodySpec);
when(requestBodySpec.bodyValue(any())).thenReturn(requestBodySpec);
when(requestBodySpec.retrieve()).thenReturn(responseSpec);
when(responseSpec.bodyToMono(PaymentResponse.class))
.thenReturn(Mono.error(new PaymentException("결제 실패")));
// When
Mono<PaymentResponse> result = orderService.processPayment(createTestOrder());
// Then
StepVerifier.create(result)
.expectError(PaymentException.class)
.verify();
}
}
/**
 * Integration test that runs the order flow against WireMock stubs of the inventory and
 * payment endpoints (WireMock listens on port 8089).
 */
@SpringBootTest
@TestMethodOrder(OrderAnnotation.class)
class OrderIntegrationTest {
    @RegisterExtension
    static WireMockExtension wireMock = WireMockExtension.newInstance()
        .options(wireMockConfig().port(8089))
        .build();

    @Autowired
    private OrderService orderService;

    @Test
    @Order(1)
    void 주문_처리_통합_테스트() {
        // Given: both downstream endpoints answer successfully
        setupWireMockStubs();
        OrderRequest orderRequest = OrderRequest.builder()
            .productId(1L)
            .quantity(2)
            .paymentMethod("CARD")
            .build();

        // When
        Mono<OrderResponse> outcome = orderService.processOrder(orderRequest, "user123");

        // Then: the order is created and marked as paid
        StepVerifier.create(outcome)
            .assertNext(orderResponse -> {
                assertThat(orderResponse.getOrderId()).isNotNull();
                assertThat(orderResponse.getStatus()).isEqualTo(OrderStatus.PAID);
            })
            .verifyComplete();

        // And: both downstream endpoints were called
        wireMock.verify(postRequestedFor(urlEqualTo("/inventory/check")));
        wireMock.verify(postRequestedFor(urlEqualTo("/payment/process")));
    }

    /** Registers the success stubs for the inventory check and the payment call. */
    private void setupWireMockStubs() {
        stubJsonPost("/inventory/check", "{\"available\": true, \"stock\": 100}");
        stubJsonPost("/payment/process", "{\"paymentId\": \"PAY123\", \"success\": true}");
    }

    /** Stubs a POST to {@code url} that answers 200 with the given JSON body. */
    private void stubJsonPost(String url, String jsonBody) {
        wireMock.stubFor(post(urlEqualTo(url))
            .willReturn(aResponse()
                .withStatus(200)
                .withHeader("Content-Type", "application/json")
                .withBody(jsonBody)));
    }
}
// ✅ Good example: one WebClient bean backed by a reusable connection pool.
/**
 * Builds a WebClient on top of a custom connection pool: up to 500 connections, 20s max
 * idle time, 60s max lifetime, 5s pending-acquire timeout, background eviction every 120s.
 */
@Bean
public WebClient optimizedWebClient() {
    ConnectionProvider pool = ConnectionProvider.builder("custom")
        .maxConnections(500)
        .maxIdleTime(Duration.ofSeconds(20))
        .maxLifeTime(Duration.ofSeconds(60))
        .pendingAcquireTimeout(Duration.ofSeconds(5))
        .evictInBackground(Duration.ofSeconds(120))
        .build();

    ReactorClientHttpConnector connector =
        new ReactorClientHttpConnector(HttpClient.create(pool));

    return WebClient.builder()
        .clientConnector(connector)
        .build();
}
// ❌ Bad example: a new WebClient is created on every call.
/** Deliberate anti-pattern kept for illustration: builds a throwaway client per invocation. */
public Mono<String> badExample() {
    WebClient throwawayClient = WebClient.create(); // created anew each time
    return throwawayClient
        .get()
        .uri("http://api.example.com/data")
        .retrieve()
        .bodyToMono(String.class);
}
// ✅ Streaming a large dataset
/**
 * Streams the dataset element by element, groups it into batches of 100, processes each
 * batch, and buffers up to 1000 elements when the consumer is slower than the producer.
 */
public Flux<DataItem> streamLargeDataset() {
    Flux<DataItem> items = webClient.get()
        .uri("/api/large-dataset")
        .retrieve()
        .bodyToFlux(DataItem.class);

    return items
        .buffer(100)                                // batches of 100
        .flatMap(chunk -> processBatch(chunk))
        .onBackpressureBuffer(1000);                // backpressure control
}
// ❌ Bad example: the whole dataset is loaded into memory.
/** Deliberate anti-pattern kept for illustration: collects every element into one list. */
public Mono<List<DataItem>> badStreamExample() {
    Flux<DataItem> items = webClient.get()
        .uri("/api/large-dataset")
        .retrieve()
        .bodyToFlux(DataItem.class);
    return items.collectList(); // accumulates all data in memory
}
/**
 * Demonstrates layered timeouts combined with retry and fallback.
 */
@Service
@RequiredArgsConstructor // needed: the final webClient field has no other initialiser
public class ResilientApiService {
    private final WebClient webClient;

    /**
     * Calls {@code /api/process} with a per-attempt timeout, retries on timeout, bounds the
     * whole operation, and finally falls back to a default response.
     *
     * @param request payload to post
     * @return the API response, or {@code ApiResponse.fallback()} on any error
     */
    public Mono<ApiResponse> callWithTimeoutStrategy(ApiRequest request) {
        return webClient.post()
            .uri("/api/process")
            .bodyValue(request)
            .retrieve()
            .bodyToMono(ApiResponse.class)
            // 1. Per-attempt timeout (fail fast)
            .timeout(Duration.ofSeconds(5))
            // 2. Retry only when the attempt timed out
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                .filter(throwable -> throwable instanceof TimeoutException))
            // 3. Overall timeout across all attempts
            .timeout(Duration.ofSeconds(20))
            // 4. Fallback for any remaining error
            .onErrorReturn(ApiResponse.fallback());
    }
}
@Service
@RequiredArgsConstructor
public class DynamicApiService {
private final WebClient webClient;
private final LoadBalancer loadBalancer;
/**
* 로드 밸런싱과 동적 URL 구성
*/
public Mono<String> callWithLoadBalancing(String serviceName, String path) {
return Mono.fromCallable(() -> loadBalancer.chooseServer(serviceName))
.flatMap(server -> webClient.get()
.uri(uriBuilder -> uriBuilder
.scheme("http")
.host(server.getHost())
.port(server.getPort())
.path(path)
.queryParam("timestamp", Instant.now().toEpochMilli())
.build())
.retrieve()
.bodyToMono(String.class)
.onErrorResume(error -> {
loadBalancer.markServerDown(server);
return Mono.error(error);
}));
}
/**
* 환경별 다른 엔드포인트 호출
*/
public Mono<ConfigResponse> getEnvironmentConfig() {
String environment = getActiveEnvironment();
return webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/config/{env}")
.build(environment))
.retrieve()
.bodyToMono(ConfigResponse.class);
}
}
/**
 * Exchange filter that logs requests and responses while masking sensitive header values.
 */
@Component
@Slf4j
public class SecureWebClientFilter implements ExchangeFilterFunction {
    /** Lower-case names of headers whose values must never reach the logs. */
    private final Set<String> sensitiveHeaders = Set.of("authorization", "x-api-key", "cookie");

    /** Logs the masked request, performs the exchange and logs the response status. */
    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        logSecureRequest(request);
        return next.exchange(request)
            .doOnNext(this::logSecureResponse);
    }

    /** Logs method, URL and headers, replacing sensitive header values with {@code ***}. */
    private void logSecureRequest(ClientRequest request) {
        Map<String, String> maskedHeaders = request.headers().entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                // Locale.ROOT: the default-locale toLowerCase() is locale-sensitive (under a
                // Turkish locale "AUTHORIZATION" lower-cases with a dotless i), which would
                // let a sensitive header slip past the mask.
                entry -> sensitiveHeaders.contains(entry.getKey().toLowerCase(java.util.Locale.ROOT)) ?
                    "***" : String.join(",", entry.getValue())
            ));
        log.info("요청: {} {} headers: {}",
            request.method(), request.url(), maskedHeaders);
    }

    /** Logs only the status; the body is not logged because it may contain sensitive data. */
    private void logSecureResponse(ClientResponse response) {
        log.info("응답: status={}", response.statusCode());
    }
}
/**
 * TLS configuration for WebClient: a trust-all client for non-production use and a
 * trust-store-backed client for production.
 */
@Configuration
public class SecureWebClientConfig {

    /**
     * Client that accepts ANY server certificate. This disables TLS server authentication,
     * so the bean is excluded from the {@code prod} profile.
     */
    @Bean
    @Profile("!prod")
    public WebClient secureWebClient() throws Exception {
        SslContext sslContext = SslContextBuilder
            .forClient()
            .trustManager(InsecureTrustManagerFactory.INSTANCE)
            .build();
        HttpClient httpClient = HttpClient.create()
            .secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .build();
    }

    /**
     * Production client: certificates are verified against the custom trust store
     * {@code truststore.jks}.
     */
    @Bean
    @Profile("prod")
    public WebClient productionWebClient() throws Exception {
        KeyStore trustStore = KeyStore.getInstance("JKS");
        // try-with-resources: the stream was previously never closed.
        // NOTE(review): the trust store password is hard-coded; move it to external configuration.
        try (FileInputStream trustStoreStream = new FileInputStream("truststore.jks")) {
            trustStore.load(trustStoreStream, "password".toCharArray());
        }
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
        tmf.init(trustStore);
        SslContext sslContext = SslContextBuilder
            .forClient()
            .trustManager(tmf)
            .build();
        HttpClient httpClient = HttpClient.create()
            .secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .build();
    }
}
/**
 * Health indicator that probes {@code /health} on every registered service client.
 */
@Component
@RequiredArgsConstructor
public class WebClientHealthIndicator implements HealthIndicator {
    private final Map<String, WebClient> serviceClients;

    /**
     * Probes each service in turn, allowing 2s per probe. The overall status is DOWN as
     * soon as one probe fails; each service gets its own detail entry.
     *
     * <p>The probes block the calling thread.
     */
    @Override
    public Health health() {
        Health.Builder builder = Health.up();
        serviceClients.forEach((serviceName, client) -> {
            try {
                // Only completion matters; the response body is not inspected.
                client.get()
                    .uri("/health")
                    .retrieve()
                    .bodyToMono(String.class)
                    .timeout(Duration.ofSeconds(2))
                    .block();
                builder.withDetail(serviceName, "UP");
            } catch (Exception e) {
                builder.withDetail(serviceName, "DOWN - " + e.getMessage());
                builder.down();
            }
        });
        return builder.build();
    }
}
/**
 * Creates orders through the API client while recording business metrics.
 */
@Service
@RequiredArgsConstructor
public class BusinessMetricsService {
    private final MeterRegistry meterRegistry;
    private final WebClient apiClient;

    // State object behind the "latest order amount" gauge. A gauge only holds a weak
    // reference to its state, so the holder must be strongly referenced here. It is
    // initialised inline, so Lombok keeps it out of the generated constructor.
    private final java.util.concurrent.atomic.AtomicReference<Double> latestOrderAmount =
        new java.util.concurrent.atomic.AtomicReference<>(0.0);

    /**
     * Posts the order to {@code /orders} and records:
     * <ul>
     *   <li>{@code business.order.creation.time} — duration of the call, whatever the outcome</li>
     *   <li>{@code business.order.created} — success counter tagged by payment method and category</li>
     *   <li>{@code business.order.amount.latest} — gauge holding the most recent order amount</li>
     *   <li>{@code business.order.failed} — failure counter tagged by exception type</li>
     * </ul>
     *
     * @param request order to create
     * @return the created order
     */
    public Mono<OrderResponse> createOrderWithMetrics(OrderRequest request) {
        // defer: start the timer when the call is subscribed, and stop it when it ends.
        return Mono.defer(() -> {
            Timer.Sample sample = Timer.start(meterRegistry);
            return apiClient.post()
                .uri("/orders")
                .bodyValue(request)
                .retrieve()
                .bodyToMono(OrderResponse.class)
                // doOnNext instead of doOnSuccess: it is not invoked with null on an empty response.
                .doOnNext(response -> {
                    meterRegistry.counter("business.order.created",
                            "payment_method", request.getPaymentMethod(),
                            "product_category", request.getProductCategory())
                        .increment();
                    latestOrderAmount.set(response.getTotalAmount().doubleValue());
                    // Re-registering the same gauge is a no-op after the first call.
                    meterRegistry.gauge("business.order.amount.latest",
                        latestOrderAmount, holder -> holder.get());
                })
                .doOnError(error -> {
                    meterRegistry.counter("business.order.failed",
                            "error_type", error.getClass().getSimpleName())
                        .increment();
                })
                .doFinally(signal ->
                    sample.stop(meterRegistry.timer("business.order.creation.time")));
        });
    }
}
/**
 * Periodically checks the WebClient error rate and raises an alert above 5%.
 *
 * <p>The rate is derived from the {@code webclient.request} timers and covers only the
 * calls made since the previous check, so it reflects the current window rather than the
 * whole process lifetime.
 */
@Component
@RequiredArgsConstructor
public class WebClientAlertHandler {
    private final NotificationService notificationService;
    private final MeterRegistry meterRegistry;

    // Cumulative totals seen at the previous check; touched only by the scheduled method.
    private long previousTotalCount;
    private long previousErrorCount;

    /** Runs every 30 seconds and sends a WARNING alert when the error rate exceeds 5%. */
    @Scheduled(fixedRate = 30000)
    public void checkErrorRates() {
        double errorRate = calculateErrorRate();
        if (errorRate > 0.05) {
            AlertRequest alert = AlertRequest.builder()
                .level(AlertLevel.WARNING)
                .title("WebClient 에러율 임계값 초과")
                .message(String.format("현재 에러율: %.2f%%", errorRate * 100))
                .timestamp(Instant.now())
                .build();
            notificationService.sendAlert(alert);
        }
    }

    /**
     * Computes errors / total over the calls recorded since the last invocation.
     *
     * @return the error rate in [0, 1], or 0 when no calls were recorded in the window
     */
    private double calculateErrorRate() {
        long totalCount = 0;
        long errorCount = 0;
        // Sum over every registered webclient.request timer, whatever its tag combination.
        for (Timer timer : meterRegistry.find("webclient.request").timers()) {
            long count = timer.count();
            totalCount += count;
            if (isErrorStatus(timer.getId().getTag("status"))) {
                errorCount += count;
            }
        }
        // Clamp at 0 in case meters were removed between two checks.
        long windowTotal = Math.max(0, totalCount - previousTotalCount);
        long windowErrors = Math.max(0, errorCount - previousErrorCount);
        previousTotalCount = totalCount;
        previousErrorCount = errorCount;
        return windowTotal > 0 ? (double) windowErrors / windowTotal : 0;
    }

    /** A call counts as failed when its status tag is {@code error} or a 5xx code. */
    private static boolean isErrorStatus(String status) {
        return status != null && ("error".equals(status) || status.startsWith("5"));
    }
}
@RestController
@RequiredArgsConstructor
@Slf4j
public class ApiGatewayController {
private final Map<String, WebClient> serviceClients;
private final AuthTokenService authTokenService;
/**
* 동적 라우팅과 요청 전달
*/
@PostMapping("/api/{service}/**")
public Mono<ResponseEntity<String>> routeRequest(
@PathVariable String service,
@RequestBody(required = false) String body,
ServerHttpRequest request,
@RequestHeader("Authorization") String authHeader) {
return authTokenService.validateToken(extractToken(authHeader))
.filter(Boolean::booleanValue)
.switchIfEmpty(Mono.error(new UnauthorizedException("유효하지 않은 토큰")))
.flatMap(valid -> {
WebClient client = serviceClients.get(service);
if (client == null) {
return Mono.error(new ServiceNotFoundException("서비스를 찾을 수 없음: " + service));
}
String path = extractPath(request);
return client.method(request.getMethod())
.uri(uriBuilder -> uriBuilder.path(path)
.query(request.getURI().getQuery())
.build())
.headers(headers -> copyHeaders(request.getHeaders(), headers))
.bodyValue(body != null ? body : "")
.retrieve()
.toEntity(String.class);
})
.timeout(Duration.ofSeconds(30))
.onErrorMap(TimeoutException.class,
ex -> new ServiceTimeoutException("서비스 응답 시간 초과"));
}
private String extractToken(String authHeader) {
return authHeader.startsWith("Bearer ") ? authHeader.substring(7) : authHeader;
}
private String extractPath(ServerHttpRequest request) {
String fullPath = request.getPath().value();
return fullPath.replaceFirst("/api/[^/]+", "");
}
private void copyHeaders(HttpHeaders source, HttpHeaders target) {
source.forEach((key, values) -> {
if (!key.equalsIgnoreCase("host") &&
!key.equalsIgnoreCase("content-length")) {
target.addAll(key, values);
}
});
}
}
/**
 * WebClient configuration tuned for throughput: a large connection pool, TCP options and
 * explicit timeouts.
 */
@Configuration
public class PerformanceOptimizedConfig {

    /** Assembles the high-performance client from the pool and HTTP client built below. */
    @Bean
    public WebClient highPerformanceWebClient() {
        HttpClient tunedHttpClient = tunedHttpClient(highPerfPool());
        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(tunedHttpClient))
            .codecs(configurer -> {
                configurer.defaultCodecs().maxInMemorySize(5 * 1024 * 1024); // 5MB
                configurer.defaultCodecs().enableLoggingRequestDetails(false); // disabled for production
            })
            .build();
    }

    /** Connection pool settings. */
    private ConnectionProvider highPerfPool() {
        return ConnectionProvider.builder("high-perf")
            .maxConnections(1000)                          // maximum number of connections
            .maxIdleTime(Duration.ofSeconds(30))           // how long an idle connection is kept
            .maxLifeTime(Duration.ofMinutes(10))           // maximum connection lifetime
            .pendingAcquireTimeout(Duration.ofSeconds(3))  // wait time for a free connection
            .evictInBackground(Duration.ofSeconds(60))     // background eviction interval
            .build();
    }

    /** HTTP client options: keep-alive, no-delay, 3s connect timeout, 10s read/write timeouts. */
    private HttpClient tunedHttpClient(ConnectionProvider pool) {
        return HttpClient.create(pool)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
            .doOnConnected(connection ->
                connection.addHandlerLast(new ReadTimeoutHandler(10))
                    .addHandlerLast(new WriteTimeoutHandler(10)));
    }
}
/**
 * Sends large item lists to the batch API in chunks, with bounded concurrency.
 */
@Service
@RequiredArgsConstructor
public class BatchProcessingService {
    private final WebClient webClient;

    /**
     * Splits the items into chunks of 50, processes up to 4 chunks concurrently and merges
     * the per-chunk results. The whole operation is limited to 5 minutes.
     *
     * @param items items to process
     * @return the merged result; an empty input yields a fresh {@code BatchResult}
     */
    public Mono<BatchResult> processBatchRequests(List<BatchItem> items) {
        return Flux.fromIterable(items)
            .buffer(50)
            .flatMap(batch -> processBatchChunk(batch), 4)
            // reduceWith creates the seed per subscription; a seed instance shared across
            // re-subscriptions could carry over state if merge mutates its receiver.
            .reduceWith(BatchResult::new, BatchResult::merge)
            .timeout(Duration.ofMinutes(5));
    }

    /**
     * Posts one chunk to {@code /api/batch}, retrying twice with backoff; if it still
     * fails, the chunk is reported as failed instead of failing the whole batch.
     */
    private Mono<BatchResult> processBatchChunk(List<BatchItem> chunk) {
        return webClient.post()
            .uri("/api/batch")
            .bodyValue(chunk)
            .retrieve()
            .bodyToMono(BatchResult.class)
            .retryWhen(Retry.backoff(2, Duration.ofSeconds(1)))
            .onErrorReturn(BatchResult.failed(chunk.size()));
    }
}
실제 운영 환경에서 WebClient를 사용할 때는 다음 사항들을 꼭 고려해야 합니다:
이러한 요소들을 고려하여 WebClient를 구성하면, 안정적이고 확장 가능한 마이크로서비스 아키텍처를 구축할 수 있습니다.