gRPC를 배워보자 6일차 - gRPC Bidirectional streaming RPC

gRPC

목록 보기
6/6

Bidirectional streaming RPC

지난 시간에는 gRPC server와 client 간의 Client-side streaming을 보았다. 이번에는 gRPC의 server, client 간의 양방향 통신에 대해서 알아보도록 하자.

server <---> client

양 측이 서로 데이터를 주고 받을 수 있다는 것이다.

우리의 RouteGuide protocol buffer를 보도록 하자.

// Interface exported by the server.
service RouteGuide {
  // A simple RPC.
  //
  // Obtains the feature at a given position.
  //
  // A feature with an empty name is returned if there's no feature at the given
  // position.
  rpc GetFeature(Point) returns (Feature) {}

  // A server-to-client streaming RPC.
  //
  // Obtains the Features available within the given Rectangle.  Results are
  // streamed rather than returned at once (e.g. in a response message with a
  // repeated field), as the rectangle may cover a large area and contain a
  // huge number of features.
  rpc ListFeatures(Rectangle) returns (stream Feature) {}

  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  rpc RecordRoute(stream Point) returns (RouteSummary) {}

  // A Bidirectional streaming RPC.
  //
  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

RouteChat를 보면 ListFeaturesRecordRoute와 달리, 매개변수에도 stream 지시자가 있고 응답을 전송하는 리턴 타입에도 stream이 있다. 이를 통해 RouteChat이 양방향 통신을 할 수 있다는 사실을 알 수 있다.

이제 server 측의 bidirectional streaming을 구현해보도록 하자.

bidirectional streaming golang 서버 구현

go gRPC stub code를 먼저 보도록 하자.

UnimplementedRouteGuideServer을 따라가면 다음의 메서드 시그니처가 있다.

  • route_guide_grpc.pb.go
func (UnimplementedRouteGuideServer) RouteChat(grpc.BidiStreamingServer[RouteNote, RouteNote]) error {
	return status.Errorf(codes.Unimplemented, "method RouteChat not implemented")
}

grpc.BidiStreamingServer이라는 인터페이스를 볼 수 있다. 이 인터페이스를 따라가면 다음의 명세가 있을 것이다.

type BidiStreamingServer[Req any, Res any] interface {
	// Recv receives the next request message from the client.  The server may
	// repeatedly call Recv to read messages from the request stream.  If
	// io.EOF is returned, it indicates the client called CloseSend on its
	// BidiStreamingClient.  Any other error indicates the stream was
	// terminated unexpectedly, and the handler method should return, as the
	// stream is no longer usable.
	Recv() (*Req, error)

	// Send sends a response message to the client.  The server handler may
	// call Send multiple times to send multiple messages to the client.  An
	// error is returned if the stream was terminated unexpectedly, and the
	// handler method should return, as the stream is no longer usable.
	Send(*Res) error

	// ServerStream is embedded to provide Context, SetHeader, SendHeader, and
	// SetTrailer functionality.  No other methods in the ServerStream should
	// be called directly.
	ServerStream
}

RecvSend가 있는 것을 볼 수 있다. 데이터를 받을 때는 Recv를 받고 io.EOF가 올 때까지 받는 것이고 보내는 것은 Send로 보내는 것이다. 해당 인터페이스의 구현체는 gRPC에서 자동으로 만들어 채워줄 것이다.

이제 RouteChat의 명세를 구현해보도록 하자.

  • routeguide_server.go
func serialize(point *pb.Point) string {
	return fmt.Sprintf("%d %d", point.Latitude, point.Longitude)
}

// RouteChat receives a stream of message/location pairs, and responds with a stream of all
// previous messages at each of those locations.
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		key := serialize(in.Location)

		s.mu.Lock()
		s.routeNotes[key] = append(s.routeNotes[key], in)
		// Note: this copy prevents blocking other clients while serving this one.
		// We don't need to do a deep copy, because elements in the slice are
		// insert-only and never modified.
		rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
		copy(rn, s.routeNotes[key])
		s.mu.Unlock()

		for _, note := range rn {
			if err := stream.Send(note); err != nil {
				return err
			}
		}
	}
}

