공식문서 gRPC 자바 기본 튜토리얼

Jake Seo·2021년 1월 26일
1

네트워크

목록 보기
10/16

공식문서 gRPC 자바 기본 튜토리얼

Prologue

이 글은 gRPC 공식문서에 있는 글을 기반으로 따라해보며 작성된 글입니다.

소개

이 튜토리얼에서는 자바 프로그래머가 gRPC로 어떻게 작업을 해야 하는지에 대한 소개를 합니다.

여러분은 다음과 같은 것을 배울 수 있습니다.

  • .proto 파일에 서비스 정의하기
  • 프로토콜 버퍼 컴파일러를 이용한 서버와 클라이언트 코드 생성
  • gRPC API를 이용하여 서비스를 위한 간단한 클라이언트와 서버 코드를 작성하는 방법

위 내용은 모두 여러분이 gRPC 입문프로토콜 버퍼에 관련된 글을 읽었다는 가정 하에 진행됩니다.

이 예제는 proto3 버전을 이용하여 진행됩니다.

왜 gRPC를 사용할까요?

이 예제는 클라이언트가 경로의 특징에 대한 정보를 갖고, 경로의 요약을 만들고, 교통정보 업데이트와 같은 경로 정보를 서버와 클라이언트를 통해 교환하는 간단한 경로 매핑 애플리케이션입니다.

gRPC를 이용하여 우리는 .proto 파일에 우리의 서비스를 정의할 수 있고 어떤 언어로든 클라이언트와 서버 코드를 생성할 수 있습니다. 이러한 클라이언트와 서버는 아주 커다란 데이터 센터에 있는 서버에서부터 아주 작은 태블릿 PC와 같은 환경에서도 돌아갈 수 있습니다. 언어와 환경 사이에 존재하는 통신의 모든 복잡함은 gRPC에 의해 처리됩니다. 또한 우리는 프로토콜 버퍼가 제공하는 효율적인 직렬화, 간단한 IDL(Interface Definition Language), 손쉬운 인터페이스 업데이트의 이점을 이용합니다.

예제 코드와 설정

git clone -b v1.35.0 https://github.com/grpc/grpc-java.git

위의 명령어를 입력한 뒤에 해당 리포지토리 내부에 있는 examples 디렉토리로 이동하면 그 내부에 보일러 플레이트 코드가 있습니다.

examples 디렉토리의 프로젝트를 이용하여 아래의 작업을 진행하면 됩니다.

서비스 정의하기

첫번째 단계는 프로토콜 버퍼를 사용하여 gRPC 서비스를 정의하는 일입니다. requestresponse 타입을 정의합니다.

먼저, .proto 파일에 java_package 부터 명시합니다.

위는 우리가 생성된 자바 클래스들을 사용하고 싶은 패키지를 명시합니다. 만일, 명시적인 java_package 옵션이 존재하지 않는다면, proto 패키지를 기본 값으로 사용할 것입니다. 하지만, proto 패키지들이 역전된 도메인 이름으로 시작하지 않는다고 예측하기 때문에, proto 패키지들은 일반적으로 좋은 위치에 자바 패키지들을 만들지 않습니다. 만일 우리가 Java 말고 다른 언어를 사용한다면, java_package 옵션은 아무런 효력도 발휘하지 않을 것입니다.

서비스를 정의하고 싶을 때는 .proto 파일에 service라는 키워드를 사용하여 만들 수 있습니다.

service RouteGuide {
...
}

이후에 우리는 서비스 내부에 rpc 메소드를 정의합니다. 요청과 응답 타입을 명시해줍니다. gRPC는 4가지 종류의 서비스 메소드를 정의할 수 있게 해줍니다. RouteGuide 서비스에서는 이 4가지를 모두 다뤄봅니다.

Simple RPC

클라이언트가 stub을 이용하여 요청을 서버로 보내고 응답이 올 때까지 기다리는 일반적인 방식입니다. 일반적인 함수호출과 비슷합니다.

// Obtains the feature at a given position
rpc GetFeature(Point) returns (Feature) {}

Server-side Streaming RPC

클라이언트가 서버로 요청을 보내고, 메세지의 시퀀스를 읽을 수 있는 stream을 받아옵니다. 클라이언트는 반환된 스트림으로 부터 시퀀스가 끝날 때까지 (더 이상 메세지가 존재하지 않을 때까지) 응답을 읽습니다. 예제에서 볼 수 있듯이, Serverside Streaming Methodstream 키워드를 응답 타입 앞에 붙임으로써 명시할 수 있습니다.

