
gRPC는 Google에서 개발한 고성능, 오픈소스 RPC(Remote Procedure Call) 프레임워크로, Protocol Buffers를 사용하여 효율적인 데이터 직렬화를 제공합니다.
gRPC 스트리밍은 클라이언트와 서버 간에 지속적인 데이터 스트림을 주고받는 강력한 메커니즘입니다.

rpc GetUser(UserRequest) returns (UserResponse);
rpc StreamStockData(StreamRequest) returns (stream StockData);
rpc UploadData(stream DataChunk) returns (UploadSummary);
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.example.grpc";
option java_outer_classname = "StreamingProto";
service StreamingService {
// 서버 스트리밍
rpc ServerStreaming(Request) returns (stream Response);
// 클라이언트 스트리밍
rpc ClientStreaming(stream Request) returns (Response);
// 양방향 스트리밍
rpc BidirectionalStreaming(stream Request) returns (stream Response);
}
message Request {
string client_id = 1;
string data = 2;
int64 timestamp = 3;
}
message Response {
string server_id = 1;
string data = 2;
int64 timestamp = 3;
StatusCode status = 4;
}
enum StatusCode {
OK = 0;
ERROR = 1;
PENDING = 2;
}
import io.grpc.stub.StreamObserver;
public class StreamingServiceImpl extends StreamingServiceGrpc.StreamingServiceImplBase {
@Override
public void serverStreaming(Request request, StreamObserver<Response> responseObserver) {
// 요청 매개변수 검증
String clientId = request.getClientId();
System.out.println("Received request from client: " + clientId);
try {
// 데이터 소스에서 스트리밍할 데이터 가져오기
List<String> dataItems = dataService.getDataItems(clientId);
// 각 데이터 항목에 대해 응답 스트리밍
for (String data : dataItems) {
Response response = Response.newBuilder()
.setServerId("server-001")
.setData(data)
.setTimestamp(System.currentTimeMillis())
.setStatus(StatusCode.OK)
.build();
// 응답 스트리밍
responseObserver.onNext(response);
// 실제 상황에서는 백프레셔 제어를 위해 지연 추가 가능
Thread.sleep(100);
}
} catch (Exception e) {
// 오류 처리
Response errorResponse = Response.newBuilder()
.setServerId("server-001")
.setStatus(StatusCode.ERROR)
.setData("Error: " + e.getMessage())
.setTimestamp(System.currentTimeMillis())
.build();
responseObserver.onNext(errorResponse);
} finally {
// 스트림 완료 표시
responseObserver.onCompleted();
}
}
}
import io.grpc.stub.StreamObserver;
public class StreamingServiceImpl extends StreamingServiceGrpc.StreamingServiceImplBase {
@Override
public StreamObserver<Request> clientStreaming(final StreamObserver<Response> responseObserver) {
// 클라이언트로부터 스트림을 처리하기 위한 StreamObserver 반환
return new StreamObserver<Request>() {
// 데이터 수집을 위한 내부 상태 유지
private final List<String> receivedData = new ArrayList<>();
private String clientId = "";
@Override
public void onNext(Request request) {
// 각 요청 처리
clientId = request.getClientId();
receivedData.add(request.getData());
System.out.println("Received data chunk from " + clientId + ": " + request.getData());
}
@Override
public void onError(Throwable t) {
// 오류 처리
System.err.println("Error in client stream: " + t.getMessage());
t.printStackTrace();
// 오류 응답 전송
responseObserver.onNext(Response.newBuilder()
.setServerId("server-001")
.setStatus(StatusCode.ERROR)
.setData("Stream error: " + t.getMessage())
.setTimestamp(System.currentTimeMillis())
.build());
responseObserver.onCompleted();
}
@Override
public void onCompleted() {
// 모든 데이터 수신 완료 시 처리
System.out.println("Client stream completed. Processing " + receivedData.size() + " items");
// 모든 데이터 처리 후 단일 응답 반환
String processedResult = processData(receivedData);
responseObserver.onNext(Response.newBuilder()
.setServerId("server-001")
.setStatus(StatusCode.OK)
.setData("Processed " + receivedData.size() + " items: " + processedResult)
.setTimestamp(System.currentTimeMillis())
.build());
// 응답 완료 표시
responseObserver.onCompleted();
}
private String processData(List<String> data) {
// 실제 데이터 처리 로직
return data.stream().reduce("", (a, b) -> a + b.length() + ",");
}
};
}
}
import io.grpc.stub.StreamObserver;
public class StreamingServiceImpl extends StreamingServiceGrpc.StreamingServiceImplBase {
@Override
public StreamObserver<Request> bidirectionalStreaming(StreamObserver<Response> responseObserver) {
return new StreamObserver<Request>() {
@Override
public void onNext(Request request) {
// 클라이언트 요청 처리
String clientId = request.getClientId();
String receivedData = request.getData();
System.out.println("Received from " + clientId + ": " + receivedData);
// 요청에 대한 즉각적인 응답 생성
String responseData = processRequestData(receivedData);
// 응답 스트리밍
Response response = Response.newBuilder()
.setServerId("server-001")
.setStatus(StatusCode.OK)
.setData(responseData)
.setTimestamp(System.currentTimeMillis())
.build();
responseObserver.onNext(response);
}
@Override
public void onError(Throwable t) {
System.err.println("Error in bidirectional stream: " + t.getMessage());
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Client has completed sending messages");
responseObserver.onCompleted();
}
private String processRequestData(String data) {
// 실제 데이터 처리 로직
return "Processed: " + data.toUpperCase();
}
};
}
}
import io.grpc.stub.StreamObserver;
public class BidirectionalStreamingClient {
private final StreamingServiceGrpc.StreamingServiceStub asyncStub;
public BidirectionalStreamingClient(ManagedChannel channel) {
this.asyncStub = StreamingServiceGrpc.newStub(channel);
}
public void startBidirectionalStreaming() {
CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<Response> responseObserver = new StreamObserver<Response>() {
@Override
public void onNext(Response response) {
// 서버로부터 응답 처리
System.out.println("Received from server: " + response.getData());
}
@Override
public void onError(Throwable t) {
System.err.println("Error in server response: " + t.getMessage());
t.printStackTrace();
finishLatch.countDown();
}
@Override
public void onCompleted() {
System.out.println("Server has completed sending messages");
finishLatch.countDown();
}
};
StreamObserver<Request> requestObserver = asyncStub.bidirectionalStreaming(responseObserver);
try {
// 여러 메시지 전송
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
System.out.println("Sending: " + message);
Request request = Request.newBuilder()
.setClientId("client-001")
.setData(message)
.setTimestamp(System.currentTimeMillis())
.build();
requestObserver.onNext(request);
// 실제 상황에서는 백프레셔 제어를 위해 지연 추가 가능
Thread.sleep(200);
}
} catch (Exception e) {
requestObserver.onError(e);
return;
}
// 전송 완료 표시
requestObserver.onCompleted();
try {
// 서버 응답 완료 대기
finishLatch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
syntax = "proto3";
package stock;
service StockService {
rpc StreamStockData(StreamRequest) returns (stream StockData);
}
message StreamRequest {
string client_id = 1;
}
message StockData {
string symbol = 1;
double price = 2;
int64 timestamp = 3;
double change_percent = 4;
int64 volume = 5;
}
public class StockServiceImpl extends StockServiceGrpc.StockServiceImplBase {
private final StockDataProvider dataProvider;
public StockServiceImpl(StockDataProvider dataProvider) {
this.dataProvider = dataProvider;
}
@Override
public void streamStockData(StreamRequest request, StreamObserver<StockData> responseObserver) {
String clientId = request.getClientId();
System.out.println("Starting stock stream for client: " + clientId);
// 주식 데이터 스트리밍 스레드
Thread streamingThread = new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
// 실시간 주식 데이터 가져오기
List<StockData> stockUpdates = dataProvider.getLatestStockData();
// 모든 업데이트 전송
for (StockData stockData : stockUpdates) {
responseObserver.onNext(stockData);
}
// 1초마다 업데이트
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Stock stream interrupted for client: " + clientId);
} catch (Exception e) {
System.err.println("Error in stock stream for client " + clientId + ": " + e.getMessage());
e.printStackTrace();
// 에러 처리
responseObserver.onError(
Status.INTERNAL
.withDescription("Internal error in stock stream: " + e.getMessage())
.asRuntimeException()
);
} finally {
responseObserver.onCompleted();
}
});
// 스트리밍 스레드 시작
streamingThread.start();
}
}
백프레셔는 데이터 스트림에서 데이터 생산자가 데이터 소비자보다 빠르게 데이터를 생산할 때 발생하는 상황을 관리하는 메커니즘입니다.
이를 제대로 관리하지 않으면 메모리 부족, 성능 저하, 심지어 시스템 크래시가 발생할 수 있습니다.
public void streamStockData(StreamRequest request, StreamObserver<StockData> responseObserver) {
final CountDownLatch latch = new CountDownLatch(10); // 버퍼 용량을 10으로 설정
// 주식 데이터 스트리밍 스레드
Thread streamingThread = new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
// 실시간 주식 데이터 가져오기
List<StockData> stockUpdates = dataProvider.getLatestStockData();
// 모든 업데이트 전송
for (StockData stockData : stockUpdates) {
responseObserver.onNext(stockData);
latch.countDown(); // 버퍼 카운트 감소
}
// 버퍼가 소진될 때까지 대기 (백프레셔 적용)
latch.await();
// 새 버퍼 생성
latch = new CountDownLatch(10);
// 1초마다 업데이트
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
responseObserver.onError(e);
} finally {
responseObserver.onCompleted();
}
});
streamingThread.start();
}
public void streamStockData(StreamRequest request, ClientResponseObserver<StreamRequest, StockData> responseObserver) {
responseObserver.beforeStart(new ClientCallStreamObserver<StreamRequest>() {
private boolean isReady = false;
@Override
public void disableAutoInboundFlowControl() {
// 자동 흐름 제어 비활성화
}
@Override
public void request(int count) {
// 클라이언트가 요청한 항목 수만큼 스트리밍
streamNextBatch(count);
}
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
// 준비 상태 변경 시 핸들러 설정
}
@Override
public boolean isReady() {
return isReady;
}
private void streamNextBatch(int count) {
for (int i = 0; i < count; i++) {
if (hasMoreData()) {
StockData data = getNextStockData();
responseObserver.onNext(data);
} else {
break;
}
}
}
});
}
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
.usePlaintext()
.keepAliveTime(30, TimeUnit.SECONDS)
.keepAliveTimeout(10, TimeUnit.SECONDS)
.build();
// 서버 측 하트비트 구현
scheduler.scheduleAtFixedRate(() -> {
Response heartbeat = Response.newBuilder()
.setType(Response.Type.HEARTBEAT)
.setTimestamp(System.currentTimeMillis())
.build();
responseObserver.onNext(heartbeat);
}, 0, 30, TimeUnit.SECONDS);
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
.usePlaintext()
.enableFullStreamDecompression()
.build();
// 스트림 처리를 위한 별도 서비스
public class StreamProcessingService {
private final ExecutorService executorService;
public StreamProcessingService(int threadPoolSize) {
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
}
public void processStreamAsync(Stream<Data> dataStream, Consumer<ProcessedData> resultConsumer) {
executorService.submit(() -> {
dataStream.forEach(data -> {
ProcessedData processedData = processData(data);
resultConsumer.accept(processedData);
});
});
}
private ProcessedData processData(Data data) {
// 데이터 처리 로직
return new ProcessedData(data);
}
public void shutdown() {
executorService.shutdown();
}
}
public class ConcurrentStreamingService extends StreamingServiceImplBase {
private final ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
@Override
public StreamObserver<Request> bidirectionalStreaming(StreamObserver<Response> responseObserver) {
final AtomicBoolean completed = new AtomicBoolean(false);
return new StreamObserver<Request>() {
@Override
public void onNext(Request request) {
// 각 요청을 별도 스레드에서 병렬 처리
executorService.submit(() -> {
if (!completed.get()) {
Response response = processRequestConcurrently(request);
responseObserver.onNext(response);
}
});
}
@Override
public void onError(Throwable t) {
completed.set(true);
System.err.println("Error in stream: " + t.getMessage());
}
@Override
public void onCompleted() {
completed.set(true);
responseObserver.onCompleted();
}
};
}
private Response processRequestConcurrently(Request request) {
// 병렬 처리 로직
return Response.newBuilder()
.setData("Processed concurrently: " + request.getData())
.setTimestamp(System.currentTimeMillis())
.build();
}
}
public class MonitoredStreamingService extends StreamingServiceImplBase {
private final MetricRegistry metrics = new MetricRegistry();
private final Counter requestCounter = metrics.counter("requests.total");
private final Timer processingTimer = metrics.timer("request.processing.time");
private final Meter errorMeter = metrics.meter("stream.errors");
@Override
public void serverStreaming(Request request, StreamObserver<Response> responseObserver) {
requestCounter.inc();
final Timer.Context timerContext = processingTimer.time();
try {
// 정상 처리 로직
for (int i = 0; i < 10; i++) {
responseObserver.onNext(createResponse("Data " + i));
}
responseObserver.onCompleted();
} catch (Exception e) {
errorMeter.mark();
log.error("Error in stream processing", e);
responseObserver.onError(e);
} finally {
// 처리 시간 기록
long processingTime = timerContext.stop();
log.info("Request processed in {}ms", processingTime / 1_000_000);
}
}
}
// 서버 측 인증 추가
Server server = ServerBuilder.forPort(8080)
.addService(ServerInterceptors.intercept(
new StreamingServiceImpl(),
new AuthInterceptor())) // 인증 인터셉터
.build();
// 인증 인터셉터 구현
class AuthInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// 헤더에서 인증 토큰 추출
String authToken = headers.get(Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER));
if (authToken == null || !isValidToken(authToken)) {
call.close(Status.UNAUTHENTICATED.withDescription("Invalid or missing auth token"), new Metadata());
return new ServerCall.Listener<ReqT>() {};
}
return next.startCall(call, headers);
}
private boolean isValidToken(String token) {
// 토큰 검증 로직
return token.startsWith("Bearer ") && validateJwt(token.substring(7));
}
}
// 서버 측 SSL 설정
Server secureServer = ServerBuilder.forPort(8443)
.useTransportSecurity(
new File("server-cert.pem"), // 서버 인증서
new File("server-key.pem") // 서버 비공개 키
)
.addService(new StreamingServiceImpl())
.build();
// 클라이언트 측 SSL 설정
ManagedChannel secureChannel = ManagedChannelBuilder.forAddress("example.com", 8443)
.useTransportSecurity() // TLS 활성화
.build();
// 요청 제한 인터셉터
class RateLimitInterceptor implements ServerInterceptor {
private final RateLimiter rateLimiter = RateLimiter.create(10.0); // 초당 10개 요청 제한
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
if (!rateLimiter.tryAcquire()) {
call.close(Status.RESOURCE_EXHAUSTED.withDescription("Rate limit exceeded"), new Metadata());
return new ServerCall.Listener<ReqT>() {};
}
return next.startCall(call, headers);
}
}
처리 시간이 오래 걸리거나 외부 리소스에 대한 응답을 기다려야 하는 작업을 효율적으로 처리하는 방법입니다.
@Override
public void serverStreaming(Request request, StreamObserver<Response> responseObserver) {
CompletableFuture.supplyAsync(() -> {
// 외부 API 호출 또는 시간이 오래 걸리는 작업
return fetchDataFromExternalSource(request.getQuery());
})
.thenAccept(dataItems -> {
// 결과를 스트림으로 전송
for (Data item : dataItems) {
Response response = Response.newBuilder()
.setData(item.toString())
.build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
})
.exceptionally(e -> {
// 오류 처리
responseObserver.onError(Status.INTERNAL
.withDescription("Error processing request: " + e.getMessage())
.asRuntimeException());
return null;
});
}
@Override
public void serverStreaming(Request request, StreamObserver<Response> responseObserver) {
// Reactor 또는 RxJava 사용
Flux.fromIterable(getDataSource())
.flatMap(this::processItemAsync)
.map(item -> Response.newBuilder().setData(item).build())
.doOnNext(responseObserver::onNext)
.doOnComplete(responseObserver::onCompleted)
.doOnError(e -> responseObserver.onError(
Status.INTERNAL
.withDescription("Error in reactive stream: " + e.getMessage())
.asRuntimeException()
))
.subscribe();
}
private Mono<String> processItemAsync(String item) {
return Mono.fromCallable(() -> {
// 비동기 처리 로직
return processItem(item);
}).subscribeOn(Schedulers.boundedElastic());
}