gRPC 스트리밍 4가지 통신 방식부터 실전 구현까지

궁금하면 500원·2024년 7월 6일

MSA&아키텍처

목록 보기
10/46

1. gRPC 스트리밍의 이해

gRPC는 Google에서 개발한 고성능, 오픈소스 RPC(Remote Procedure Call) 프레임워크로, Protocol Buffers를 사용하여 효율적인 데이터 직렬화를 제공합니다.
gRPC 스트리밍은 클라이언트와 서버 간에 지속적인 데이터 스트림을 주고받는 강력한 메커니즘입니다.

gRPC 통신 방식

1. 단일 요청/단일 응답 (Unary RPC)

  • 가장 기본적인 gRPC 통신 방식
  • 클라이언트가 서버에 단일 요청을 보내고 서버가 단일 응답을 반환
  • REST API와 유사한 패턴
  • 간단한 CRUD 작업에 적합
rpc GetUser(UserRequest) returns (UserResponse);

2. 서버 스트리밍 RPC (Server Streaming RPC)

  • 클라이언트가 서버에 단일 요청을 보내면 서버가 연속된 데이터 스트림으로 응답
  • 대용량 데이터 전송, 실시간 업데이트 등에 적합
  • 주식 시세, 날씨 업데이트, 로그 스트리밍 등의 사용 사례
rpc StreamStockData(StreamRequest) returns (stream StockData);

3. 클라이언트 스트리밍 RPC (Client Streaming RPC)

  • 클라이언트가 서버에 연속된 데이터 스트림을 전송하고, 서버는 처리 완료 후 단일 응답을 반환
  • 파일 업로드, 센서 데이터 수집 등에 유용
  • 최종 결과만 중요한 경우에 적합
rpc UploadData(stream DataChunk) returns (UploadSummary);

4. 양방향 스트리밍 RPC (Bidirectional Streaming RPC)

  • 클라이언트와 서버가 서로 독립적으로 데이터 스트림을 양방향으로 전송
  • 실시간 채팅, 게임, 협업 애플리케이션 등에 이상적
  • 가장 유연하지만 구현과 관리가 복잡
rpc Chat(stream ChatMessage) returns (stream ChatMessage);

2. gRPC 스트리밍 구현 하기

Protocol Buffers 정의

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.grpc";
option java_outer_classname = "StreamingProto";

service StreamingService {
  // 서버 스트리밍
  rpc ServerStreaming(Request) returns (stream Response);
  
  // 클라이언트 스트리밍
  rpc ClientStreaming(stream Request) returns (Response);
  
  // 양방향 스트리밍
  rpc BidirectionalStreaming(stream Request) returns (stream Response);
}

message Request {
  string client_id = 1;
  string data = 2;
  int64 timestamp = 3;
}

message Response {
  string server_id = 1;
  string data = 2;
  int64 timestamp = 3;
  StatusCode status = 4;
}

enum StatusCode {
  OK = 0;
  ERROR = 1;
  PENDING = 2;
}

서버 스트리밍 구현 하기

import io.grpc.stub.StreamObserver;

public class StreamingServiceImpl extends StreamingServiceGrpc.StreamingServiceImplBase {
    
    @Override
    public void serverStreaming(Request request, StreamObserver<Response> responseObserver) {
        // 요청 매개변수 검증
        String clientId = request.getClientId();
        System.out.println("Received request from client: " + clientId);
        
        try {
            // 데이터 소스에서 스트리밍할 데이터 가져오기
            List<String> dataItems = dataService.getDataItems(clientId);
            
            // 각 데이터 항목에 대해 응답 스트리밍
            for (String data : dataItems) {
                Response response = Response.newBuilder()
                    .setServerId("server-001")
                    .setData(data)
                    .setTimestamp(System.currentTimeMillis())
                    .setStatus(StatusCode.OK)
                    .build();
                
                // 응답 스트리밍
                responseObserver.onNext(response);
                
                // 실제 상황에서는 백프레셔 제어를 위해 지연 추가 가능
                Thread.sleep(100);
            }
        } catch (Exception e) {
            // 오류 처리
            Response errorResponse = Response.newBuilder()
                .setServerId("server-001")
                .setStatus(StatusCode.ERROR)
                .setData("Error: " + e.getMessage())
                .setTimestamp(System.currentTimeMillis())
                .build();
            
            responseObserver.onNext(errorResponse);
        } finally {
            // 스트림 완료 표시
            responseObserver.onCompleted();
        }
    }
}

클라이언트 스트리밍 구현 하기

import io.grpc.stub.StreamObserver;