// Obtains the Feature available within the given Rectangle.
// Results are streamed rather than returned at once
// e.g. in a response message with a repeated field
// as the rectangle may cover a large area and contain a huge number of feature
rpc ListFeatures(Rectangle) returns (stream Feature) {}

Client-side Streaming RPC

클라이언트가 메세지의 시퀀스를 작성하고 서버로 보냅니다. 이번에도 역시 stream을 이용하여 제공합니다. 클라이언트가 메세지를 작성하는 작업을 끝내면, 서버가 그것들을 모두 읽을 때까지 기다리고 응답을 반환합니다. stream 키워드를 요청 타입 전에 위치시킴으로써 Clientside Streaming Method라는 것을 명시할 수 있습니다.

// Accepts a stream of Points on a route being traversed
// returning a Route Summary when traversal is completed
rpc RecordRoute(stream Point) returns (RouteSummary) {}

Bidirectional Streaming RPC

양쪽 사이드에서 메세지의 시퀀스를 보냅니다. read-write stream을 사용합니다. 두 개의 stream이 독립적으로 운영됩니다. 그래서 클라이언트와 서버가 순서에 상관없이 읽거나 쓸 수 있습니다. 이를테면 서버가 응답을 하기 전에 클라이언트로부터 온 메세지를 모두 수신할 때까지 기다릴 수도 있습니다. 또는 메세지를 한번 읽고 한번 쓸 수도 있고 또는 다양한 조합들에 의해 이러한 통신이 이루어질 수도 있습니다. 각 stream에 대한 메세지의 순서는 정해져있습니다. 메소드의 Bidirectional Steaming RPC 타입은 응답과 요청 타입 앞에 stream을 위치시켜줌으로써 명시 가능합니다.

// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

.proto 파일에는 또한 proto buffer message 타입 정의를 포함합니다. 서비스의 모든 요청과 응답에서 사용됩니다. 다음은 Point 메세지의 예시입니다.

// Points are represented as latitude-longitude paris in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees 
// and longitude should be in the range +/- 180 degrees (inclusive)
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

클라이언트와 서버 코드 생성하기

gRPC 클라이언트와 서버 인터페이스를 .proto 파일의 서비스 정의로부터 생성해봅시다. 프로토콜 버퍼 컴파일러인 protoc와 특별한 gRPC 자바 플러그인을 이용하여 gRPC 클라이언트와 서버 인터페이스를 생성해볼 것입니다. gRPC 서비스를 생성하기 위해서는 proto3를 사용해야 합니다.

Gradle과 Maven을 사용할 때, protoc 빌드 플러그인은 빌드의 필수적인 부분의 코드를 생성해낼 수 있습니다. .proto 파일로부터 어떻게 코드를 생성해내는지에 대해서는 gprc-java README를 참조하면 됩니다.

서비스 정의에서 아래의 자바 파일들이 생성될 것입니다.

  • Feature.java, Point.java, Rectangle.java 그리고 프로토콜 버퍼 코드를 만들어내고, 직렬화하고, 우리 요청과 응답 메세지 타입을 가져올 수 있게 하는 것을 포함한 다른 것들도 만들어낼 것입니다.
  • 다른 유용한 코드들을 포함한 RouteGuideGrpc.java도 있습니다.
    • 내부에 RouteGuide 서버의 구현을 위한 베이스 클래스, RouteGuideGrpc.RouteGuideImplBase, RouteGuide 서비스 내부에 정의된 모든 메소드들도 같이 만듭니다.
    • 클라이언트가 RouteGuide 서버와 통신하기 위한 stub 클래스들도 만들어냅니다.

서버 만들기

RouteGuide 서버를 어떻게 만드는지에 대해 먼저 살펴봅시다. gRPC 클라이언트를 만드는데만 관심이 있다면, 이 섹션을 넘어가고 클라이언트 만들기 섹션으로 가도 됩니다.

RouteGuide 서비스가 자기 일을 하게 만드는데 필요한 부분은 크게 두가지가 존재합니다.

  • 서비스 정의에서 생성된 service base class를 오버라이딩하여 진짜 "작업"을 하게 합니다.
  • 클라이언트로부터 요청을 받고, 서비스 응답을 반환하는 gRPC 서버를 운영합니다.

