이번에는 client측에서 server 측으로 streaming을 보내보도록 하자.
먼저 protocol buffer의 spec을 보도록 하자.
service RouteGuide {
// A server-to-client streaming RPC.
//
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// A client-to-server streaming RPC.
//
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
}
이전에 server-side streaming에 사용되었던 ListFeatures와 달리 RecordRoute는 매개변수 측이 stream point인 것을 볼 수 있다. 이는 client 측에서 stream으로 n개의 데이터를 보낸다는 것이다.
RecordRoute를 받아내는 server측 golang 코드를 추가해보도록 하자.
우리의 routeGuideServer 구조체를 보면 pb.UnimplementedRouteGuideServer가 있을 것이다. 해당 구조체에 들어가면 우리가 구현해야하는 메서드의 시그니처에 대해서 안내를 받을 수 있다.
type routeGuideServer struct {
pb.UnimplementedRouteGuideServer
savedFeatures []*pb.Feature
mu sync.Mutex
routeNotes map[string][]*pb.RouteNote
}
RecordRoute을 확인해보면 메서드 시그니처가 다음과 같다는 것을 볼 수 있다.
func (UnimplementedRouteGuideServer) RecordRoute(grpc.ClientStreamingServer[Point, RouteSummary]) error {
return status.Errorf(codes.Unimplemented, "method RecordRoute not implemented")
}
이와 동일하게 구성해도 되지만 route_guide_grpc.pb.go를 보면 미리 type alias로 RouteGuide_RecordRouteServer을 구현해놓은 것을 볼 수 있다. 이는 generic이 없던 이전 code와의 호환성을 위한 코드이다. 이를 통해서도 알 수 있는 것은 go는 하위 호환성을 정말 중요하게 생각한다. 가끔 파이썬이나 java를 하다보면 버전 업 후에 클래스, 메서드들이 사라져서 에러가 발생하는 것을 볼 수 있는 데 go에서는 그런 일이 거의 없다.
type RouteGuide_RecordRouteServer = grpc.ClientStreamingServer[Point, RouteSummary]
이 RouteGuide_RecordRouteServer type alias를 활용하여 다음과 같이 메서드 시그니처를 만들 수 있다. 참고로 위의 RecordRoute와 동일한 메서드 시그니처이다.
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error { }
RecordRoute를 구현하기 전에 거리 계산에 필요한 메서드들을 구현해보도록 하자.
func toRadians(num float64) float64 {
return num * math.Pi / float64(180)
}
// calcDistance calculates the distance between two points using the "haversine" formula.
// The formula is based on http://mathforum.org/library/drmath/view/51879.html.
func calcDistance(p1 *pb.Point, p2 *pb.Point) int32 {
const CordFactor float64 = 1e7
const R = float64(6371000) // earth radius in metres
lat1 := toRadians(float64(p1.Latitude) / CordFactor)
lat2 := toRadians(float64(p2.Latitude) / CordFactor)
lng1 := toRadians(float64(p1.Longitude) / CordFactor)
lng2 := toRadians(float64(p2.Longitude) / CordFactor)
dlat := lat2 - lat1
dlng := lng2 - lng1
a := math.Sin(dlat/2)*math.Sin(dlat/2) +
math.Cos(lat1)*math.Cos(lat2)*
math.Sin(dlng/2)*math.Sin(dlng/2)
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
distance := R * c
return int32(distance)
}
이제 RecordRoute를 구현해보도록 하자. RecordRoute는 client 측에서 준 위도, 경도 데이터를 바탕으로 지나온 거리를 구해주는 기능을 한다. 가령 point1과 point2를 주면 point1과 point2의 거리를 구하고, point3이 주어지면 point3과 point2와의 거리를 구하여 여태까지 지나온 총 거리를 구하는 것이다.
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
stream의 Recv를 통해서 client 데이터를 받을 수 있는 것을 볼 수 있다. 재밌는 것은 stream은 stdin으로 받는 것이기 때문에 io.EOF가 나올 때까지 쭉 받도록 만들어야 하는 것이다. 데이터를 모두 받은 후에 io.EOF가 나오면 stream을 통해서 응답을 전송하고 close해야한다. 따라서 SendAndClose를 사용한 것이다.
나머지 계산 코드는 특징점 계산과 거리 계산 코드인데, gRPC의 주요 코드는 아니므로 참고만 하도록 하자.
서버를 구동시켜보도록 하자.
go run ./...
2025/08/11 12:35:12 server starting... :50051
client 측 RecordRoute를 만들 기전에 util 클래스를 하나 만들어놓도록 하자.
public class RouteGuideUtil {
private static final double COORD_FACTOR = 1e7;
/**
* Gets the latitude for the given point.
*/
public static double getLatitude(Point location) {
return location.getLatitude() / COORD_FACTOR;
}
/**
* Gets the longitude for the given point.
*/
public static double getLongitude(Point location) {
return location.getLongitude() / COORD_FACTOR;
}
/**
* Indicates whether the given feature exists (i.e. has a valid name).
*/
public static boolean exists(Feature feature) {
return feature != null && !feature.getName().isEmpty();
}
}
다음으로 recordRoute를 호출하는 java client code를 만들어보도록 하자. 이번에는 async한 방법으로 만들어보도록 하자.
public class RouteGuideClient {
private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;
private final RouteGuideGrpc.RouteGuideStub asyncStub;
...
public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) {
System.out.printf("*** ListFeatures: lowLat=%d lowLon=%d hiLat=%d hiLon={%d", lowLat, lowLon, hiLat, hiLon);
Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
for (int i = 1; features.hasNext(); i++) {
Feature feature = features.next();
System.out.println("Result #" + i + ":" + feature);
if (testHelper != null) {
testHelper.onMessage(feature);
}
}
} catch (StatusRuntimeException e) {
System.out.println("RPC failed: " + e.getStatus());
if (testHelper != null) {
testHelper.onRpcError(e);
}
}
}
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
System.out.println("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
System.out.printf("Finished trip with %s points. Passed %s features. Travelled %s meters. It took %s seconds.\n", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
if (testHelper != null) {
testHelper.onMessage(summary);
}
}
@Override
public void onError(Throwable t) {
System.out.printf("RecordRoute Failed: %s\n", Status.fromThrowable(t));
if (testHelper != null) {
testHelper.onRpcError(t);
}
finishLatch.countDown();
}
@Override
public void onCompleted() {
System.out.println("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
for (int i = 0; i < numPoints; ++i) {
int index = random.nextInt(features.size());
Point point = features.get(index).getLocation();
System.out.printf("Visiting point %s, %s\n", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(random.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
System.out.println("recordRoute can not finish within 1 minutes");
}
}
...
}
asyncStub으로 recordRoute를 호출하지만 바로 server에 요청하는 것이 아니라, recordRoute를 호출할 수 있는 StreamObserver 구현체를 만들어서 제공해주는 것이다. asyncStub과 같이 java 비동기 stub의 사용 방법은 observer 패턴으로 제공된 메서드들을 구현하면 된다. 3가지 메서드들로 구성되어 있다.
onNext: 다음으로 보낼 데이터를 전달한다.onError: client 측 에러 발생으로 server에 에러를 전달한다. onCompleted: client 측에서 연결을 종료하여 EOF를 전달한다.onNext: 서버가 전달한 응답을 받을 때 사용한다.onError: 서버에서 에러를 전달할 때 에러를 받는 용도로 사용한다.onCompleted: 서버가 연결을 정상 종료할 때 받는 용도로 사용한다.자바의 future나 threading 부분을 많이 사용한 분들에게는 익수한 사용 방법일 것이다.
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
코드를 보면 StreamObserver의 onNext, onError, onCompleted를 모두 구현하여 responseObserver로 asyncStub.recordRoute로 전달하는 것을 볼 수 있다. 이는 서버 측의 응답을 처리하기 위한 StreamObserver라고 생각하면 된다.
requestObserver는 client 측의 StreamObserver로 이미 gRPC stub에서 자동으로 client측이 기능들을 구현해놓은 구현체가 전달된다. 따라서, client가 데이터를 전달하기 위해서는 requestObserver의 onNext를 사용하면 되고, 에러가 발생했다면 서버에 알려주기 위해서 onError를 사용하면 된다. 연결을 정상적으로 종료하기 위해서는 onCompleted를 사용하면 된다.
참고로 서버에서 에러가 발생하거나 network 이슈로 응답이 안오거나 너무 오래 걸릴 수도 있다. 이런 경우의 에러를 해결하기 위해서 timeout을 두는 것이 좋다.
final CountDownLatch finishLatch = new CountDownLatch(1);
CountDownLatch를 통해서 client 측에서 요청을 모두 전달하고 onCompleted까지 호출했는데도 응답이 오지 않았다면 1분 간만 기다리고 timeout 시키는 것이다.
이제 recordRoute를 main 코드에서 실행시켜보도록 하자. 이전에 전달할 client 측 json 데이터를 우리의 gRPC 데이터로 변환해줄 메서드들 구현해보도록 하자.
public class JsonToGrpcConverter {
public static List<Feature> convertJsonToFeatures(String json) {
Gson gson = new Gson();
JsonData jsonData = gson.fromJson(json, JsonData.class);
List<Feature> features= new ArrayList<>();
for (JsonFeature jsonFeature : jsonData.feature) {
// Point 객체 생성
Point point = Point.newBuilder()
.setLatitude(jsonFeature.location.latitude)
.setLongitude(jsonFeature.location.longitude)
.build();
// Feature 객체 생성
Feature feature = Feature.newBuilder()
.setName(jsonFeature.name)
.setLocation(point)
.build();
features.add(feature);
}
return features;
}
// JSON 구조를 매핑할 클래스들
private static class JsonData {
List<JsonFeature> feature;
}
private static class JsonFeature {
JsonLocation location;
String name;
}
private static class JsonLocation {
int latitude;
int longitude;
}
}
gRPC로 만든 java 데이터 class들은 모두 builder 패턴으로 데이터를 채워 넣어야 한다. golang에 익숙한 개발자들에게는 뭐 이런 말도 안되는 논리가 있나 싶을 정도로 개발자에게 자유를 풀어주지 않는다. 그래서 newBuilder로 builder를 만들어 데이터를 하나씩 넣어주거나, 처음 protocol buffer를 만들 때부터 변환하기 쉽게 만드는 데이터를 넣어주는 것도 방법이다.
이제 client 측 main code를 만들어보도록 하자.
@SpringBootApplication
public class JavaRouteguideApplication {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(JavaRouteguideApplication.class, args);
String json = "{\"feature\":[{\"location\":{\"latitude\":407838351,\"longitude\":-746143763},\"name\":\"Patriots Path, Mendham, NJ 07945, USA\"},{\"location\":{\"latitude\":408122808,\"longitude\":-743999179},\"name\":\"101 New Jersey 10, Whippany, NJ 07981, USA\"},{\"location\":{\"latitude\":413628156,\"longitude\":-749015468},\"name\":\"U.S. 6, Shohola, PA 18458, USA\"}]}";
String target = "localhost:50051";
List<Feature> features;
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()).build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
features = JsonToGrpcConverter.convertJsonToFeatures(json);
client.recordRoute(features, features.size());
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
json으로 구성된 string 데이터인 json을 JsonToGrpcConverter를 통해 List<Feature> features으로 만들어주어 recordRoute에 전달해주면 된다.
이제 실행시켜보고 로그를 확인해보도록 하자.
*** RecordRoute
Visiting point 40.7838351, -74.6143763
Visiting point 40.8122808, -74.3999179
Visiting point 41.3628156, -74.9015468
Finished trip with 3 points. Passed 3 features. Travelled 92589 meters. It took 3 seconds.
Finished RecordRoute
3개의 데이터가 잘 전달되고 결과 report도 잘 나온 것을 볼 수 있다. 이는 서버에 요청 stream을 3개 전달하고 stream들을 처리한 결과에 대한 전체 응답이 온 것을 볼 수 있다.