지난 시간에는 gRPC server와 client 간의 Client-side streaming을 보았다. 이번에는 gRPC의 server, client 간의 양방향 통신에 대해서 알아보도록 하자.
server <---> client
양 측이 서로 데이터를 주고 받을 수 있다는 것이다.
우리의 RouteGuide protocol buffer를 보도록 하자.
// Interface exported by the server.
service RouteGuide {
// A simple RPC.
//
// Obtains the feature at a given position.
//
// A feature with an empty name is returned if there's no feature at the given
// position.
rpc GetFeature(Point) returns (Feature) {}
// A server-to-client streaming RPC.
//
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// A client-to-server streaming RPC.
//
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
// A Bidirectional streaming RPC.
//
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
RouteChat를 보면 ListFeatures와 RecordRoute와 달리, 매개변수에도 stream 지시자가 있고 응답을 전송하는 리턴 타입에도 stream이 있다. 이를 통해 RouteChat이 양방향 통신을 할 수 있다는 사실을 알 수 있다.
이제 server 측의 bidirectional streaming을 구현해보도록 하자.
go gRPC stub code를 먼저 보도록 하자.
UnimplementedRouteGuideServer을 따라가면 다음의 메서드 시그니처가 있다.
func (UnimplementedRouteGuideServer) RouteChat(grpc.BidiStreamingServer[RouteNote, RouteNote]) error {
return status.Errorf(codes.Unimplemented, "method RouteChat not implemented")
}
grpc.BidiStreamingServer이라는 인터페이스를 볼 수 있다. 이 인터페이스를 따라가면 다음의 명세가 있을 것이다.
type BidiStreamingServer[Req any, Res any] interface {
// Recv receives the next request message from the client. The server may
// repeatedly call Recv to read messages from the request stream. If
// io.EOF is returned, it indicates the client called CloseSend on its
// BidiStreamingClient. Any other error indicates the stream was
// terminated unexpectedly, and the handler method should return, as the
// stream is no longer usable.
Recv() (*Req, error)
// Send sends a response message to the client. The server handler may
// call Send multiple times to send multiple messages to the client. An
// error is returned if the stream was terminated unexpectedly, and the
// handler method should return, as the stream is no longer usable.
Send(*Res) error
// ServerStream is embedded to provide Context, SetHeader, SendHeader, and
// SetTrailer functionality. No other methods in the ServerStream should
// be called directly.
ServerStream
}
Recv와 Send가 있는 것을 볼 수 있다. 데이터를 받을 때는 Recv를 받고 io.EOF가 올 때까지 받는 것이고 보내는 것은 Send로 보내는 것이다. 해당 인터페이스의 구현체는 gRPC에서 자동으로 만들어 채워줄 것이다.
이제 RouteChat의 명세를 구현해보도록 하자.
func serialize(point *pb.Point) string {
return fmt.Sprintf("%d %d", point.Latitude, point.Longitude)
}
// RouteChat receives a stream of message/location pairs, and responds with a stream of all
// previous messages at each of those locations.
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
s.mu.Lock()
s.routeNotes[key] = append(s.routeNotes[key], in)
// Note: this copy prevents blocking other clients while serving this one.
// We don't need to do a deep copy, because elements in the slice are
// insert-only and never modified.
rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
copy(rn, s.routeNotes[key])
s.mu.Unlock()
for _, note := range rn {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
사실 양방향 통신이라고 해서 이전과 특별히 다른 것은 없다. Recv로 데이터를 받아내고 Send로 데이터를 보내는 것이다. 이렇게 보면 이전에 client-side streaming과 큰 차이가 없어 보이지만, client-side streaming의 경우는 server가 데이터를 하나만 보낼 수 있다면, 양방향 통신에는 server도 다량의 stream을 보낼 수 있다는 것이다. 그래서 for문을 순회하면서 Send를 호출하는 것을 볼 수 있다.
다음으로는 java gRPC client 측의 routeChat을 사용하여 데이터를 보내도록 하자. client 측에서 다량의 stream을 보내도록 하고, server로 부터 다량의 stream을 받도록 하는 것이다.
public class RouteGuideClient {
private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;
private final RouteGuideGrpc.RouteGuideStub asyncStub;
private Random random = new Random();
private TestHelper testHelper;
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
...
private RouteNote newNote(String message, int lat, int lon) {
return RouteNote.newBuilder().setMessage(message)
.setLocation(Point.newBuilder().setLatitude(lat).setLongitude(lon).build()).build();
}
public CountDownLatch routeChat() {
System.out.println("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
System.out.printf("Got message %s %s %s\n", note.getMessage(), note.getLocation().getLatitude(), note.getLocation().getLongitude());
if (testHelper != null) {
testHelper.onMessage(note);
}
}
@Override
public void onError(Throwable t) {
System.out.println("RouteChat Failed: " + Status.fromThrowable(t));
if (testHelper != null) {
testHelper.onRpcError(t);
}
finishLatch.countDown();
}
@Override
public void onCompleted() {
System.out.println("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
System.out.printf("Sending message %s %s %s\n", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
...
}
routeChat 메서드를 보면 client-side streaming에서 사용했던 방법과 비슷하게 데이터를 보내는 것을 볼 수 있다.
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
System.out.printf("Got message %s %s %s\n", note.getMessage(), note.getLocation().getLatitude(), note.getLocation().getLongitude());
if (testHelper != null) {
testHelper.onMessage(note);
}
}
@Override
public void onError(Throwable t) {
System.out.println("RouteChat Failed: " + Status.fromThrowable(t));
if (testHelper != null) {
testHelper.onRpcError(t);
}
finishLatch.countDown();
}
@Override
public void onCompleted() {
System.out.println("Finished RouteChat");
finishLatch.countDown();
}
});
private final RouteGuideGrpc.RouteGuideStub asyncStub;은 비동기 통신으로 gRPC 요청을 보내고 응답을 받아낸다. 응답을 받기 위해서는 routeChat안에 responseObserver를 만들어주어야 한다. responseObserver를 보면 StreamObserver라는 인터페이스인 것을 알 수 있다.
public io.grpc.stub.StreamObserver<com.grpc.javarouteguide.grpc.routeguide.RouteNote> routeChat(
io.grpc.stub.StreamObserver<com.grpc.javarouteguide.grpc.routeguide.RouteNote> responseObserver) {
return io.grpc.stub.ClientCalls.asyncBidiStreamingCall(
getChannel().newCall(getRouteChatMethod(), getCallOptions()), responseObserver);
}
StreamObserver 인터페이스의 명세는 다음과 같다.
public interface StreamObserver<V> {
void onNext(V var1);
void onError(Throwable var1);
void onCompleted();
}
asyncStub으로 응답을 받을 때는 이전에도 말했듯이 onNext를 통해서 응답을 받아내야 한다. 응답을 모두 받았다면 onCompleted가 실행되고 만약 이 도중에 에러가 발생하면 onError가 발생한다.
client-side에서 요청을 보내는 코드를 보면 다음과 같다.
for (RouteNote request : requests) {
System.out.printf("Sending message %s %s %s\n", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
requestObserver를 통해서 onNext로 다량의 데이터를 보내는 것을 볼 수 있다. requestObserver도 StreamObserver를 따르는 구현체이기 때문이다.
routeChat의 재미난 점은 대량의 데이터를 전송하고 나서 모든 응답이 언제올 지 모르기 때문에, CountDownLatch를 반환하여 호출한 측에서 제어하도록 한다.
이제 routeChat을 호출하는 main code를 만들어보도록 하자.
@SpringBootApplication
public class JavaRouteguideApplication {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(JavaRouteguideApplication.class, args);
String json = "{\"feature\":[{\"location\":{\"latitude\":407838351,\"longitude\":-746143763},\"name\":\"Patriots Path, Mendham, NJ 07945, USA\"},{\"location\":{\"latitude\":408122808,\"longitude\":-743999179},\"name\":\"101 New Jersey 10, Whippany, NJ 07981, USA\"},{\"location\":{\"latitude\":413628156,\"longitude\":-749015468},\"name\":\"U.S. 6, Shohola, PA 18458, USA\"}]}";
String target = "localhost:50051";
List<Feature> features;
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()).build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
System.out.println("routeChat can not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
리턴으로 받은 CountDownLatch의 timeout을 설정하여 응답이 모두 처리되도록 하면 된다. 참고로 client는 routeChat 메서드 안에서 하드 코딩된 requests를 전송하고 있다.
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
이제 실행해보도고 결과를 확인하자. 4개를 전송했으니 4개에 대한 응답을 받아야 한다.
*** RouteChat
Sending message First message 0 0
Sending message Second message 0 10000000
Sending message Third message 10000000 0
Sending message Fourth message 10000000 10000000
Got message First message 0 0
Got message Second message 0 10000000
Got message Third message 10000000 0
Got message Fourth message 10000000 10000000
Finished RouteChat
다음의 결과를 통해서 client가 server로 4개의 stream을 보냈고, server가 client로 4개의 응답을 보낸 것이다. 이로써 bidirectional streaming이 가능하다는 것을 알 수 있다.