gRPC-Gateway + Python : 1. gRPC 서버 구현

HnBrd·2023년 7월 13일
0

파이썬

목록 보기
3/3
post-thumbnail

0. 서론

회사의 서비스 구조가 MSA에 가까운 형태이기 때문에 지속적으로 작은 크기의 모듈들을 만들고 있고, 그 작은 모듈들을 gRPC 서비스로 만들어서 다른 부서로 넘기고 있다. 그러다보니 계속 같은 코드를 작성하게되고, REST 까지 구현하다보면 중복으로 작성하는 코드가 생기게 된다.

따라서 빠른 구현 및 배포를 위해 템플릿 형태로 만들어서 놓으려 한다.
Golang, protobuf 및 protobuf compiler 에 대해서 무지한 상태로 적은 글이니, 피드백은 감사합니다.

gRPC / gRPC-gateway

gRPC를 사용해야 하는 이유가 있다면?

  1. 높은 압축률 = 빠른 통신
  2. HTTP/2 기능 지원 (ex. 양방향 스트리밍)
  3. 오해의 소지가 없는 명확한 API 명세

gRPC에 대한 동작 원리에 대한 글이 아니므로 그런 기술적인 부분이 궁금할 때 참고할 수 있는 링크를 추가해두었다.

어쨌거나 gRPC의 명확한 장점들에도 불구하고 gRPC를 사용하면서 생기는 불만

  1. proto 파일 컴파일을 따로 해줘야하는 귀찮음
  2. 결국에는 REST도 만들어달라고 함

다행스럽게도, gRPC ecosystem 에서는 gRPC의 부족한 부분을 채워줄 수 있는 라이브러리 묶음(?)을 제공하고 있다. 본 글에서는 간단하게 더하기, 빼기를 수행하는 서버를 구현하고, gRPC ecosystem 의 gRPC-gateway 레포지토리를 참고하여 REST 요청을 위한 리버스 프록시를 생성하고 Swagger-UI까지 적용해보도록 하겠다.

디렉토리 구조

우선 디렉토리 구조를 다음과 같이 잡고 시작한다.

root
├ grpc_client.py
├ grpc_server.py
├ config.yaml
├ proto/
├── sample.proto
  • proto 관련 파일을 몰아넣을 목적으로 /proto 디렉토리 생성
    • 실제 proto 파일은 /proto/sample.proto
  • 테스트 목적의 gRPC 클라이언트용으로 grpc_client.py 파일 생성
  • gRPC 서버용으로는/grpc_server.py 생성
  • 자주 변경되지 않는 서버의 설정을 담을 목적의 config.yaml 를 프로젝트 최상단에 생성
    • config.yaml 파일에는 일단 MAX_WORKERS : 2 라고 한 줄 써두면 된다 (서버 스레드 개수)

1. proto

/proto/sample.proto 에 더하기 빼기용 proto를 먼저 정의하였다.

syntax = "proto3";

option go_package="sample_service/gateway";
package sample_service.sample_module;

service SampleService {
  // 더하기용 rpc
  rpc AddRequest (OperandMessage) returns (ResultResponse) {};
  // 빼기용 rpc
  rpc SubRequest (OperandMessage) returns (ResultResponse) {}
};

// 피연산자를 담는 메시지
message OperandMessage {
  int32 operand_1 = 1;
  int32 operand_2 = 2;
}

// 결과를 다믄 메시지
message ResultResponse {
  int32 result = 1;
}
  • 더하기, 빼기용으로 rpc 를 각각 선언하고,
  • 두 개의 피연산자를 담을 구조인 message OperandMessage를 만들었다.
  • 결과는 int32 형의 숫자 하나로 받게 될 것이다.

이제 작성한 proto 를 컴파일해야 한다.

python -m grpc_tools.protoc -I./proto --python_out=./proto --grpc_python_out=./proto ./proto/sample.proto

패키지 가 없어서 오류가 발생한다면 패키지를 설치하면 된다.

pip install grpcio grpcio-tools

proto 디렉토리 내에 컴파일된 아이들이 생겨난 것을 볼 수 있다.
잘 생성된 것이 확인됐으면 이제 gRPC 서버를 구현해보자.

2. Sample Server

패키지 임포트

import os
import sys
import argparse
from datetime import datetime
import yaml
import logging
from concurrent import futures
import pprint
import grpc

먼저 필요한 패키지들을 임포트한다.
앞서 proto 를 컴파일 했기 때문에 가져다 쓸 수 있을 것이다. 그런데 관련 패키지들은 ./proto 라는 다른 디렉토리에 존재하므로

sys.path.append("./proto")

를 통해서 패키지를 찾을 수 있도록 경로를 추가한다. 이후에