RouteGuide 서버를 예제 에서 찾아볼 수 있습니다. grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java 어떻게 동작하는지에 대해 좀 더 자세히 알아봅시다.

RouteGuide 구현하기

아래 보이듯이, 우리 서버는 생성된 RouteGuideGrpc.RouteGuideImplBase 추상 클래스를 상속한 RouteGuideService 클래스를 갖고 있습니다.

private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
  ...
}

위는 실제로 작성된 코드의 일부입니다.

간단한(Simple) RPC

RouteGuideService는 우리 모든 서비스 메소드를 구현합니다. 가장 간단한 메소드를 먼저 봅시다. Getfeature()는 클라이언트로부터 Point를 가져오고, Feature에 있는 데이터베이스로부터 그에 상응하는 feature 정보를 반환해줍니다.

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  responseObserver.onNext(checkFeature(request));
  responseObserver.onCompleted();
}

...

private Feature checkFeature(Point location) {
  for (Feature feature : features) {
    if (feature.getLocation().getLatitude() == location.getLatitude()
        && feature.getLocation().getLongitude() == location.getLongitude()) {
      return feature;
    }
  }

  // No feature was found, return an unnamed feature.
  return Feature.newBuilder().setName("").setLocation(location).build();
}

위는 실제 코드입니다.

getFeature() 메소드는 두 개의 파라미터를 받습니다.

  • Point: 요청
  • StreamObserver<Feature>: 응답 옵저버로 서버가 서버의 응답과 호출을 하는데 이용되는 특별한 인터페이스입니다.

클라이언트로 응답을 반환하고 호출을 완료하기 위해서는 아래와 같은 과정이 필요합니다.

  1. 클라이언트에게 반환하기 위해 서비스에 정의된 것처럼 Feature 응답 오브젝트를 생성합니다. 이 예제에서는 private으로 선언된checkFeature() 메소드에서 각각 진행됩니다.
  2. 응답 옵저버의 onNext() 메소드를 사용하여 Feature를 반환합니다.
  3. 응답 옵저버의 onCompleted() 메소드를 사용하여 RPC 통신이 끝났음을 명시해줍니다.

Server-side Streaming RPC

ListFeature는 Server-side Streaming RPC입니다. 이를 이용하여 여러 개의 Feature를 클라이언트로 보내봅시다.

private final Collection<Feature> features;

...

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

실제 코드

간단한(simple) RPC와 같이, 이 메소드는 요청 오브젝트(클라이언트가 Feature들을 찾길 원하는 Rectangle)를 받습니다. 그리고 StreamObserver라는 응답 옵저버도 받습니다.

이 시간에, 클라이언트로 반환하길 원하는 만큼의 많은 Feature를 가질 것입니다. (이번엔, Feature들이 요청 Rectangle 내부에 있는지 아닌지를 기준으로 하여 서비스의 Feature 콜렉션으로부터 그것들을 선택할 것입니다.) 그리고 onNext() 메소드를 이용하여 차례로 응답 옵저버에 작성할 것입니다. 마지막으로, 간단한(simple) RPC 내부에서처럼, gRPC에게 응답을 작성하는 것을 끝마쳤다고 알리기 위해 응답 옵저버의 onCompleted() 메소드를 사용할 것입니다.

Client-side Streaming RPC

이제 조금 더 복잡한 예제를 살펴봅시다. Client-side Streaming 메소드인 RecordRoute()입니다. 클라이언트로부터 Point들의 스트림을 받아와서 이동 경로에 대한 정보를 품고 있는 단일 RouteSummary를 반환합니다.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

실제 소스코드

위의 소스코드에서 볼 수 있듯, 이전 메소드 타입들처럼 우리 메소드는 StreamObserver 응답 옵저버 파라미터를 갖지만, 이번에는 클라이언트가 Point들을 작성할 수 있게 StreamObserver를 반환합니다.

이 메소드의 바디 부분에서, 반환을 위해 익명의 StreamObserver를 인스턴스화 합니다. 그 내부 내용에서 일어나는 일은 다음과 같습니다.

  • Feature와 클라이언트가 메세지 스트림으로 Point를 작성한 정보를 얻기 위해 onNext() 메소드를 오버라이드합니다.
  • RouteSummary를 빌드하기 위해 클라이언트가 메세지 작성을 끝마쳤다는 것을 알릴 때 호출되는 onCompleted() 메소드를 오버라이드합니다. 메소드의 응답 옵저버의 onNext()RouteSummary와 함께 호출합니다. 그리고 이후에 onCompleted() 메소드를 호출하여 서버사이드로부터 호출을 끝냅니다.