public class StreamingServiceImpl extends StreamingServiceGrpc.StreamingServiceImplBase {
    
    @Override
    public StreamObserver<Request> clientStreaming(final StreamObserver<Response> responseObserver) {
        // 클라이언트로부터 스트림을 처리하기 위한 StreamObserver 반환
        return new StreamObserver<Request>() {
            
            // 데이터 수집을 위한 내부 상태 유지
            private final List<String> receivedData = new ArrayList<>();
            private String clientId = "";
            
            @Override
            public void onNext(Request request) {
                // 각 요청 처리
                clientId = request.getClientId();
                receivedData.add(request.getData());
                System.out.println("Received data chunk from " + clientId + ": " + request.getData());
            }
            
            @Override
            public void onError(Throwable t) {
                // 오류 처리
                System.err.println("Error in client stream: " + t.getMessage());
                t.printStackTrace();
                
                // 오류 응답 전송
                responseObserver.onNext(Response.newBuilder()
                    .setServerId("server-001")
                    .setStatus(StatusCode.ERROR)
                    .setData("Stream error: " + t.getMessage())
                    .setTimestamp(System.currentTimeMillis())
                    .build());
                responseObserver.onCompleted();
            }
            
            @Override
            public void onCompleted() {
                // 모든 데이터 수신 완료 시 처리
                System.out.println("Client stream completed. Processing " + receivedData.size() + " items");
                
                // 모든 데이터 처리 후 단일 응답 반환
                String processedResult = processData(receivedData);
                
                responseObserver.onNext(Response.newBuilder()
                    .setServerId("server-001")
                    .setStatus(StatusCode.OK)
                    .setData("Processed " + receivedData.size() + " items: " + processedResult)
                    .setTimestamp(System.currentTimeMillis())
                    .build());
                
                // 응답 완료 표시
                responseObserver.onCompleted();
            }
            
            private String processData(List<String> data) {
                // 실제 데이터 처리 로직
                return data.stream().reduce("", (a, b) -> a + b.length() + ",");
            }
        };
    }
}

양방향 스트리밍 구현 예제

서버 측

import io.grpc.stub.StreamObserver;

public class StreamingServiceImpl extends StreamingServiceGrpc.StreamingServiceImplBase {
    
    @Override
    public StreamObserver<Request> bidirectionalStreaming(StreamObserver<Response> responseObserver) {
        return new StreamObserver<Request>() {
            
            @Override
            public void onNext(Request request) {
                // 클라이언트 요청 처리
                String clientId = request.getClientId();
                String receivedData = request.getData();
                
                System.out.println("Received from " + clientId + ": " + receivedData);
                
                // 요청에 대한 즉각적인 응답 생성
                String responseData = processRequestData(receivedData);
                
                // 응답 스트리밍
                Response response = Response.newBuilder()
                    .setServerId("server-001")
                    .setStatus(StatusCode.OK)
                    .setData(responseData)
                    .setTimestamp(System.currentTimeMillis())
                    .build();
                
                responseObserver.onNext(response);
            }
            
            @Override
            public void onError(Throwable t) {
                System.err.println("Error in bidirectional stream: " + t.getMessage());
                t.printStackTrace();
            }
            
            @Override
            public void onCompleted() {
                System.out.println("Client has completed sending messages");
                responseObserver.onCompleted();
            }
            
            private String processRequestData(String data) {
                // 실제 데이터 처리 로직
                return "Processed: " + data.toUpperCase();
            }
        };
    }
}

클라이언트 측

import io.grpc.stub.StreamObserver;

public class BidirectionalStreamingClient {
    
    private final StreamingServiceGrpc.StreamingServiceStub asyncStub;
    
    public BidirectionalStreamingClient(ManagedChannel channel) {
        this.asyncStub = StreamingServiceGrpc.newStub(channel);
    }
    
    public void startBidirectionalStreaming() {
        CountDownLatch finishLatch = new CountDownLatch(1);
        
        StreamObserver<Response> responseObserver = new StreamObserver<Response>() {
            
            @Override
            public void onNext(Response response) {
                // 서버로부터 응답 처리
                System.out.println("Received from server: " + response.getData());
            }
            
            @Override
            public void onError(Throwable t) {
                System.err.println("Error in server response: " + t.getMessage());
                t.printStackTrace();
                finishLatch.countDown();
            }
            
            @Override
            public void onCompleted() {
                System.out.println("Server has completed sending messages");
                finishLatch.countDown();
            }
        };
        
        StreamObserver<Request> requestObserver = asyncStub.bidirectionalStreaming(responseObserver);
        
        try {
            // 여러 메시지 전송
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                System.out.println("Sending: " + message);
                
                Request request = Request.newBuilder()
                    .setClientId("client-001")
                    .setData(message)
                    .setTimestamp(System.currentTimeMillis())
                    .build();
                
                requestObserver.onNext(request);
                
                // 실제 상황에서는 백프레셔 제어를 위해 지연 추가 가능
                Thread.sleep(200);
            }
        } catch (Exception e) {
            requestObserver.onError(e);
            return;
        }
        
