Go gRPC 이용해서 피어 노드 구현하기

박재훈·2022년 12월 3일
0

GO

목록 보기
8/19
post-thumbnail

gRPC는 구글에서 나온 RPC이다. 다양한 언어에서 이용될 수 있으며 가볍고 빠르다.

gRPC

기본적으로 RPC는 위와 같이 동작한다. 나의 stub을 통해 다른 서버의 skeleton에 접근하여 그 서버의 로직을 실행시킨다. 이후 결과값을 stub을 통해 전달받는다.

이 과정을 좀 더 단계적으로 쓰면 다음처럼 된다.

  • stub 객체 생성 및 네트워크 연결
  • stub 객체를 통해 원격지 메소드 실행
  • stubskeleton에서 해당 메소드 실행 요청
  • skeleton은 그 요청을 받아서 메소드를 실행시킴
  • stub에서 결과값 전달받음

의사코드로 위 과정을 표현하자면...

// stub 객체 생성
let stub := new Stub()
// 원격지 연결
stub.connect()
// 원격지 메소드 실행 요청 및 파라미터 전달
stub.execute("method", parameters)

server.listen(request => {
	// 원격지에서는 요청을 받아서 해당 기능 실행
	let response := this[request.method](request.parameters)
    // 결과값 반환
    return response
})

gRPC를 쓰면 이 과정을 쉽게 구현할 수 있다.

ProtoBuf

gRPC에서는 ProtoBuf라고 하는 IDL(Interface Description Language, 인터페이스 정의 언어)을 이용하여 stubskeleton이 주고받을 메시지를 정의한다. 또한, 그 파일을 컴파일하여 생성된 각 언어별 파일을 이용하여 개발에 이용할 수 있다.

본 시스템에서 이용한 ProtoBuf는 다음과 같다.

// msg.proto

syntax = "proto3";

package msg;

option go_package = "github.com/protocol-diver/grpc-peer/msg";

service Message {
    rpc MessageSend(MessageSendRequest) returns (MessageSendResponse);
}

message MessageSendRequest {
    int32 sender = 1;
    int32 receiver = 2;
    string message = 3;
}

message MessageSendResponse {
    int32 sender = 1;
    int32 receiver = 2;
    string message = 3;
    int32 id = 4;
    string error = 5;
}

MessageSendRequest를 통해 skeletonMessageSend 실행을 요청하며, 이에 대해 응답으로 MessageSendResponse를 받는다.

ProtoBuf 파일을 가지고

protoc -I=. --go_out . --go_opt paths=source_relative --go-grpc_out . --go-grpc_opt paths=source_relative msg/msg.proto

위 명령어를 통해 go 파일을 얻을 수 있다.

Peer

분산 시스템에서의 피어는 서로 데이터를 주고 받으며 소통할 수 있다. 본 시스템에서는 피어를 구현하기 위해 stubskeleton의 역할을 모두 수행 가능한 구조체를 정의하였다.

type Peer struct {
	Id      int32
	Port    int
	Clients map[int]msg.MessageClient
	Server  msg.MessageServer
	Dial    Dial
}

Peer 구조체는 필드로 자신이 실행시킬 서버, 그리고 다른 피어들에 연결될 클라이언트들을 가진다. 다른 피어 하나당 클라이언트 객체가 필요하므로 맵으로 정의하였으며, 키는 그 클라이언트가 연결되는 포트 번호이다.

Dial은 새로운 클라이언트를 생성하는 인터페이스이다.

type Dial interface {
	NewClient(port int) (msg.MessageClient, error)
}

다른 피어로 메시지 전송을 시도할 때 Clients에 없으면 Dial을 이용해서 맵에 새로운 클라이언트를 추가시킨다.
이 인터페이스는 Peer 객체를 생성하는 곳에서 구현하여 멤버변수로 집어넣는다.

Client

