gRPC는 구글에서 나온 RPC이다. 다양한 언어에서 이용될 수 있으며 가볍고 빠르다.
기본적으로 RPC는 위와 같이 동작한다. 나의 stub
을 통해 다른 서버의 skeleton
에 접근하여 그 서버의 로직을 실행시킨다. 이후 결과값을 stub
을 통해 전달받는다.
이 과정을 좀 더 단계적으로 쓰면 다음처럼 된다.
stub
객체 생성 및 네트워크 연결stub
객체를 통해 원격지 메소드 실행stub
은 skeleton
에서 해당 메소드 실행 요청skeleton
은 그 요청을 받아서 메소드를 실행시킴stub
에서 결과값 전달받음의사코드로 위 과정을 표현하자면...
// stub 객체 생성
let stub := new Stub()
// 원격지 연결
stub.connect()
// 원격지 메소드 실행 요청 및 파라미터 전달
stub.execute("method", parameters)
server.listen(request => {
// 원격지에서는 요청을 받아서 해당 기능 실행
let response := this[request.method](request.parameters)
// 결과값 반환
return response
})
gRPC를 쓰면 이 과정을 쉽게 구현할 수 있다.
gRPC에서는 ProtoBuf
라고 하는 IDL(Interface Description Language, 인터페이스 정의 언어)을 이용하여 stub
과 skeleton
이 주고받을 메시지를 정의한다. 또한, 그 파일을 컴파일하여 생성된 각 언어별 파일을 이용하여 개발에 이용할 수 있다.
본 시스템에서 이용한 ProtoBuf
는 다음과 같다.
// msg.proto
syntax = "proto3";
package msg;
option go_package = "github.com/protocol-diver/grpc-peer/msg";
service Message {
rpc MessageSend(MessageSendRequest) returns (MessageSendResponse);
}
message MessageSendRequest {
int32 sender = 1;
int32 receiver = 2;
string message = 3;
}
message MessageSendResponse {
int32 sender = 1;
int32 receiver = 2;
string message = 3;
int32 id = 4;
string error = 5;
}
MessageSendRequest
를 통해 skeleton
의 MessageSend
실행을 요청하며, 이에 대해 응답으로 MessageSendResponse
를 받는다.
이 ProtoBuf
파일을 가지고
protoc -I=. --go_out . --go_opt paths=source_relative --go-grpc_out . --go-grpc_opt paths=source_relative msg/msg.proto
위 명령어를 통해 go 파일을 얻을 수 있다.
분산 시스템에서의 피어는 서로 데이터를 주고 받으며 소통할 수 있다. 본 시스템에서는 피어를 구현하기 위해 stub
과 skeleton
의 역할을 모두 수행 가능한 구조체를 정의하였다.
type Peer struct {
Id int32
Port int
Clients map[int]msg.MessageClient
Server msg.MessageServer
Dial Dial
}
Peer
구조체는 필드로 자신이 실행시킬 서버, 그리고 다른 피어들에 연결될 클라이언트들을 가진다. 다른 피어 하나당 클라이언트 객체가 필요하므로 맵으로 정의하였으며, 키는 그 클라이언트가 연결되는 포트 번호이다.
Dial
은 새로운 클라이언트를 생성하는 인터페이스이다.
type Dial interface {
NewClient(port int) (msg.MessageClient, error)
}
다른 피어로 메시지 전송을 시도할 때 Clients
에 없으면 Dial
을 이용해서 맵에 새로운 클라이언트를 추가시킨다.
이 인터페이스는 Peer
객체를 생성하는 곳에서 구현하여 멤버변수로 집어넣는다.
func NewClient(port int) (msg.MessageClient, error) {
conn, err := grpc.Dial(fmt.Sprintf(":%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
return msg.NewMessageClient(conn), nil
}
msg.MessageClient
는 아까 ProtoBuf
를 protoc
를 통해 컴파일한 결과물 내에 정의되어 있으며, NewClient
는 피어의 포트를 입력받아 msg.MessageClient
를 새로 생성하는 함수이다.
type Server struct {
msg.MessageServer
}
func (m *Server) MessageSend(ctx context.Context, req *msg.MessageSendRequest) (*msg.MessageSendResponse, error) {
res := &msg.MessageSendResponse{
Sender: req.Sender,
Receiver: req.Receiver,
Message: req.Message,
}
f, err := os.OpenFile(fmt.Sprintf("./data/%d.csv", req.Sender), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0755)
if err != nil {
res.Error = err.Error()
return res, err
}
defer f.Close()
x := fmt.Sprintf(
"timstamp: %d, sender: %d, receiver: %d, message: %s\n",
time.Now().Unix(),
req.Sender,
req.Receiver,
req.Message,
)
f.WriteString(x)
b := []byte{}
f.Read(b)
res.Id = int32(bytes.Count(b, []byte{10})) // [10] means "\n"
return res, nil
}
Server
는 서버를 구현하기 위한 구조체이다. msg.MessageServer
는 ProtoBuf
의 service Message
를 Go로 표현한 인터페이스이며, 마찬가지로 자동생성되었다. 내용은 아래와 같다.
type MessageServer interface {
MessageSend(context.Context, *MessageSendRequest) (*MessageSendResponse, error)
mustEmbedUnimplementedMessageServer()
}
이걸 구현해야 하지만 mustEmbedUnimplementedMessageServer()
라는 프라이빗 메소드의 이름에서 알 수 있듯이 임베딩 한 뒤 필요한 메소드들만 구현해야 한다. 따라서 Server
구조체는 msg.MessageServer
를 임베딩 한 뒤 MessageSend
메소드를 구현하였다.
MessageSend
메소드에서는 파라미터로 받은 MessageSendRequest
를 가지고 자신의 csv 파일에 로그를 추가한 뒤 MessageSendResponse
를 반환한다. 클라이언트로부터 들어온 요청을 처리하는, 말 그대로 서버가 할 일을 정의한 것이다.
var (
colors = []int{31, 32, 33, 34, 35, 36, 37}
using = make(map[int]*int, len(colors))
)
func init() {
for _, color := range colors {
using[color] = nil
}
}
각 피어들을 좀 더 쉽게 볼 수 있도록 피어별로 컬러코드를 부여하기 위해 init
함수에서 피어별로 사용한 컬러코드를 기록할 수 있는 using
맵을 만들었다. 또한 colors
슬라이스에는 전체 컬러코드가 담겨있다.
이것은 나중에 로깅할 때 \033[31m
이런 식으로 이용될 예정이다.
type Dial struct{}
func (d *Dial) NewClient(port int) (msg.MessageClient, error) {
return NewClient(port)
}
func NewPeer(port int) *model.Peer {
peer := &model.Peer{
Port: port,
Dial: &Dial{},
Server: &Server{},
Clients: make(map[int]msg.MessageClient),
}
for {
colorIdx := rand.Intn(len(colors))
color := colors[colorIdx]
if using[color] == nil {
using[color] = &port
peer.Id = int32(color)
break
}
}
go peer.Listen()
logger.Log(peer.Id, "Peer created: %s", peer.String())
return peer
}
위의 Peer
구조체의 새 객체를 반환하기 위한 함수로, NewPeer
는 피어의 포트 번호를 입력받아 새 Peer
객체를 만들어 실행시킨 뒤 반환한다.
Dial
인터페이스를 구현한 구조체도 초기화 할 때 필드로 넣어주었다.
이제 이 피어들이 서로 데이터를 전송하고 수신하는 걸 테스트 해봐야 한다.
이를 위해 메인 함수에는 다음과 같은 코드를 작성하였다.
ports := []int{3100, 3101, 3102, 3103, 3104, 3105, 3106, 3107}
for _, port := range ports {
go func(port int) {
p := peer.NewPeer(port)
time.Sleep(time.Second * 3) // wait 3s to start every peer listening
for i := int64(0); ; i++ {
// select target port
targetPort := port
for ; targetPort == port; targetPort = ports[rand.Intn(len(ports))] {
}
res, err := p.Send(port, targetPort, "p%d-%d-%03d", p.Port, targetPort, i)
if err != nil {
logger.Log(p.Id, err.Error())
} else {
logger.Log(p.Id, res.String())
}
i++
// sleep 1.5s ~ 3s
ms := rand.Intn(1500) + 1500
time.Sleep(time.Millisecond * time.Duration(ms))
}
}(port)
}
time.Sleep(time.Second * 30)
테스트용 피어는 8개이며 3100~3107 포트에서 동작한다.
각 피어들은 랜덤으로 선정된 다른 피어에 메시지를 전송하며, 1.5초에서 3초의 텀을 가진다.
전체 시스템은 30초간 동작한다.
동작 과정은 위 영상과 같다.