Bidirectional Streaming RPC

마지막으로 양방향 스트리밍 RPC인 RouteChat()을 알아봅시다.

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

실제 소스 코드

Client-side Streaming 예제에서와 같이, 응답 옵저버인 StreamObserver 를 받고 반환해줍니다. 한가지 예외는 이번에는 클라이언트가 메세지 스트림으로 메세지를 작성하는 도중에 응답 옵저버를 통해서 값을 반환해준다는 점입니다. 읽기 쓰기를 위한 문법은 정확히 Client-side StreamingServer-side Streaming 메소드들과 일치합니다. 각 사이드에서는 쓰여진 순서대로 서로의 메세지를 갖음에도 불구하고, 클라이언트와 서버 모두 어떤 순서로든 읽고 쓸 수 있습니다. 스트림들은 완전히 독립적으로 운영됩니다.

서버 시작하기

모든 메소드를 구현했다면, gRPC 서버를 시작해야 클라이언트가 실제로 우리 서비스를 사용할 수 있습니다. 아래의 짧은 코드는 우리가 어떻게 RouteGuide 서비스를 실행하는지에 대한 코드입니다.

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
...
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
 ...
}

위에서 볼 수 있듯, 빌드하고 ServerBuilder를 이용하여 서버를 시작합니다.

서버를 실행하기 위해서는

  1. 우리가 클라이언트의 요청을 듣기 위한 주소와 포트를 명시해야 합니다. ServerBuilderforPort() 메소드를 이용합니다.
  2. 서비스 구현 클래스 RouteGuideService의 인스턴스를 생성하고 ServerBuilderaddService() 메소드로 넘깁니다.
  3. 빌더가 우리 서비스를 제공할 RPC 서버를 만들고 시작할 수 있도록 build()start()를 호출합니다.

클라이언트 만들기

이 섹션에서는 RouteGuide 서비스를 위한 클라이언트를 만드는 방법에 대해 알아봅시다. grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java에서 예제 클라이언트 코드를 살펴볼 수 있습니다.

stub 인스턴스화하기

서비스 메소드를 호출하기 위해서, 처음에 할 일은 stub을 만드는 것입니다.

  • 블록킹/동기 (blocking/synchronous) stub: RPC 호출이 서버가 응답을 할 때까지 기다렸다가 응답을 반환하거나 예외를 던지는 것을 말합니다.
  • 논블록킹/비동기 (non-blocking/asynchronous) stub: 서버로 논블록킹 호출을 하고 응답이 비동기적으로 반환됩니다. 오직 비동기 스텁을 사용해서만 스트리밍 호출의 특정한 타입을 만들 수 있습니다.

먼저, gRPC channel 을 만들어봅시다. channel은 우리가 연결할 서버의 주소와 포트를 명시합니다.

public RouteGuideClient(String host, int port) {
  this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}

/** Construct client for accessing RouteGuide server using the existing channel. */
public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
  channel = channelBuilder.build();
  blockingStub = RouteGuideGrpc.newBlockingStub(channel);
  asyncStub = RouteGuideGrpc.newStub(channel);
}

ManagedChannelBuilder를 이용해 channel을 만들 수 있습니다.

이제 스텁을 만들기 위해서 channel을 사용할 수 있습니다. .proto 파일로부터 생성된 클래스인 RouteGuideGrpc 내부에 제공된 메소드인 newStubnewBlockingStub을 이용하여 스텁을 만들면 됩니다.

blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);

서비스 메소드 호출하기

서비스 메소드를 어떻게 호출할 수 있는지 알아봅시다.

간단한(Simple) RPC

