회사의 서비스 구조가 MSA에 가까운 형태이기 때문에 지속적으로 작은 크기의 모듈들을 만들고 있고, 그 작은 모듈들을 gRPC 서비스로 만들어서 다른 부서로 넘기고 있다. 그러다보니 계속 같은 코드를 작성하게되고, REST 까지 구현하다보면 중복으로 작성하는 코드가 생기게 된다.
따라서 빠른 구현 및 배포를 위해 템플릿 형태로 만들어서 놓으려 한다.
Golang, protobuf 및 protobuf compiler 에 대해서 무지한 상태로 적은 글이니, 피드백은 감사합니다.
gRPC를 사용해야 하는 이유가 있다면?
gRPC에 대한 동작 원리에 대한 글이 아니므로 그런 기술적인 부분이 궁금할 때 참고할 수 있는 링크를 추가해두었다.
어쨌거나 gRPC의 명확한 장점들에도 불구하고 gRPC를 사용하면서 생기는 불만은
proto
파일 컴파일을 따로 해줘야하는 귀찮음다행스럽게도, gRPC ecosystem 에서는 gRPC의 부족한 부분을 채워줄 수 있는 라이브러리 묶음(?)을 제공하고 있다. 본 글에서는 간단하게 더하기, 빼기를 수행하는 서버를 구현하고, gRPC ecosystem 의 gRPC-gateway 레포지토리를 참고하여 REST 요청을 위한 리버스 프록시를 생성하고 Swagger-UI까지 적용해보도록 하겠다.
우선 디렉토리 구조를 다음과 같이 잡고 시작한다.
root
├ grpc_client.py
├ grpc_server.py
├ config.yaml
├ proto/
├── sample.proto
/proto
디렉토리 생성/proto/sample.proto
grpc_client.py
파일 생성/grpc_server.py
생성config.yaml
를 프로젝트 최상단에 생성config.yaml
파일에는 일단 MAX_WORKERS : 2
라고 한 줄 써두면 된다 (서버 스레드 개수)/proto/sample.proto
에 더하기 빼기용 proto를 먼저 정의하였다.
syntax = "proto3";
option go_package="sample_service/gateway";
package sample_service.sample_module;
service SampleService {
// 더하기용 rpc
rpc AddRequest (OperandMessage) returns (ResultResponse) {};
// 빼기용 rpc
rpc SubRequest (OperandMessage) returns (ResultResponse) {}
};
// 피연산자를 담는 메시지
message OperandMessage {
int32 operand_1 = 1;
int32 operand_2 = 2;
}
// 결과를 다믄 메시지
message ResultResponse {
int32 result = 1;
}
message OperandMessage
를 만들었다.int32
형의 숫자 하나로 받게 될 것이다.이제 작성한 proto 를 컴파일해야 한다.
python -m grpc_tools.protoc -I./proto --python_out=./proto --grpc_python_out=./proto ./proto/sample.proto
패키지 가 없어서 오류가 발생한다면 패키지를 설치하면 된다.
pip install grpcio grpcio-tools
proto
디렉토리 내에 컴파일된 아이들이 생겨난 것을 볼 수 있다.
잘 생성된 것이 확인됐으면 이제 gRPC 서버를 구현해보자.
import os
import sys
import argparse
from datetime import datetime
import yaml
import logging
from concurrent import futures
import pprint
import grpc
먼저 필요한 패키지들을 임포트한다.
앞서 proto 를 컴파일 했기 때문에 가져다 쓸 수 있을 것이다. 그런데 관련 패키지들은 ./proto
라는 다른 디렉토리에 존재하므로
sys.path.append("./proto")
를 통해서 패키지를 찾을 수 있도록 경로를 추가한다. 이후에
from proto import sample_pb2
from proto import sample_pb2_grpc
를 통해서 proto 패키지를 임포트 한다.
if __name__=="__main__":
# 인자 파싱
args = parse_arguments()
# 설정 파일 파싱
cfg = load_config(args.config)
# 로거 생성
logger = init_logger()
# argument, config, logger를 넘겨서 서버 실행
serve(args, cfg, logger)
다음으로는 본격적으로 서버를 실행하기 이전에 main 구현. 프로그램 실행 시 받은 인자, 설정 파일을 파싱하고, 로거를 생성해서 serve
함수에 넘긴다.
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=str, default=13271, help="Server port number")
parser.add_argument("--config", default="./config.yaml", help="Path to .yaml config file")
args = parser.parse_args()
return args
def init_logger():
logger = logging.getLogger("gRPC SERVER")
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
return logger
def load_config(config):
with open(config) as f:
cfg = yaml.load(f, Loader=yaml.SafeLoader)
print("CONFIG INFORMATION")
pprint.PrettyPrinter(indent=2).pprint(cfg)
return cfg
앞서 main
함수에서 사용했던 함수들을 실제로 구현할 차례이다.
먼저 parse_arguments()
를 통해 설정 파일의 위치를 받고, 설정 파일을 로드하는 방식으로 구현했다.
다음으로 서버에서 사용할 Logger 객체를 반환하는 함수를 만들었다.
이 세 가지가 전부 필요하지 않을 수 있으니, 그때 그때 수정해서 사용하는게 좋겠다. 예를 들어,
DEBUG
로 고정되어 있으므로, args 에 --quiet
, --verbose
과 같은 인자를 추가해서 로깅 레벨을 조절def serve(args, cfg, logger):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=cfg['MAX_WORKERS']))
sample_pb2_grpc.add_SampleServiceServicer_to_server(Servicer(cfg, logger), server)
server.add_insecure_port('[::]:' + str(args.port))
server.start()
logger.info("gRPC server initialized")
server.wait_for_termination()
프로그램을 시작하고 난 뒤에 서버를 실행하기 위한 설정들이 다 로드되고 나면, 해당 정보들이 serve
함수로 넘어오게 된다. 이 함수에서는 그 정보들을 바탕으로 서버 클래스를 인스턴스화하는 작업을 수행하면 된다.
server.wait_for_termination()
을 추가하지 않으면 아무것도 수행하지 않은 채 프로그램이 종료되버리니 필수!
앞서 config.yaml
에 적어놓은 MAX_WORKERS
값을 사용해서 쓰레드 풀의 인자로 넣으면 된다 (쓰레드 풀에 max_workers
의 수는 동시에 처리해야하는 양방향 통신의 최대 개수)
따라서 워커의 수를 1
로 설정해버리면 경우에 따라 데드락이 발생해서 서비스가 원활하지 않을 수 있으니 주의
class Servicer(sample_pb2_grpc.SampleServiceServicer):
def __init__(self, cfg, logger):
self.cfg = cfg
self.logger = logger
def log_current_time(self):
req_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
self.logger.info(f"Client request for {sys._getframe(1).f_code.co_name} at {req_time}")
def AddRequest(self, req, ctx):
self.log_current_time()
operand_1 = req.operand_1
operand_2 = req.operand_2
return sample_pb2.ResultResponse(result=operand_1+operand_2)
def SubRequest(self, req, ctx):
self.log_current_time()
operand_1 = req.operand_1
operand_2 = req.operand_2
return sample_pb2.ResultResponse(result=operand_1-operand_2)
서버 클래스는 sample_pb2_grpc.SampleServiceService
로 정의된 클래스를 상속받은 뒤
proto 파일에서 정의했던 함수 이름을 가져와서 기능을 구현하면 된다.
결과는 sample_pb2
의 메시지 객체에 담아서 반환
서버가 잘 작동하는지 테스트 해볼 목적으로 간단한 클라이언트를 구현했다.
마찬가지로 main과 proto 파일들을 임포트하고
import os
import sys
import grpc
sys.path.append("./proto")
import yaml
import argparse
from proto import sample_pb2
from proto import sample_pb2_grpc
# 결과를 dict로 바꿔서 출력하기 위해 다음 함수를 임포트
from google.protobuf.json_format import MessageToDict
...
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("--ip", default="127.0.0.1", help="IP address")
parser.add_argument("--port", default=13271, help="Port number")
args = parser.parse_args()
return args
if __name__=="__main__":
args = parse_arguments()
main(args)
메시지를 전달하기 위한 channel 을 만들고 stub 을 정의
def main(args):
addr = args.ip + ":" + str(args.port)
channel = grpc.insecure_channel(addr)
stub = sample_pb2_grpc.SampleServiceStub(channel)
...
빡세게 테스트할 것이 아니기 때문에 대충 3
이랑 5
넣어서 하드코딩
def main(args):
...
req = sample_pb2.OperandMessage(
operand_1 = 3,
operand_2 = 5
)
...
그러고 나면 stub
를 사용해서 메시지를 보내고 결과를 출력
def main(args):
...
res = stub.AddRequest(req)
print(res)
python grpc_server.py
로 서버를 실행하면
그 다음 클라이언트를 실행해서 결과가 잘 출력되는지 확인
메시지에 값으로 3
과 5
를 주고 더하기 연산
을 요청하고 난 뒤에 받은 결과값이 8
로 제대로 나오는 것을 확인할 수 있다.
그럼 이제 기본 gRPC 서버 구현은 마친 셈이다. 다음 글에서는 서버에서 REST 요청을 처리할 수 있도록 리버스 프록시 서버를 구현해 볼 것이다.
최종 코드는 다음과 같다.
import os
import sys
import argparse
from datetime import datetime
import yaml
import logging
from concurrent import futures
import pprint
import grpc
sys.path.append("./proto")
# TODO : import your generated proto module sample_pb2 -> your_service_pb2
from proto import sample_pb2
from proto import sample_pb2_grpc
class Servicer(sample_pb2_grpc.SampleServiceServicer):
def __init__(self, cfg, logger):
self.cfg = cfg
self.logger = logger
def log_current_time(self):
req_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
self.logger.info(f"Client request for {sys._getframe(1).f_code.co_name} at {req_time}")
# TODO : modify function names - match gRPC proto
def AddRequest(self, req, ctx):
self.log_current_time()
operand_1 = req.operand_1
operand_2 = req.operand_2
return sample_pb2.ResultResponse(result=operand_1+operand_2)
def SubRequest(self, req, ctx):
self.log_current_time()
operand_1 = req.operand_1
operand_2 = req.operand_2
return sample_pb2.ResultResponse(result=operand_1-operand_2)
def serve(args, cfg, logger):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=cfg['MAX_WORKERS']))
# TODO : modify sample_pb2_grpc -> your_service_pb2_grpc
sample_pb2_grpc.add_SampleServiceServicer_to_server(Servicer(cfg, logger), server)
server.add_insecure_port('[::]:' + str(args.port))
server.start()
logger.info("gRPC server initialized")
server.wait_for_termination()
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=str, default=13271, help="Server port number")
parser.add_argument("--config", default="./config.yaml", help="Path to .yaml config file")
args = parser.parse_args()
return args
def init_logger():
logger = logging.getLogger("gRPC SERVER")
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
return logger
def load_config(config):
with open(config) as f:
cfg = yaml.load(f, Loader=yaml.SafeLoader)
print("CONFIG INFORMATION")
pprint.PrettyPrinter(indent=2).pprint(cfg)
return cfg
if __name__=="__main__":
args = parse_arguments()
cfg = load_config(args.config)
logger = init_logger()
serve(args, cfg, logger)
import os
import sys
import grpc
sys.path.append("./proto")
import yaml
import argparse
# TODO : import your own generated grpc modules
from proto import sample_pb2
from proto import sample_pb2_grpc
from google.protobuf.json_format import MessageToDict
def main(args):
addr = args.ip + ":" + str(args.port)
channel = grpc.insecure_channel(addr)
# TODO : make stub and call your own declared function
stub = sample_pb2_grpc.SampleServiceStub(channel)
req = sample_pb2.OperandMessage(
operand_1 = 3,
operand_2 = 5
)
res = stub.AddRequest(req)
print(res)
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("--ip", default="127.0.0.1", help="IP address")
parser.add_argument("--port", default=13271, help="Port number")
args = parser.parse_args()
return args
if __name__=="__main__":
args = parse_arguments()
main(args)