인프런 "파이썬으로 쉽게 배우는 gRPC!" 강의 내용 정리
클라이언트 기준
| 상황 | 이유 |
|---|---|
| 응답이 너무 느릴 때 | 시간 초과를 두고 사용자가 기다리지 않게 함 |
| 사용자가 요청을 중단했을 때 | 사용자가 무엇인가 취소했을 때 |
| 더 이상 응답이 필요 없을 때 | 빠른 응답을 이미 받아서 나머지 응답이 의미 없을 때 |
| 병렬 요청 중 다른 하나가 먼저 성공했을 때 | 나머지 요청 취소 |
서버 기준
| 상황 | 이유 |
|---|---|
| 클라이언트가 연결을 끊었을 때 | 불필요한 처리 중단 |
| 내부 상태가 변경되어 요청을 처리할 수 없을 때 | 리소스 낭비 방지 |
| 인증 실패, 제한 초과 등 정책 위반 시 | 서비스 보호 |
StreamObserver<ChatRequest> requestObserver =
asyncStub.chat(new StreamObserver<ChatResponse>() {
public void onNext(ChatResponse response) {
System.out.println("Response: " + response.getMessage());
}
public void onError(Throwable t) {
System.out.println("Server error: " + t);
}
public void onCompleted() {
System.out.println("Finished.");
}
});
// 메시지 전송
requestObserver.onNext(ChatRequest.newBuilder().setMessage("Hello").build());
// 취소 조건이 생기면
requestObserver.onError(Status.CANCELLED.asRuntimeException());
Context context = Context.current();
if (context.isCancelled()) {
// 요청이 취소됨
}
ManagedChannel channel = ManagedChannelBuilder
.forAddress("localhost", 50051)
.usePlaintext()
.build();
MyServiceGrpc.MyServiceStub stub = MyServiceGrpc.newStub(channel)
.withCompression("gzip"); // 압축 사용 설정
Server -> Client
Server server = ServerBuilder.forPort(50051)
.addService(ServerInterceptors.intercept(
new MyServiceImpl(),
new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
call.setCompression("gzip"); // 응답 압축
return next.startCall(call, headers);
}
}
))
.build()
.start();
요청에 사용자 토큰 추가
public class AuthClientInterceptor implements ClientInterceptor {
private final String token;
public AuthClientInterceptor(String token) {
this.token = token;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), token);
super.start(responseListener, headers);
}
};
}
}
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
Channel interceptedChannel = ClientInterceptors.intercept(channel, new AuthClientInterceptor("Bearer mytoken"));
MyServiceGrpc.MyServiceBlockingStub stub = MyServiceGrpc.newBlockingStub(interceptedChannel);
요청의 토큰 검증
public class AuthServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
String token = headers.get(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER));
if (token == null || !token.equals("Bearer mytoken")) {
call.close(Status.UNAUTHENTICATED.withDescription("Invalid token"), headers);
return new ServerCall.Listener<>() {}; // no-op listener
}
return next.startCall(call, headers);
}
}
Server server = ServerBuilder.forPort(50051)
.addService(ServerInterceptors.intercept(new MyServiceImpl(), new AuthServerInterceptor()))
.build()
.start();
ChatServiceGrpc.ChatServiceBlockingStub stub = ChatServiceGrpc.newBlockingStub(channel)
.withDeadlineAfter(3, TimeUnit.SECONDS); // 3초 내 응답 못 받으면 예외 발생
ChatRequest request = ChatRequest.newBuilder().setMessage("hello").build();
try {
ChatResponse response = stub.sendMessage(request);
System.out.println(response.getReply());
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
System.out.println("💥 요청 시간이 초과되었습니다.");
}
}
stream timeout 설정
ChatServiceGrpc.ChatServiceStub asyncStub = ChatServiceGrpc
.newStub(channel)
.withDeadlineAfter(10, TimeUnit.SECONDS); // 전체 스트리밍 세션 10초 제한
StreamObserver<ChatMessage> requestObserver = asyncStub.chat(
new StreamObserver<ChatResponse>() {
@Override
public void onNext(ChatResponse value) {
System.out.println("서버 응답: " + value.getMessage());
}
@Override
public void onError(Throwable t) {
System.err.println("💥 에러 발생: " + Status.fromThrowable(t));
}
@Override
public void onCompleted() {
System.out.println("서버가 스트림 종료");
}
}
);
// 클라이언트에서 메시지 전송
requestObserver.onNext(ChatMessage.newBuilder().setMessage("안녕하세요!").build());
// 몇 초 후에 종료
requestObserver.onCompleted();
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>1.63.0</version>
</dependency>
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.HealthStatusManager;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
public class MyGrpcServer {
public static void main(String[] args) throws Exception {
// 헬스 체크 매니저 생성
HealthStatusManager healthStatusManager = new HealthStatusManager();
Server server = ServerBuilder.forPort(50051)
.addService(new MyServiceImpl()) // 실제 gRPC 서비스
.addService(healthStatusManager.getHealthService()) // 헬스 서비스 등록
.build()
.start();
// 서비스별 상태 설정 (""는 전체 서버 상태)
healthStatusManager.setStatus("my.grpc.MyService", ServingStatus.SERVING);
healthStatusManager.setStatus("", ServingStatus.SERVING); // 전체 서버 상태
System.out.println("gRPC 서버 시작됨");
server.awaitTermination();
}
}
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.HealthStatusManager;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
public class MyGrpcServer {
public static void main(String[] args) throws Exception {
// 헬스 체크 매니저 생성
HealthStatusManager healthStatusManager = new HealthStatusManager();
Server server = ServerBuilder.forPort(50051)
.addService(new MyServiceImpl()) // 실제 gRPC 서비스
.addService(healthStatusManager.getHealthService()) // 헬스 서비스 등록
.build()
.start();
// 서비스별 상태 설정 (""는 전체 서버 상태)
healthStatusManager.setStatus("my.grpc.MyService", ServingStatus.SERVING);
healthStatusManager.setStatus("", ServingStatus.SERVING); // 전체 서버 상태
System.out.println("gRPC 서버 시작됨");
server.awaitTermination();
}
}
healthStatusManager.setStatus("my.grpc.MyService", ServingStatus.NOT_SERVING); // 예: 종료 직전
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>1.64.0</version> <!-- gRPC 버전에 맞게 조정 -->
</dependency>
import io.grpc.protobuf.services.ProtoReflectionService;
Server server = ServerBuilder.forPort(50051)
.addService(new YourGrpcServiceImpl())
.addService(ProtoReflectionService.newInstance()) // 리플렉션 등록
.build()
.start();