from proto import sample_pb2
from proto import sample_pb2_grpc

를 통해서 proto 패키지를 임포트 한다.

main

if __name__=="__main__":
	# 인자 파싱
    args = parse_arguments()
    # 설정 파일 파싱
    cfg = load_config(args.config)
    # 로거 생성
    logger = init_logger()
    
    # argument, config, logger를 넘겨서 서버 실행
    serve(args, cfg, logger)

다음으로는 본격적으로 서버를 실행하기 이전에 main 구현. 프로그램 실행 시 받은 인자, 설정 파일을 파싱하고, 로거를 생성해서 serve 함수에 넘긴다.

logger, arguments, config

def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=str, default=13271, help="Server port number")
    parser.add_argument("--config", default="./config.yaml", help="Path to .yaml config file")
    args = parser.parse_args()
    return args

def init_logger():
    logger = logging.getLogger("gRPC SERVER")
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    logger.setLevel(logging.DEBUG)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    logger.addHandler(stream_handler)
    return logger

def load_config(config):
    with open(config) as f:
        cfg = yaml.load(f, Loader=yaml.SafeLoader)
    print("CONFIG INFORMATION")
    pprint.PrettyPrinter(indent=2).pprint(cfg)
    return cfg

앞서 main 함수에서 사용했던 함수들을 실제로 구현할 차례이다.

먼저 parse_arguments() 를 통해 설정 파일의 위치를 받고, 설정 파일을 로드하는 방식으로 구현했다.

다음으로 서버에서 사용할 Logger 객체를 반환하는 함수를 만들었다.

이 세 가지가 전부 필요하지 않을 수 있으니, 그때 그때 수정해서 사용하는게 좋겠다. 예를 들어,

  • 포트 번호를 지정하는 부분이 args 에 있는데, 자주 변경하지 않는다면 config 로 이동
  • 단일 config 파일을 사용한다면 굳이 args 에서 config 파일의 경로를 지정할 필요가 없으니 삭제
  • 로거의 경우 로깅 레벨이 DEBUG 로 고정되어 있으므로, args 에 --quiet, --verbose 과 같은 인자를 추가해서 로깅 레벨을 조절

serve

def serve(args, cfg, logger):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=cfg['MAX_WORKERS']))
    sample_pb2_grpc.add_SampleServiceServicer_to_server(Servicer(cfg, logger), server)
    server.add_insecure_port('[::]:' + str(args.port))
    server.start()
    logger.info("gRPC server initialized")
    server.wait_for_termination()

프로그램을 시작하고 난 뒤에 서버를 실행하기 위한 설정들이 다 로드되고 나면, 해당 정보들이 serve 함수로 넘어오게 된다. 이 함수에서는 그 정보들을 바탕으로 서버 클래스를 인스턴스화하는 작업을 수행하면 된다.

server.wait_for_termination()

을 추가하지 않으면 아무것도 수행하지 않은 채 프로그램이 종료되버리니 필수!

앞서 config.yaml 에 적어놓은 MAX_WORKERS 값을 사용해서 쓰레드 풀의 인자로 넣으면 된다 (쓰레드 풀에 max_workers 의 수는 동시에 처리해야하는 양방향 통신의 최대 개수)
따라서 워커의 수를 1 로 설정해버리면 경우에 따라 데드락이 발생해서 서비스가 원활하지 않을 수 있으니 주의

class Service

class Servicer(sample_pb2_grpc.SampleServiceServicer):
    def __init__(self, cfg, logger):
        self.cfg = cfg
        self.logger = logger

    def log_current_time(self):
        req_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
        self.logger.info(f"Client request for {sys._getframe(1).f_code.co_name} at {req_time}")

    def AddRequest(self, req, ctx):
        self.log_current_time()
        operand_1 = req.operand_1
        operand_2 = req.operand_2
        return sample_pb2.ResultResponse(result=operand_1+operand_2)

    def SubRequest(self, req, ctx):
        self.log_current_time()
        operand_1 = req.operand_1
        operand_2 = req.operand_2
        return sample_pb2.ResultResponse(result=operand_1-operand_2)

서버 클래스는 sample_pb2_grpc.SampleServiceService 로 정의된 클래스를 상속받은 뒤
proto 파일에서 정의했던 함수 이름을 가져와서 기능을 구현하면 된다.

결과는 sample_pb2 의 메시지 객체에 담아서 반환

3. Sample Client

서버가 잘 작동하는지 테스트 해볼 목적으로 간단한 클라이언트를 구현했다.
마찬가지로 main과 proto 파일들을 임포트하고

import os
import sys
import grpc
sys.path.append("./proto")
import yaml
import argparse

from proto import sample_pb2
from proto import sample_pb2_grpc
# 결과를 dict로 바꿔서 출력하기 위해 다음 함수를 임포트
from google.protobuf.json_format import MessageToDict

