gRPC 통신 기법

유기훈·2025년 4월 5일

인프런 "파이썬으로 쉽게 배우는 gRPC!" 강의 내용 정리

요청 취소

요청 취소란

  • 요청 취소 기법은 클라이언트가 서버에 보낸 요청을 중단하는 기법을 말한다.
  • 클라이언트가 더 이상 응답을 기다리지 않겠다는 신호를 서버에 보내는 것이다.

요청 취소가 필요한 이유

  • gRPC는 한번의 응답, 한번의 요청이라는 제한이 없다.
  • 사용자 인터페이스와의 상호작용
  • 타임 아웃
  • 리소스 절약
  • 오류 처리

클라이언트 기준

상황이유
응답이 너무 느릴 때시간 초과를 두고 사용자가 기다리지 않게 함
사용자가 요청을 중단했을 때사용자가 무엇인가 취소했을 때
더 이상 응답이 필요 없을 때빠른 응답을 이미 받아서 나머지 응답이 의미 없을 때
병렬 요청 중 다른 하나가 먼저 성공했을 때나머지 요청 취소

서버 기준

상황이유
클라이언트가 연결을 끊었을 때불필요한 처리 중단
내부 상태가 변경되어 요청을 처리할 수 없을 때리소스 낭비 방지
인증 실패, 제한 초과 등 정책 위반 시서비스 보호

요청 취소에 관련된 주요 API 문법!

Client Streaming / Bidirectional Streaming 에서 취소 할 때

  • onError 메서드를 호출하여 요청을 취소한다.
StreamObserver<ChatRequest> requestObserver =
    asyncStub.chat(new StreamObserver<ChatResponse>() {
        public void onNext(ChatResponse response) {
            System.out.println("Response: " + response.getMessage());
        }

        public void onError(Throwable t) {
            System.out.println("Server error: " + t);
        }

        public void onCompleted() {
            System.out.println("Finished.");
        }
    });

// 메시지 전송
requestObserver.onNext(ChatRequest.newBuilder().setMessage("Hello").build());

// 취소 조건이 생기면
requestObserver.onError(Status.CANCELLED.asRuntimeException());

Context 이벤트 리스너를 활용한 요청 취소 이벤트 처리

  • Context: io.grpc.Context는 gRPC의 스레드 로컬(ThreadLocal) 기반 컨텍스트로, 요청마다 생성되고 취소/타임아웃 등의 정보를 담는다.
  • Context 특징
    - 요청 생명 주기 동안 유효
    • withDeadline, withCancellation 등 사용 가능
    • 취소되면 isCancelled()로 확인 가능
    • 서버에서 Context.current().addListener()로 취소 이벤트 리스닝 가능
Context context = Context.current();
if (context.isCancelled()) {
    // 요청이 취소됨
}

데이터 압축

gRPC의 압축 기법

압축 기법이란?

  • 네트워크 통신에서 전송되는 데이터를 압축해서 보내는 것!

왜 필요할까

  • 네트워크 대역폭 절약
  • 전송 속도 향상
  • 비용 절감

gRPC 데이터 압축을 위한 API 문법

데이터 압축에 관련된 주요 API 문법

  • NoCompression
  • Depflate
  • Gzip

gRPC 데이터 압축 동작 방식

  • gRPC는 데이터 한개가 있을 때 함수를 호출해서 이 데이터를 압축하고 이 압축한 데이터를 보내는 방식이 아니다.
  • 서버와 채널을 만들 때 컴프레션 옵션을 줌으로써 서버 전체에, 그리고 클라이언트 전체에 적용되는 기본 압축 방식을 설정한다.
  • 서버에서 gzip 압축을 하면, 클라이언트에서는 gzip 지원시 gzip으로 자동 압축 해제가 이루어진다.
  • 압축 방식을 따로 지정하지 않으면 NoCompression으로 설정된다.
  • 압축 방식을 지정하는 방법은 아래 코드와 같다.
    Client -> Server
ManagedChannel channel = ManagedChannelBuilder
    .forAddress("localhost", 50051)
    .usePlaintext()
    .build();

MyServiceGrpc.MyServiceStub stub = MyServiceGrpc.newStub(channel)
    .withCompression("gzip"); // 압축 사용 설정

Server -> Client

Server server = ServerBuilder.forPort(50051)
    .addService(ServerInterceptors.intercept(
        new MyServiceImpl(),
        new ServerInterceptor() {
            @Override
            public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
                ServerCall<ReqT, RespT> call,
                Metadata headers,
                ServerCallHandler<ReqT, RespT> next) {

                call.setCompression("gzip"); // 응답 압축
                return next.startCall(call, headers);
            }
        }
    ))
    .build()
    .start();