사실 양방향 통신이라고 해서 이전과 특별히 다른 것은 없다. Recv로 데이터를 받아내고 Send로 데이터를 보내는 것이다. 이렇게 보면 이전에 client-side streaming과 큰 차이가 없어 보이지만, client-side streaming의 경우는 server가 데이터를 하나만 보낼 수 있다면, 양방향 통신에는 server도 다량의 stream을 보낼 수 있다는 것이다. 그래서 for문을 순회하면서 Send를 호출하는 것을 볼 수 있다.

bidirectional streaming java 클라이언트 구현

다음으로는 java gRPC client 측의 routeChat을 사용하여 데이터를 보내도록 하자. client 측에서 다량의 stream을 보내도록 하고, server로 부터 다량의 stream을 받도록 하는 것이다.


public class RouteGuideClient {
    private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;
    private final RouteGuideGrpc.RouteGuideStub asyncStub;

    private Random random = new Random();
    private TestHelper testHelper;

    public RouteGuideClient(Channel channel) {
        blockingStub = RouteGuideGrpc.newBlockingStub(channel);
        asyncStub = RouteGuideGrpc.newStub(channel);
    }
    
    ...

    private RouteNote newNote(String message, int lat, int lon) {
        return RouteNote.newBuilder().setMessage(message)
                .setLocation(Point.newBuilder().setLatitude(lat).setLongitude(lon).build()).build();
    }