        // 전송 완료 표시
        requestObserver.onCompleted();
        
        try {
            // 서버 응답 완료 대기
            finishLatch.await(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3. 주식 시장 데이터 스트리밍

Protocol Buffers 정의

syntax = "proto3";

package stock;

service StockService {
  rpc StreamStockData(StreamRequest) returns (stream StockData);
}

message StreamRequest {
  string client_id = 1;
}

message StockData {
  string symbol = 1;
  double price = 2;
  int64 timestamp = 3;
  double change_percent = 4;
  int64 volume = 5;
}

서버 구현

public class StockServiceImpl extends StockServiceGrpc.StockServiceImplBase {
    
    private final StockDataProvider dataProvider;
    
    public StockServiceImpl(StockDataProvider dataProvider) {
        this.dataProvider = dataProvider;
    }
    
    @Override
    public void streamStockData(StreamRequest request, StreamObserver<StockData> responseObserver) {
        String clientId = request.getClientId();
        System.out.println("Starting stock stream for client: " + clientId);
        
        // 주식 데이터 스트리밍 스레드
        Thread streamingThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // 실시간 주식 데이터 가져오기
                    List<StockData> stockUpdates = dataProvider.getLatestStockData();
                    
                    // 모든 업데이트 전송
                    for (StockData stockData : stockUpdates) {
                        responseObserver.onNext(stockData);
                    }
                    
                    // 1초마다 업데이트
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Stock stream interrupted for client: " + clientId);
            } catch (Exception e) {
                System.err.println("Error in stock stream for client " + clientId + ": " + e.getMessage());
                e.printStackTrace();
                
                // 에러 처리
                responseObserver.onError(
                    Status.INTERNAL
                        .withDescription("Internal error in stock stream: " + e.getMessage())
                        .asRuntimeException()
                );
            } finally {
                responseObserver.onCompleted();
            }
        });
        
        // 스트리밍 스레드 시작
        streamingThread.start();
    }
}

백프레셔(Backpressure) 관리

백프레셔는 데이터 스트림에서 데이터 생산자가 데이터 소비자보다 빠르게 데이터를 생산할 때 발생하는 상황을 관리하는 메커니즘입니다.
이를 제대로 관리하지 않으면 메모리 부족, 성능 저하, 심지어 시스템 크래시가 발생할 수 있습니다.

CountDownLatch를 사용한 백프레셔 구현

public void streamStockData(StreamRequest request, StreamObserver<StockData> responseObserver) {
    final CountDownLatch latch = new CountDownLatch(10); // 버퍼 용량을 10으로 설정
    
    // 주식 데이터 스트리밍 스레드
    Thread streamingThread = new Thread(() -> {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // 실시간 주식 데이터 가져오기
                List<StockData> stockUpdates = dataProvider.getLatestStockData();
                
                // 모든 업데이트 전송
                for (StockData stockData : stockUpdates) {
                    responseObserver.onNext(stockData);
                    latch.countDown(); // 버퍼 카운트 감소
                }
                
                // 버퍼가 소진될 때까지 대기 (백프레셔 적용)
                latch.await();
                
                // 새 버퍼 생성
                latch = new CountDownLatch(10);
                
                // 1초마다 업데이트
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            responseObserver.onError(e);
        } finally {
            responseObserver.onCompleted();
        }
    });
    
    streamingThread.start();
}

FlowControlWindow를 사용한 백프레셔 구현

public void streamStockData(StreamRequest request, ClientResponseObserver<StreamRequest, StockData> responseObserver) {
    responseObserver.beforeStart(new ClientCallStreamObserver<StreamRequest>() {
        private boolean isReady = false;
        
        @Override
        public void disableAutoInboundFlowControl() {
            // 자동 흐름 제어 비활성화
        }
        
        @Override
        public void request(int count) {
            // 클라이언트가 요청한 항목 수만큼 스트리밍
            streamNextBatch(count);
        }
        
        @Override
        public void setOnReadyHandler(Runnable onReadyHandler) {
            // 준비 상태 변경 시 핸들러 설정
        }
        
        @Override
        public boolean isReady() {
            return isReady;
        }
        
        private void streamNextBatch(int count) {
            for (int i = 0; i < count; i++) {
                if (hasMoreData()) {
                    StockData data = getNextStockData();
                    responseObserver.onNext(data);
                } else {
                    break;
                }
            }
        }
    });
}