NoCompression

  • 압축을 하지 않는 방식
  • 데이터가 굉장히 작거나 압축률이 굉장히 낮게 나오는 데이터 형태의 경우 오히려 압축을 하는게 오버해드가 큰 경우가 있음

Delfate 알고리즘

  • 빠른 압축 해제 속도와 높은 압축률을 자랑하지만 많이 사용하진 않는다.

gZip 알고리즘

  • deflate와 동일한 알고리즘을 사용하지만 오히려 확장성이 좋고 지원성이 좋아서 일반적으로 압축 기법을 사용한다면 gzip 기법을 많이 쓴다.

데이터 압축시 고려사항

압축 레벨

  • 같은 알고리즘이라 하더라도 압축 레벨을 다르게 설정할 수 있다.
  • 압축 레벨이 높을수록 압축률이 올라가고 압축 해제 시간이 올라간다.

상황

  • 서버가 모든 데이터를 압축한다면, 서버가 필요한 작업은 못하고 압축하는데 시간을 많이 쓸 수 있다.

gRPC Interceptor 개요

인터셉터!

  • gRPC 호출의 전후에 특정 작업을 수행할 수 있는 강력한 기능
  • gRPC 인터셉터는 서버와 클라이언트 간의 통신 흐름에 개입할 수 있기 때문에 로깅이나 인증, 모니터링 등 다양한 기능을 추가할 수 있다.
  • 에러처리

종류

  • 클라이언트 인터셉터: 클라이언트 측에서 요청을 보내기 전이나 응답을 받은 후에 동작
  • 서버 인터셉터: 요청을 받은 후나 응답을 보내기 전에 동작

사용 예제

Client Interceptor

요청에 사용자 토큰 추가

public class AuthClientInterceptor implements ClientInterceptor {

    private final String token;

    public AuthClientInterceptor(String token) {
        this.token = token;
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
        MethodDescriptor<ReqT, RespT> method,
        CallOptions callOptions,
        Channel next) {

        return new ForwardingClientCall.SimpleForwardingClientCall<>(
            next.newCall(method, callOptions)) {

            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), token);
                super.start(responseListener, headers);
            }
        };
    }
}
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
    .usePlaintext()
    .build();

Channel interceptedChannel = ClientInterceptors.intercept(channel, new AuthClientInterceptor("Bearer mytoken"));

MyServiceGrpc.MyServiceBlockingStub stub = MyServiceGrpc.newBlockingStub(interceptedChannel);

Server Interceptor

요청의 토큰 검증

public class AuthServerInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        Metadata headers,
        ServerCallHandler<ReqT, RespT> next) {

        String token = headers.get(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER));

        if (token == null || !token.equals("Bearer mytoken")) {
            call.close(Status.UNAUTHENTICATED.withDescription("Invalid token"), headers);
            return new ServerCall.Listener<>() {}; // no-op listener
        }

        return next.startCall(call, headers);
    }
}
Server server = ServerBuilder.forPort(50051)
    .addService(ServerInterceptors.intercept(new MyServiceImpl(), new AuthServerInterceptor()))
    .build()
    .start();

에러 핸들링

네트워크 통신 중 에러는?

  • 네트워크가 불안정하거나 패킷이 손상되거나 과부하가 될 수 있다. 이러한 이유로 gRPC는 다양한 에러 처리 방법을 제공하고 있다.
  • 대표적으로 재시도 기법, 타임아웃 기법이 있다.
  • gRPC는 발생한 에러 별로 다른 상태 코드를 제공한다.

상태코드

gRPC 타임아웃 기법

  • gRPC에서는 타임아웃을 Deadline 또는 timeout 이라고 부른다.
  • 클라이언트에서 요청마다 CallOptions를 설정하거나 stub 메서드 호출에 직접 타임아웃을 설정할 수 있다.
    Unary 통신 방식에서 timeout 설정
ChatServiceGrpc.ChatServiceBlockingStub stub = ChatServiceGrpc.newBlockingStub(channel)
    .withDeadlineAfter(3, TimeUnit.SECONDS); // 3초 내 응답 못 받으면 예외 발생

ChatRequest request = ChatRequest.newBuilder().setMessage("hello").build();

try {
    ChatResponse response = stub.sendMessage(request);
    System.out.println(response.getReply());
} catch (StatusRuntimeException e) {
    if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
        System.out.println("💥 요청 시간이 초과되었습니다.");
    }
}

stream timeout 설정

  • stream 세션 유지 기간이다. 지정된 시간 이후 stream 세션이 종료되고 통신이 불가능하다.