간단한(Simple) RPC인 GetFeature를 호출하는 것은 로컬 메소드를 호출하는 것처럼 직관적입니다.

Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature;
try {
  feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

요청 프로토콜 버퍼 오브젝트(이 경우엔 Point)를 만들고, 블록킹 스텁에 있는 getFeature() 메소드로 넘긴 후에 Feature를 결과로 받아옵니다.

만일 에러가 발생한다면, StatusRuntimeException에서 얻을 수 있는 Status로 인코딩 됩니다.

Server-side Streaming RPC

다음으로, Server-side Streaming을 살펴봅시다. ListFeatures은 지리학적인 Feature들의 스트림을 반환합니다.

Rectangle request =
    Rectangle.newBuilder()
        .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

위에서 볼 수 있듯 간단한(Simple) RPC 때와 매우 비슷한 코드입니다. 단, 차이점은 단일 Feature를 반환한다는 것 밖에 없습니다. 이 메소드는 반환된 Feature들을 모두 읽을 때 사용될 수 있는 Iterator를 반환합니다.

Client-side Streaming RPC

이건 이전보다 조금 더 복잡합니다. Client-side Streaming 메소드 RecordRoutePoint의 스트림을 서버로 보내고 단일 RouteSummary를 받습니다. 이 메소드를 사용하려면, 비동기 stub이 필요합니다. Creating Server 섹션에서 경험해봤다면 어렵지 않게 느껴질 것입니다. 비동기 스트리밍 RPC는 서버나 클라이언트나 비슷한 방법으로 구현됩니다.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

위 소스코드에서 볼 수 있듯, 이 메소드를 호출하기 위해서 우리는서버가 RouteSummary 응답과 함께 호출할 수 있도록 특별한 인터페이스를 구현하는 StreamObserver라는 것을 만들어야 합니다. StreamObserver 내부에서 우리는

  • onNext() 메소드를 오버라이드하여 서버가 RouteSummary를 메세지 스트림으로 보낼 때, 반환된 정보를 출력하게 만들 것입니다.
  • 서버가 호출을 끝냈을 때 호출되는 onCompleted() 메소드를 오버라이드하여 서버가 작성을 완료했는지 알아볼 수 있게 해주는 CountDownLatch를 줄일 것입니다.

우리는 그 이후에 StreamObserver를 비동기 스텁의 recordRoute() 메소드로 넘기고, StreamObserver 요청 옵저버를 다시 돌려받아 Point들을 다시 작성하여 서버로 보냅니다. Point들을 작성하는 것을 끝마치면, 우리는 요청 옵저버의 onCompleted() 메소드를 이용하여 gRPC에 클라이언트쪽 작성 작업이 끝났다는 것을 알립니다. 작업이 모두 끝나면, 이쪽에서 서버가 완료되었다는 것을 확인하기 위해 CountDownLatch를 확인합니다.

Bidirectional Streaming RPC

Bidirectional Streaming RPCRouteChat()을 살펴봅시다.

public void routeChat() throws Exception {
  info("*** RoutChat");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteNote> requestObserver =
      asyncStub.routeChat(new StreamObserver<RouteNote>() {
        @Override
        public void onNext(RouteNote note) {
          info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
              .getLatitude(), note.getLocation().getLongitude());
        }

        @Override
        public void onError(Throwable t) {
          Status status = Status.fromThrowable(t);
          logger.log(Level.WARNING, "RouteChat Failed: {0}", status);
          finishLatch.countDown();
        }

        @Override
        public void onCompleted() {
          info("Finished RouteChat");
          finishLatch.countDown();
        }
      });

  try {
    RouteNote[] requests =
        {newNote("First message", 0, 0), newNote("Second message", 0, 1),
            newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

    for (RouteNote request : requests) {
      info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
          .getLatitude(), request.getLocation().getLongitude());
      requestObserver.onNext(request);
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

Client-side Streaming RPC 예제처럼, 둘 다 응답 옵저버인 StreamObserver를 받고 반환합니다. 이번에 달라진 점은 서버가 여전히 메세지를 작성하고 있는 동안, 메소드의 응답 옵저버를 통해서 그들의 메세지 스트림에 값을 보냅니다. Client-side Streaming 메소드와 읽고 쓰는 문법 자체는 정확히 똑같습니다. 서버와 클라이언트는 메세지가 쓰여진 순서대로 각자의 메세지들을 받게 됩니다. 서버와 클라이언트는 어떤 순서로든 읽을 수 있습니다. 스트림들의 운영은 완전히 독립적으로 시행됩니다.

한번 시도해보세요!

example directory README를 따라하며 클라이언트와 서버를 빌드 후에 실행해보세요.

profile
풀스택 웹개발자로 일하고 있는 Jake Seo입니다. 주로 Jake Seo라는 닉네임을 많이 씁니다. 프론트엔드: Javascript, React 백엔드: Spring Framework에 관심이 있습니다.

0개의 댓글