func NewClient(port int) (msg.MessageClient, error) {
	conn, err := grpc.Dial(fmt.Sprintf(":%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}
	return msg.NewMessageClient(conn), nil
}

msg.MessageClient는 아까 ProtoBufprotoc를 통해 컴파일한 결과물 내에 정의되어 있으며, NewClient는 피어의 포트를 입력받아 msg.MessageClient를 새로 생성하는 함수이다.

Server

type Server struct {
	msg.MessageServer
}

func (m *Server) MessageSend(ctx context.Context, req *msg.MessageSendRequest) (*msg.MessageSendResponse, error) {
	res := &msg.MessageSendResponse{
		Sender:   req.Sender,
		Receiver: req.Receiver,
		Message:  req.Message,
	}

	f, err := os.OpenFile(fmt.Sprintf("./data/%d.csv", req.Sender), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0755)
	if err != nil {
		res.Error = err.Error()
		return res, err
	}
	defer f.Close()

	x := fmt.Sprintf(
		"timstamp: %d, sender: %d, receiver: %d, message: %s\n",
		time.Now().Unix(),
		req.Sender,
		req.Receiver,
		req.Message,
	)
	f.WriteString(x)
	b := []byte{}
	f.Read(b)
	res.Id = int32(bytes.Count(b, []byte{10})) // [10] means "\n"

	return res, nil
}

Server는 서버를 구현하기 위한 구조체이다. msg.MessageServerProtoBufservice Message를 Go로 표현한 인터페이스이며, 마찬가지로 자동생성되었다. 내용은 아래와 같다.

type MessageServer interface {
	MessageSend(context.Context, *MessageSendRequest) (*MessageSendResponse, error)
	mustEmbedUnimplementedMessageServer()
}

이걸 구현해야 하지만 mustEmbedUnimplementedMessageServer()라는 프라이빗 메소드의 이름에서 알 수 있듯이 임베딩 한 뒤 필요한 메소드들만 구현해야 한다. 따라서 Server 구조체는 msg.MessageServer를 임베딩 한 뒤 MessageSend 메소드를 구현하였다.

MessageSend 메소드에서는 파라미터로 받은 MessageSendRequest를 가지고 자신의 csv 파일에 로그를 추가한 뒤 MessageSendResponse를 반환한다. 클라이언트로부터 들어온 요청을 처리하는, 말 그대로 서버가 할 일을 정의한 것이다.

Initialization

var (
	colors = []int{31, 32, 33, 34, 35, 36, 37}
	using  = make(map[int]*int, len(colors))
)

func init() {
	for _, color := range colors {
		using[color] = nil
	}
}

각 피어들을 좀 더 쉽게 볼 수 있도록 피어별로 컬러코드를 부여하기 위해 init 함수에서 피어별로 사용한 컬러코드를 기록할 수 있는 using 맵을 만들었다. 또한 colors 슬라이스에는 전체 컬러코드가 담겨있다.
이것은 나중에 로깅할 때 \033[31m 이런 식으로 이용될 예정이다.

Peer

type Dial struct{}

func (d *Dial) NewClient(port int) (msg.MessageClient, error) {
	return NewClient(port)
}

func NewPeer(port int) *model.Peer {
	peer := &model.Peer{
		Port:    port,
		Dial:    &Dial{},
		Server:  &Server{},
		Clients: make(map[int]msg.MessageClient),
	}
	for {
		colorIdx := rand.Intn(len(colors))
		color := colors[colorIdx]
		if using[color] == nil {
			using[color] = &port
			peer.Id = int32(color)
			break
		}
	}

	go peer.Listen()
	logger.Log(peer.Id, "Peer created: %s", peer.String())

	return peer
}

위의 Peer 구조체의 새 객체를 반환하기 위한 함수로, NewPeer는 피어의 포트 번호를 입력받아 새 Peer 객체를 만들어 실행시킨 뒤 반환한다.
Dial 인터페이스를 구현한 구조체도 초기화 할 때 필드로 넣어주었다.

Test

이제 이 피어들이 서로 데이터를 전송하고 수신하는 걸 테스트 해봐야 한다.
이를 위해 메인 함수에는 다음과 같은 코드를 작성하였다.

ports := []int{3100, 3101, 3102, 3103, 3104, 3105, 3106, 3107}

for _, port := range ports {
	go func(port int) {
		p := peer.NewPeer(port)
		time.Sleep(time.Second * 3) // wait 3s to start every peer listening

		for i := int64(0); ; i++ {
			// select target port
			targetPort := port
			for ; targetPort == port; targetPort = ports[rand.Intn(len(ports))] {
			}

			res, err := p.Send(port, targetPort, "p%d-%d-%03d", p.Port, targetPort, i)
			if err != nil {
				logger.Log(p.Id, err.Error())
			} else {
				logger.Log(p.Id, res.String())
			}
			i++

			// sleep 1.5s ~ 3s
			ms := rand.Intn(1500) + 1500
			time.Sleep(time.Millisecond * time.Duration(ms))
		}
	}(port)
}

time.Sleep(time.Second * 30)

테스트용 피어는 8개이며 3100~3107 포트에서 동작한다.
각 피어들은 랜덤으로 선정된 다른 피어에 메시지를 전송하며, 1.5초에서 3초의 텀을 가진다.
전체 시스템은 30초간 동작한다.

동작 과정은 위 영상과 같다.

profile
코딩 좋아합니다

0개의 댓글