4. gRPC 스트리밍 모범 사례

스트림 관리 전략

  • 타임아웃 설정: 클라이언트와 서버 양쪽에서 적절한 타임아웃 설정
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
    .usePlaintext()
    .keepAliveTime(30, TimeUnit.SECONDS)
    .keepAliveTimeout(10, TimeUnit.SECONDS)
    .build();
  • 하트비트 메커니즘: 장기 실행 스트림에서 연결 상태 확인
// 서버 측 하트비트 구현
scheduler.scheduleAtFixedRate(() -> {
    Response heartbeat = Response.newBuilder()
        .setType(Response.Type.HEARTBEAT)
        .setTimestamp(System.currentTimeMillis())
        .build();
    responseObserver.onNext(heartbeat);
}, 0, 30, TimeUnit.SECONDS);

메시지 크기 최적화

  • 필드 최적화: 불필요한 필드 제거, 필수 필드만 포함
  • 중첩된 메시지 대신 참조 사용: 반복되는 구조에는 중첩 대신 참조 ID 사용
  • 압축 활용: 대용량 데이터 전송 시 압축 활성화
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
    .usePlaintext()
    .enableFullStreamDecompression()
    .build();

스트림 처리 로직 분리

// 스트림 처리를 위한 별도 서비스
public class StreamProcessingService {
    
    private final ExecutorService executorService;
    
    public StreamProcessingService(int threadPoolSize) {
        this.executorService = Executors.newFixedThreadPool(threadPoolSize);
    }
    
    public void processStreamAsync(Stream<Data> dataStream, Consumer<ProcessedData> resultConsumer) {
        executorService.submit(() -> {
            dataStream.forEach(data -> {
                ProcessedData processedData = processData(data);
                resultConsumer.accept(processedData);
            });
        });
    }
    
    private ProcessedData processData(Data data) {
        // 데이터 처리 로직
        return new ProcessedData(data);
    }
    
    public void shutdown() {
        executorService.shutdown();
    }
}

동시성과 병렬 처리

public class ConcurrentStreamingService extends StreamingServiceImplBase {
    
    private final ExecutorService executorService = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors()
    );
    
    @Override
    public StreamObserver<Request> bidirectionalStreaming(StreamObserver<Response> responseObserver) {
        final AtomicBoolean completed = new AtomicBoolean(false);
        
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request request) {
                // 각 요청을 별도 스레드에서 병렬 처리
                executorService.submit(() -> {
                    if (!completed.get()) {
                        Response response = processRequestConcurrently(request);
                        responseObserver.onNext(response);
                    }
                });
            }
            
            @Override
            public void onError(Throwable t) {
                completed.set(true);
                System.err.println("Error in stream: " + t.getMessage());
            }
            
            @Override
            public void onCompleted() {
                completed.set(true);
                responseObserver.onCompleted();
            }
        };
    }
    
    private Response processRequestConcurrently(Request request) {
        // 병렬 처리 로직
        return Response.newBuilder()
            .setData("Processed concurrently: " + request.getData())
            .setTimestamp(System.currentTimeMillis())
            .build();
    }
}

오류 및 상태 모니터링

public class MonitoredStreamingService extends StreamingServiceImplBase {
    
    private final MetricRegistry metrics = new MetricRegistry();
    private final Counter requestCounter = metrics.counter("requests.total");
    private final Timer processingTimer = metrics.timer("request.processing.time");
    private final Meter errorMeter = metrics.meter("stream.errors");
    
    @Override
    public void serverStreaming(Request request, StreamObserver<Response> responseObserver) {
        requestCounter.inc();
        
        final Timer.Context timerContext = processingTimer.time();
        
        try {
            // 정상 처리 로직
            for (int i = 0; i < 10; i++) {
                responseObserver.onNext(createResponse("Data " + i));
            }
            responseObserver.onCompleted();
        } catch (Exception e) {
            errorMeter.mark();
            log.error("Error in stream processing", e);
            responseObserver.onError(e);
        } finally {
            // 처리 시간 기록
            long processingTime = timerContext.stop();
            log.info("Request processed in {}ms", processingTime / 1_000_000);
        }
    }
}

5. 보안 고려사항

인증 및 권한 부여