ChatServiceGrpc.ChatServiceStub asyncStub = ChatServiceGrpc
    .newStub(channel)
    .withDeadlineAfter(10, TimeUnit.SECONDS); // 전체 스트리밍 세션 10초 제한

StreamObserver<ChatMessage> requestObserver = asyncStub.chat(
    new StreamObserver<ChatResponse>() {
        @Override
        public void onNext(ChatResponse value) {
            System.out.println("서버 응답: " + value.getMessage());
        }

        @Override
        public void onError(Throwable t) {
            System.err.println("💥 에러 발생: " + Status.fromThrowable(t));
        }

        @Override
        public void onCompleted() {
            System.out.println("서버가 스트림 종료");
        }
    }
);

// 클라이언트에서 메시지 전송
requestObserver.onNext(ChatMessage.newBuilder().setMessage("안녕하세요!").build());

// 몇 초 후에 종료
requestObserver.onCompleted();

서버 헬스체크

헬스체크가 필요한 이유

  • 가용성 유지. 헬스체크를 통해 서비스가 정상적으로 작동하는지 지속적으로 확인할 수 있다.
  • 문제가 발생하면 이를 즉시 감지하고 대체 서버로의 전환등을 통해 가용성을 유지할 수 있다.

gRPC 헬스체크 방법

  • 미리 정의된 gRPC 메서드와 서비스를 사용하면된다.

Maven 의존성 추가

<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-services</artifactId>
  <version>1.63.0</version>
</dependency>

서버 설정 - HealthStatusManager 사용

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.HealthStatusManager;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;

public class MyGrpcServer {
    public static void main(String[] args) throws Exception {
        // 헬스 체크 매니저 생성
        HealthStatusManager healthStatusManager = new HealthStatusManager();

        Server server = ServerBuilder.forPort(50051)
            .addService(new MyServiceImpl()) // 실제 gRPC 서비스
            .addService(healthStatusManager.getHealthService()) // 헬스 서비스 등록
            .build()
            .start();

        // 서비스별 상태 설정 (""는 전체 서버 상태)
        healthStatusManager.setStatus("my.grpc.MyService", ServingStatus.SERVING);
        healthStatusManager.setStatus("", ServingStatus.SERVING); // 전체 서버 상태

        System.out.println("gRPC 서버 시작됨");

        server.awaitTermination();
    }
}

클라이언트에서 헬스 체크 호출

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.HealthStatusManager;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;

public class MyGrpcServer {
    public static void main(String[] args) throws Exception {
        // 헬스 체크 매니저 생성
        HealthStatusManager healthStatusManager = new HealthStatusManager();

        Server server = ServerBuilder.forPort(50051)
            .addService(new MyServiceImpl()) // 실제 gRPC 서비스
            .addService(healthStatusManager.getHealthService()) // 헬스 서비스 등록
            .build()
            .start();

        // 서비스별 상태 설정 (""는 전체 서버 상태)
        healthStatusManager.setStatus("my.grpc.MyService", ServingStatus.SERVING);
        healthStatusManager.setStatus("", ServingStatus.SERVING); // 전체 서버 상태

        System.out.println("gRPC 서버 시작됨");

        server.awaitTermination();
    }
}

상태 변경은 실시간 가능

healthStatusManager.setStatus("my.grpc.MyService", ServingStatus.NOT_SERVING); // 예: 종료 직전

gRPC Reflection

gRPC Reflection 이란

  • gRPC 서버에서 제공하는 서비스와 메서드를 클라이언트가 동적으로 탐색할 수 있도록 하는 기능
  • 디버깅, 개발 및 도구 통합
  • 클라이언트가 서버의 프로토콜 정의를 할지 못해도 서버와 상호작용있음
  • 이건 특히 클라이언트가 서버의 proto 파일 없이도 gRPC 서버와 소통할 수 있게 도와주는 기능
  • gRPC 리플렉션(Server Reflection) 은 gRPC 서버가 자신이 어떤 서비스, 메서드, 메시지 타입을 제공하는지 메타데이터를 클라이언트에게 알려주는 기능
  • 말 그대로 “서버가 제공하는 서비스의 구조”를 런타임에 조회 가능하게 만들어 준다.

사용 방법

Maven 의존성 추가

<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-services</artifactId>
  <version>1.64.0</version> <!-- gRPC 버전에 맞게 조정 -->
</dependency>

서버에 리플렉션 서비스 등록

import io.grpc.protobuf.services.ProtoReflectionService;

Server server = ServerBuilder.forPort(50051)
    .addService(new YourGrpcServiceImpl())
    .addService(ProtoReflectionService.newInstance()) // 리플렉션 등록
    .build()
    .start();
profile
개발 블로그

0개의 댓글