...

def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", default="127.0.0.1", help="IP address")
    parser.add_argument("--port", default=13271, help="Port number")
    args = parser.parse_args()
    return args

if __name__=="__main__":
    args = parse_arguments()
    main(args)

메시지를 전달하기 위한 channel 을 만들고 stub 을 정의

def main(args):
	addr = args.ip + ":" + str(args.port)
    channel = grpc.insecure_channel(addr)
    stub = sample_pb2_grpc.SampleServiceStub(channel)
    ...

빡세게 테스트할 것이 아니기 때문에 대충 3 이랑 5 넣어서 하드코딩

def main(args):
	...
    req = sample_pb2.OperandMessage(
        operand_1 = 3,
        operand_2 = 5
    )
    ...

그러고 나면 stub 를 사용해서 메시지를 보내고 결과를 출력

def main(args):
    ...
    res = stub.AddRequest(req)
    print(res)

4. 실행

python grpc_server.py

로 서버를 실행하면

그 다음 클라이언트를 실행해서 결과가 잘 출력되는지 확인

메시지에 값으로 35 를 주고 더하기 연산 을 요청하고 난 뒤에 받은 결과값이 8 로 제대로 나오는 것을 확인할 수 있다.
그럼 이제 기본 gRPC 서버 구현은 마친 셈이다. 다음 글에서는 서버에서 REST 요청을 처리할 수 있도록 리버스 프록시 서버를 구현해 볼 것이다.

최종 코드는 다음과 같다.

/grpc_server.py

import os
import sys
import argparse
from datetime import datetime
import yaml
import logging
from concurrent import futures
import pprint
import grpc
sys.path.append("./proto")
# TODO : import your generated proto module sample_pb2 -> your_service_pb2
from proto import sample_pb2
from proto import sample_pb2_grpc

class Servicer(sample_pb2_grpc.SampleServiceServicer):
    def __init__(self, cfg, logger):
        self.cfg = cfg
        self.logger = logger

    def log_current_time(self):
        req_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
        self.logger.info(f"Client request for {sys._getframe(1).f_code.co_name} at {req_time}")

    # TODO : modify function names - match gRPC proto
    def AddRequest(self, req, ctx):
        self.log_current_time()
        operand_1 = req.operand_1
        operand_2 = req.operand_2
        return sample_pb2.ResultResponse(result=operand_1+operand_2)

    def SubRequest(self, req, ctx):
        self.log_current_time()
        operand_1 = req.operand_1
        operand_2 = req.operand_2
        return sample_pb2.ResultResponse(result=operand_1-operand_2)


def serve(args, cfg, logger):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=cfg['MAX_WORKERS']))
    # TODO : modify sample_pb2_grpc -> your_service_pb2_grpc
    sample_pb2_grpc.add_SampleServiceServicer_to_server(Servicer(cfg, logger), server)
    server.add_insecure_port('[::]:' + str(args.port))
    server.start()
    logger.info("gRPC server initialized")
    server.wait_for_termination()

def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=str, default=13271, help="Server port number")
    parser.add_argument("--config", default="./config.yaml", help="Path to .yaml config file")
    args = parser.parse_args()
    return args

def init_logger():
    logger = logging.getLogger("gRPC SERVER")
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    logger.setLevel(logging.DEBUG)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    logger.addHandler(stream_handler)
    return logger

def load_config(config):
    with open(config) as f:
        cfg = yaml.load(f, Loader=yaml.SafeLoader)
    print("CONFIG INFORMATION")
    pprint.PrettyPrinter(indent=2).pprint(cfg)
    return cfg

if __name__=="__main__":
    args = parse_arguments()
    cfg = load_config(args.config)
    logger = init_logger()
    serve(args, cfg, logger)

/grpc_client.py

import os
import sys
import grpc
sys.path.append("./proto")
import yaml
import argparse

# TODO : import your own generated grpc modules
from proto import sample_pb2
from proto import sample_pb2_grpc
from google.protobuf.json_format import MessageToDict

def main(args):
    addr = args.ip + ":" + str(args.port)
    channel = grpc.insecure_channel(addr)
    # TODO : make stub and call your own declared function
    stub = sample_pb2_grpc.SampleServiceStub(channel)
    req = sample_pb2.OperandMessage(
        operand_1 = 3,
        operand_2 = 5
    )
    res = stub.AddRequest(req)
    print(res)

def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", default="127.0.0.1", help="IP address")
    parser.add_argument("--port", default=13271, help="Port number")
    args = parser.parse_args()
    return args

if __name__=="__main__":
    args = parse_arguments()
    main(args)
profile
잡식

0개의 댓글

관련 채용 정보