자, 저번 게시글에서는 간단히 gRPC에 대한 내용을 정리 했다. 이번에는 gRPC를 실질적으로 어떻게 활용해야하는지 등에 대해서 간단히 작성해볼려고 한다.
우선 저자(본인)은 Python을 활용할 예정이다. 여러 예제에서는 GO 를 사용하지만 Python 예제가 정리 된게 없기도 할 뿐더러 인공지능 서버 구축을 위해서 gRPC를 사용하기 때문에 Python을 활용할 예정이다.
더불어 개발 환경 세팅은 따로 언급하진 않았다. 그에 대한 내용은 공식 문서 를 참조하길 바란다. 공식 문서에서 Example을 clone하고 이것저것 보여주지만 grpcio-tools
까지만 pip 설치해도 된다. 이후 내용은 아래 코드와 예제를 참고하면 된다.
하고자하는 서비스? 에 대한 규정을 내려야한다. 여기서 규정은 Server와 Client (혹은 Master와 Worker)가 어떤 것을 주고 받을 것인지, 그에 Data type은 어떻게 정의를 내릴 것인지를 정한다.
gRPC에서는 이를 Proto 를 통해서 규정한다 공식 문서 링크. 보통 예제에서는 Greeting을 쓰지만 나는 당장 써먹을 수 있는 np.ndarray
전송과 수신 방법에 대해서 정의할려고 한다.
Server는 gRPC에서 언제든지 Request를 들을 자세가 되어 있어야한다. 반대로 Client는 Request를 보내고 Respone을 받을 준비가 되어야 한다. 저자는 이 부분이 막상 구현하다가 헷갈려 아래와 같이 정리를 하였다.
Service는 하고자 하는 내용, Request와 Respone은 Message로 코드에서 적용될 것이고 Request에 따라 Server에서 해야할 일도 달라질 것이다. Proto는 아래와 같이 정의가 되야할 것이다.
// ./proto/simple_example.proto
syntax = "proto3";
service SimpleService {
rpc GetSimpleTensorComputedResult(SimpleTensorRequest) returns (SimpleTensorResponse) {}
}
message SimpleTensorRequest {
bytes tensor = 1;
repeated int64 tensor_shape = 2;
}
message SimpleTensorResponse {
bytes tensor = 1;
repeated int64 tensor_shape = 2;
}
제공하는 서비스는 일단 예제로 간단한 서비스를 정의하였다. 이후에 서비스에서 제공하는 내용은 GetSimpleTensorComputeResult
로 간단히 Tensor의 내용을 서버에서 연산하고 그 값을 반환하도록 하였다.
이후에는 grpcio-tools를 이용해 stub의 역할에 대한 코드를 뽑아야 한다. 이는 아래와 같은 명령문을 통해서 추출하면 된다.
python -m grpc_tools.protoc -I ./proto --python_out=. --grpc_python_out=. ./proto/simple_example.proto
grpc_tools.protoc 가 grpcio-tools로 설치된 모듈이다. proto 디렉토리 내에 위에서 정의한 코드가 있다는 가정하에 계속하면... 해당 모듈(?) 코드는 --python_out=.
으로 현재 디렉토리에 추출하고 --grpc_python_out=.
로 grpc 파이썬은 어디로 추출할지 마지막으로 정의한 proto는 어디에 있는지 지정해주면 된다.
자, 서버에서는 Client에게 ndarray를 받으면 이를 연산하고 다시 반환하는 코드가 있어야한다. 해당 코드는 아래와 같이 작성하였다.
# server.py
import simple_example_pb2_grpc
import simple_example_pb2
import numpy as np
import logging
import base64
import grpc
import time
from concurrent import futures
logging.basicConfig(level=logging.INFO)
SERVER_HOST = "localhost"
SERVER_PORT = "50050"
class SimpleExamplePrim(simple_example_pb2_grpc.SimpleServiceServicer):
def GetSimpleTensorComputedResult(self, request, context):
b64Tensor = request.tensor
b64Tuple = tuple(request.tensor_shape)
print(b64Tensor, b64Tuple)
tensor = np.frombuffer(b64Tensor)
computed_tensor = tensor + 10
computed_tensor_shape = computed_tensor.shape
print(tensor.shape)
encoded_tensor = base64.b64encode(computed_tensor.tobytes())
return simple_example_pb2.SimpleTensorResponse(
tensor=encoded_tensor,
tensor_shape=computed_tensor_shape)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
simple_example_pb2_grpc.add_SimpleServiceServicer_to_server(SimpleExamplePrim(), server)
server.add_insecure_port(f"{SERVER_HOST}:{SERVER_PORT}")
server.start()
print("Started server")
try:
while True:
time.sleep(10000)
except KeyboardInterrupt:
server.stop(0)
if __name__ == "__main__":
serve()
주의해야할 점은 proto
에서 정의한 rpc 서비스의 이름과 같아야한다. proto
에서 Tensor를 주고 받기 위한 서비스 이름으로 GetSimpleTensorComputedResult
라고 했으니 이를 종속 받는 객체 내에 메쏘드로 GetSimpleTensorComputedResult
로 정의한다. 이후에는 아래 예제에서 한 것과 같이 하면 된다. 간단히 연산을 하는 코드로 Piecewise 연산으로 10을 각각 더해주었다.
Request를 받은 후, Respone은 SimpleTensorResponse
로 보내준다. 이때 문제점이 하나 있는데 ndarray를 그대로 전송할 수는 없고 base64로 Encoding을 거쳐야한다. 이유는 gRPC는 Byte 위에서 옮겨다니는 Protocol이라고 봐야한다. 이를 한번 인코딩을 미리 코드 선에서 거쳐야하며 proto 정의를 bytes 혹은 string으로 정의를 내림으로서 우리가 Request의 인자값을 그에 맞게 설정해줘야 한다.
이후에 전송을 하고 이를 받는 코드는 아래를 참조하면 된다.
Client는 Server 에게 "Tensor 보냈으니 연산해서 보내" 라는 요청을 보내야한다. 이를 SimpleTensorRequest
로 정의한다. 코드는 아래와 같다.
# client.py
import simple_example_pb2_grpc
import simple_example_pb2
import numpy as np
import base64
import grpc
from concurrent import futures
SERVER_HOST = "localhost"
SERVER_PORT = "50050"
def run():
channel = grpc.insecure_channel(f"{SERVER_HOST}:{SERVER_PORT}")
stub = simple_example_pb2_grpc.SimpleServiceStub(channel)
simple_tensor_shape = (224, 224, 3)
simple_tensor = np.random.rand(*simple_tensor_shape)
simple_tensor_encoded = base64.b64encode(simple_tensor.tobytes())
simple_tensor_response = stub.GetSimpleTensorComputedResult(
simple_example_pb2.SimpleTensorResponse(
tensor=simple_tensor_encoded,
tensor_shape=simple_tensor_shape,
))
resp_tensor = np.frombuffer(base64.b64decode(simple_tensor_response.tensor))
resp_tensor = resp_tensor.reshape(simple_tensor_response.tensor_shape)
print(resp_tensor.shape)
if __name__ == "__main__":
run()
simple_tensor
를 생성하고 Base64로 인코딩 한 후에 shape과 함께 보내준다. 이를 서버에게 Request를 보내고 다시 디코딩을 거쳐서 Shape이 맞는지 확인차 출력하고 프로그램은 종료된다.
아직 저자가 해결하지 못한 문제가 한가지 있는데 바로 전송 크기와 gRPC Server에서 한번에 받을 수 있는 양이 제한적이다 라는 것이다. 만약에 (784, 256, 12)
텐서를 생성하여 보내면 아래와 같은 에러가 발생한다.
Traceback (most recent call last):
File "/Users/bahk_insung/Documents/Github/cplex_lib/grpc/examples/client.py", line 33, in <module>
run()
File "/Users/bahk_insung/Documents/Github/cplex_lib/grpc/examples/client.py", line 21, in run
simple_tensor_response = stub.GetSimpleTensorComputedResult(
File "/Users/bahk_insung/miniconda3/envs/grpc_env/lib/python3.9/site-packages/grpc/_channel.py", line 1030, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/Users/bahk_insung/miniconda3/envs/grpc_env/lib/python3.9/site-packages/grpc/_channel.py", line 910, in _end_unary_response_blocking
raise _InactiveRpcError(state) # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.RESOURCE_EXHAUSTED
details = "Received message larger than max (25690124 vs. 4194304)"
debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:50050 {grpc_message:"Received message larger than max (25690124 vs. 4194304)", grpc_status:8, created_time:"2023-07-31T14:19:47.494257+09:00"}"
>
Maximum이 따로 규정되어 있는 것으로 보아 이는 별다른 조정이 필요하거나 로직상에 수정이 필요할 것으로 보인다.
우선 오늘 포스트는 여기서 마무리 짓고... 이후에 전체 Array를 한번에 전송하는 방법과 Dicitionary, List를 어떻게 보내야하는지도 이후에 정리할 예정이다.