// 서버 측 인증 추가
Server server = ServerBuilder.forPort(8080)
    .addService(ServerInterceptors.intercept(
        new StreamingServiceImpl(),
        new AuthInterceptor()))  // 인증 인터셉터
    .build();

// 인증 인터셉터 구현
class AuthInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        
        // 헤더에서 인증 토큰 추출
        String authToken = headers.get(Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER));
        
        if (authToken == null || !isValidToken(authToken)) {
            call.close(Status.UNAUTHENTICATED.withDescription("Invalid or missing auth token"), new Metadata());
            return new ServerCall.Listener<ReqT>() {};
        }
        
        return next.startCall(call, headers);
    }
    
    private boolean isValidToken(String token) {
        // 토큰 검증 로직
        return token.startsWith("Bearer ") && validateJwt(token.substring(7));
    }
}

TLS/SSL 암호화

// 서버 측 SSL 설정
Server secureServer = ServerBuilder.forPort(8443)
    .useTransportSecurity(
        new File("server-cert.pem"),  // 서버 인증서
        new File("server-key.pem")    // 서버 비공개 키
    )
    .addService(new StreamingServiceImpl())
    .build();

// 클라이언트 측 SSL 설정
ManagedChannel secureChannel = ManagedChannelBuilder.forAddress("example.com", 8443)
    .useTransportSecurity()  // TLS 활성화
    .build();

요청 제한 및 속도 제한

// 요청 제한 인터셉터
class RateLimitInterceptor implements ServerInterceptor {
    
    private final RateLimiter rateLimiter = RateLimiter.create(10.0);  // 초당 10개 요청 제한
    
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call,
            Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        
        if (!rateLimiter.tryAcquire()) {
            call.close(Status.RESOURCE_EXHAUSTED.withDescription("Rate limit exceeded"), new Metadata());
            return new ServerCall.Listener<ReqT>() {};
        }
        
        return next.startCall(call, headers);
    }
}

6. 비동기 작업 처리

처리 시간이 오래 걸리거나 외부 리소스에 대한 응답을 기다려야 하는 작업을 효율적으로 처리하는 방법입니다.

CompletableFuture를 사용한 비동기 처리

@Override
public void serverStreaming(Request request, StreamObserver<Response> responseObserver) {
    CompletableFuture.supplyAsync(() -> {
        // 외부 API 호출 또는 시간이 오래 걸리는 작업
        return fetchDataFromExternalSource(request.getQuery());
    })
    .thenAccept(dataItems -> {
        // 결과를 스트림으로 전송
        for (Data item : dataItems) {
            Response response = Response.newBuilder()
                .setData(item.toString())
                .build();
            responseObserver.onNext(response);
        }
        responseObserver.onCompleted();
    })
    .exceptionally(e -> {
        // 오류 처리
        responseObserver.onError(Status.INTERNAL
            .withDescription("Error processing request: " + e.getMessage())
            .asRuntimeException());
        return null;
    });
}

리액티브 스트림과 결합

@Override
public void serverStreaming(Request request, StreamObserver<Response> responseObserver) {
    // Reactor 또는 RxJava 사용
    Flux.fromIterable(getDataSource())
        .flatMap(this::processItemAsync)
        .map(item -> Response.newBuilder().setData(item).build())
        .doOnNext(responseObserver::onNext)
        .doOnComplete(responseObserver::onCompleted)
        .doOnError(e -> responseObserver.onError(
            Status.INTERNAL
                .withDescription("Error in reactive stream: " + e.getMessage())
                .asRuntimeException()
        ))
        .subscribe();
}

private Mono<String> processItemAsync(String item) {
    return Mono.fromCallable(() -> {
        // 비동기 처리 로직
        return processItem(item);
    }).subscribeOn(Schedulers.boundedElastic());
}

7. 정리와 요약

gRPC 스트리밍의 주요 장점

  • 효율성: Protocol Buffers를 통한 바이너리 직렬화로 더 작은 메시지 크기
  • 양방향 통신: 다양한 통신 패턴 지원 (단방향, 서버 스트리밍, 클라이언트 스트리밍, 양방향 스트리밍)
  • 강력한 타입 시스템: 컴파일 타임에 타입 검사 지원
  • 다중 언어 지원: 다양한 프로그래밍 언어로 자동 코드 생성
  • HTTP/2 기반: 멀티플렉싱, 헤더 압축 등 HTTP/2의 모든 이점 활용

스트리밍 구현 시 주의사항

  • 리소스 관리: 장기 실행 스트림에서 메모리와 연결 관리에 주의
  • 오류 처리: 다양한 네트워크 및 시스템
profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글