    public CountDownLatch routeChat() {
        System.out.println("*** RouteChat");
        final CountDownLatch finishLatch = new CountDownLatch(1);
        StreamObserver<RouteNote> requestObserver =
                asyncStub.routeChat(new StreamObserver<RouteNote>() {
                    @Override
                    public void onNext(RouteNote note) {
                        System.out.printf("Got message %s %s %s\n", note.getMessage(), note.getLocation().getLatitude(), note.getLocation().getLongitude());
                        if (testHelper != null) {
                            testHelper.onMessage(note);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("RouteChat Failed: " + Status.fromThrowable(t));
                        if (testHelper != null) {
                            testHelper.onRpcError(t);
                        }
                        finishLatch.countDown();
                    }

                    @Override
                    public void onCompleted() {
                        System.out.println("Finished RouteChat");
                        finishLatch.countDown();
                    }
                });
        try {
            RouteNote[] requests =
                    {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
                            newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

            for (RouteNote request : requests) {
                System.out.printf("Sending message %s %s %s\n", request.getMessage(), request.getLocation()
                        .getLatitude(), request.getLocation().getLongitude());
                requestObserver.onNext(request);
            }
        } catch (RuntimeException e) {
            // Cancel RPC
            requestObserver.onError(e);
            throw e;
        }
        // Mark the end of requests
        requestObserver.onCompleted();

        // return the latch while receiving happens asynchronously
        return finishLatch;
    }

    ...

}

routeChat 메서드를 보면 client-side streaming에서 사용했던 방법과 비슷하게 데이터를 보내는 것을 볼 수 있다.

 StreamObserver<RouteNote> requestObserver =
                asyncStub.routeChat(new StreamObserver<RouteNote>() {
                    @Override
                    public void onNext(RouteNote note) {
                        System.out.printf("Got message %s %s %s\n", note.getMessage(), note.getLocation().getLatitude(), note.getLocation().getLongitude());
                        if (testHelper != null) {
                            testHelper.onMessage(note);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("RouteChat Failed: " + Status.fromThrowable(t));
                        if (testHelper != null) {
                            testHelper.onRpcError(t);
                        }
                        finishLatch.countDown();
                    }

                    @Override
                    public void onCompleted() {
                        System.out.println("Finished RouteChat");
                        finishLatch.countDown();
                    }
                });

private final RouteGuideGrpc.RouteGuideStub asyncStub;은 비동기 통신으로 gRPC 요청을 보내고 응답을 받아낸다. 응답을 받기 위해서는 routeChat안에 responseObserver를 만들어주어야 한다. responseObserver를 보면 StreamObserver라는 인터페이스인 것을 알 수 있다.

public io.grpc.stub.StreamObserver<com.grpc.javarouteguide.grpc.routeguide.RouteNote> routeChat(
        io.grpc.stub.StreamObserver<com.grpc.javarouteguide.grpc.routeguide.RouteNote> responseObserver) {
      return io.grpc.stub.ClientCalls.asyncBidiStreamingCall(
          getChannel().newCall(getRouteChatMethod(), getCallOptions()), responseObserver);
    }

StreamObserver 인터페이스의 명세는 다음과 같다.

public interface StreamObserver<V> {
    void onNext(V var1);

    void onError(Throwable var1);

    void onCompleted();
}

asyncStub으로 응답을 받을 때는 이전에도 말했듯이 onNext를 통해서 응답을 받아내야 한다. 응답을 모두 받았다면 onCompleted가 실행되고 만약 이 도중에 에러가 발생하면 onError가 발생한다.

client-side에서 요청을 보내는 코드를 보면 다음과 같다.

for (RouteNote request : requests) {
                System.out.printf("Sending message %s %s %s\n", request.getMessage(), request.getLocation()
                        .getLatitude(), request.getLocation().getLongitude());
                requestObserver.onNext(request);
            }

requestObserver를 통해서 onNext로 다량의 데이터를 보내는 것을 볼 수 있다. requestObserverStreamObserver를 따르는 구현체이기 때문이다.

routeChat의 재미난 점은 대량의 데이터를 전송하고 나서 모든 응답이 언제올 지 모르기 때문에, CountDownLatch를 반환하여 호출한 측에서 제어하도록 한다.

이제 routeChat을 호출하는 main code를 만들어보도록 하자.

@SpringBootApplication
public class JavaRouteguideApplication {
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(JavaRouteguideApplication.class, args);
        String json = "{\"feature\":[{\"location\":{\"latitude\":407838351,\"longitude\":-746143763},\"name\":\"Patriots Path, Mendham, NJ 07945, USA\"},{\"location\":{\"latitude\":408122808,\"longitude\":-743999179},\"name\":\"101 New Jersey 10, Whippany, NJ 07981, USA\"},{\"location\":{\"latitude\":413628156,\"longitude\":-749015468},\"name\":\"U.S. 6, Shohola, PA 18458, USA\"}]}";
        String target = "localhost:50051";
        List<Feature> features;

        ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()).build();
        try {
            RouteGuideClient client = new RouteGuideClient(channel);
            // Send and receive some notes.
            CountDownLatch finishLatch = client.routeChat();

            if (!finishLatch.await(1, TimeUnit.MINUTES)) {
                System.out.println("routeChat can not finish within 1 minutes");
            }
        } finally {
            channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}

리턴으로 받은 CountDownLatch의 timeout을 설정하여 응답이 모두 처리되도록 하면 된다. 참고로 client는 routeChat 메서드 안에서 하드 코딩된 requests를 전송하고 있다.

RouteNote[] requests =
                    {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
                            newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

이제 실행해보도고 결과를 확인하자. 4개를 전송했으니 4개에 대한 응답을 받아야 한다.

*** RouteChat
Sending message First message 0 0
Sending message Second message 0 10000000
Sending message Third message 10000000 0
Sending message Fourth message 10000000 10000000
Got message First message 0 0
Got message Second message 0 10000000
Got message Third message 10000000 0
Got message Fourth message 10000000 10000000
Finished RouteChat

다음의 결과를 통해서 client가 server로 4개의 stream을 보냈고, server가 client로 4개의 응답을 보낸 것이다. 이로써 bidirectional streaming이 가능하다는 것을 알 수 있다.

0개의 댓글