Reliable Ordered Messages

javawork·2022년 7월 18일
0

원문

https://gafferongames.com/post/reliable_ordered_messages/

Introduction

UDP로 TCP의 신뢰성을 구현하려고 하는 경우가 많습니다. 하지만 굳이 TCP와 같은 방식을 사용할 필요는 없습니다. Reliable한 메세지를 구현하는 방법은 여러가지가 있습니다. 좀 더 창의력을 발휘하여 TCP보다 개선되고 더 유연한 reliable message 시스템을 구현해 봅시다.

Different Approaches

reliable-ordered와 unreliable 방식으로 패킷을 나누는 방식이 많이 사용됩니다. reliable하게 만드는 기본적인 아이디어는 반대쪽에서 받았다고 할 때까지 재전송을 하는 방식입니다. 기본적으로 TCP와 비슷한 방식입니다. 이것도 나쁘진 않지만 더 나은 방법이 있습니다.

일단 serialize 되는 방식이 정해져 있는 작은 메세지들을 구성합니다. 이는 패킷 길이를 앞에 명시하고 그 뒤로 body를 채우는 방식을 선호하지 않을 때 괜찮은 방식입니다(패킷에 작은 메세지가 많은 경우). 이미 전송된 메세지들을 큐에 보관하고 있다가 새로운 패킷이 전송될 때마다 큐에 있는 메세지들을 추가하여 전송됩니다. 이 방식이면 메세지를 재전송 할 필요가 없습니다. 수신 측에서 진짜로 받을 때까지(ack를 보낼 때까지) 이미 전송된 메세지를 계속 outgoing 패킷에 포함하는 것 입니다.

모든 unacked 메세지를 전송되는 모든 패킷에 포함하는 방법이 가장 간단합니다. 예를 들면, 모든 메세지는 sequence(seq)가 있고, 전송될 때 마다 증가합니다. 모든 전송 패킷에는 시작 메세지 id와 후속 번호의 다수의 메세지를 포함합니다. 수신 측은 지속적으로 최근 받은 메세지 id를 송신 측에 보내주고(ack), 송신 측은 ack된 메세지보다 새로운 메세지를 매 패킷에 포함합니다.

이 방식은 간단하고 구현하기 쉽지만, 대량의 메세지가 소실되면 패킷의 사이즈가 크게 증가할 수 있습니다. 전송되는 패킷에 포함되는 메세지의 개수를 제한하면 방지할 수 있습니다. 하지만 1초에 60개의 패킷 전송처럼 자주 보내는 상황이라면 ack를 받을 때까지 같은 메세지를 여러 번 전송할 수 있습니다.

Round trip time이 100ms라면 ack를 받기 전에 불필요하게 6번이나 메세지를 전송할 수 있습니다. 만일 정말 실시간 요소가 중요한 경우라면 감수할 수도 있겠지만, 대부분의 경우는 그런 대역폭은 다른 곳에 유용하게 쓰는 것이 좋습니다.

우선 순위를 매겨서 가장 중요한 n개의 메세지를 매 패킷에 포함하는 방식을 선호합니다. 이 방식은 빠른 전송과 매 패킷에 최대 n개의 메세지 만을 포함한다는 원칙을 모두 만족 시킬 수 있습니다.

Packet Level Acks

패킷 레벨 ack를 위해 아래의 패킷 헤더를 추가합니다.

struct Header
{
    uint16_t sequence;
    uint16_t ack;
    uint32_t ack_bits;
};

이 헤더는 ack시스템을 구축하기 위한 조합된 요소입니다. sequence는 전송 시마다 증가하는 숫자, ack 는 가장 최근에 받은 sequence, ack_bits는 다수의 ack된 패킷을 표현하는 비트 필드입니다.

ack_bits의 n번째 비트가 set되어있으면 ack - n 의 sequence를 받은 것 입니다. ack_bits 는 네트워크 대역을 절약해 줄 뿐만 아니라, 패킷 손실을 방지하기 위해 중복정보(redundancy)를 추가합니다. 모든 ack는 32번 전송됩니다. 패킷 하나가 유실되더라도, 31개의 같은 ack정보를 가진 패킷이 전송됩니다. 통계적으로 ack가 잘 전달될 가능성이 높습니다.

하지만 대량의 패킷 로스가 발생하면, 다음의 사항이 중요합니다.

  1. 어떤 패킷의 ack를 수신했다면, 그 패킷은 진짜로 잘 도착한 것
  2. 어떤 패킷의 ack를 받지 못했다면 유실되었을 확률이 크지만, ack가 유실되었을 수도 있음. 하지만 매우 드문 경우.

경험에 따르면 패킷의 ack를 완벽하게 전송할 필요는 없습니다. 이렇게 구축된 시스템에서는 ack가 유실되어도 큰 문제가 되지 않습니다. 하지만 ack가 유실되는 모든 상황을 테스트하는 것은 어려운 일입니다.

이러한 시스템을 자체 구현하고 ack 시스템이 잘 동작하는 것을 검증하려면 나쁜 네트워크 상황에서 soak 테스트를 해야 합니다. 이런 기법은 reliable.ioyojimbo에 구현되어 있습니다.

Sequence Buffers

ack 시스템을 구축하기 위해서는 패킷이 ack되었는지 기록하고 중복 ack는 무시하는(패킷은 ack_bits로 여러 번 ack됨) 데이터 structure가 송신 측에 필요합니다. 수신 측에는 받은 패킷을 기록하고 그 정보를 바탕으로 패킷 헤더의 ack_bits를 채워 줄 수 있는 데이터 structure가 필요합니다.

해당 데이터 structure는 아래와 같은 조건을 만족해야 합니다.

  • 상수 시간에 insert
  • 상수 시간에 조회
  • 상수 시간에 삭제

보통 해쉬 테이블을 떠올리겠지만 더 간단한 방법이 있습니다.

const int BufferSize = 1024;
 
uint32_t sequence_buffer[BufferSize];
 
struct PacketData
{
    bool acked;
};
 
PacketData packet_data[BufferSize];
 
PacketData * GetPacketData( uint16_t sequence )
{
    const int index = sequence % BufferSize;
    if ( sequence_buffer[index] == sequence )
        return &packet_data[index];
    else
        return NULL;
}

아래와 같이 index를 계산하여 롤링 버퍼에 사용하고 있습니다.

const int index = sequence % BufferSize;

오래된 데이터는 필요 없기 때문에 잘 동작합니다. sequence가 증가하면서 오래된 데이터를 자연스럽게 덮어쓰게 됩니다. sequence_buffer[index]는 현재 찾고 있는 sequence에 해당하는 값 인지를 테스트 하는데 사용됩니다. sequence 버퍼의 값이 0xFFFFFFFF이면 빈 값으로 간주하고 NULL을 리턴합니다.

데이터가 전송 큐처럼 순서대로 추가되면, sequence 버퍼의 값을 새 sequence 번호로 업데이트하고 해당 인덱스의 데이터를 덮어쓰면 됩니다.

PacketData & InsertPacketData( uint16_t sequence )
{
    const int index = sequence % BufferSize;
    sequence_buffer[index] = sequence;
    return packet_data[index];
}

하지만 수신 측에는 패킷이 순서가 바뀐 채로 도착하고 일부 패킷은 유실됩니다. 이전 sequence 값이 0xFFFFFFFF가 되기 전에, 오래된 sequence 버퍼 항목이 남아있어서 버그가 생길 수 있습니다. 이에 대한 해결책은 이전의 가장 큰 sequence와 새로 도착한 sequence 사이(새 seq가 이전 seq보다 큰 경우에만)의 모든 sequence 버퍼의 항목을 0xFFFFFFFF로 채우는 것 입니다.

데이터 structure에 ack만 사용하는 것보다 보낸 시간을 추가하면 더 많은 작업을 수행할 수 있습니다.

struct PacketData
{
    bool acked;
    double send_time;
};

이 정보를 사용하여 패킷의 RTT(round trip time)를 계산할 수 있고, 그에 대한 평균 값을 구할 수도 있습니다. 또한 평균 RTT 보다 오래 걸린 패킷을 찾아서 고유한 패킷 손실 추정치를 생성할 수도 있게 됩니다.

Ack Algorithm

이제 데이터 structure가 있고 패킷 헤더도 있습니다. 패킷 레벨 ack를 구현하기 위한 알고리즘 입니다.

보낼 때

  1. 전송 sequence 버퍼에 전송 할 seq값을 ack = false로 해서 insert
  2. 수신 sequence 버퍼에서 ack와 ack_bits 값을 생성
  3. 패킷 헤더에 ack, ack_bits 값을 세팅
  4. 패킷을 전송하고 전송 패킷 sequence값을 증가

받을 때

  1. 받은 패킷 헤더에서 sequence 값을 읽음
  2. 받은 sequence 값이 저장된 sequence 값보다 크면, 받은 sequence 값으로 덮어씀
  3. 수신 sequence 버퍼에 insert
  4. 패킷 헤더의 ack_bits를 해석
  5. 전송한 sequence 버퍼를 순회하며 acked 값을 업데이트 - OnPacketAcked( sequence )

이 알고리즘은 서버/클라이언트 양쪽에 각자 고유의 sequence값을 가지고 동일하게 적용되어야 합니다.

이제 상대방이 패킷을 수신했다는 것을 알려주는 OnPacketAcked 콜백이 있습니다. 이 ack 시스템의 주요 이점은 어떤 패킷이 수신 되었는지 알 수 있다는 점입니다. 신뢰성 있는 메세지 시스템을 구축한 것에만 국한되지 않고, 객체 별로 delta 인코딩을 구현하는데 사용될 수도 있습니다(이동 좌표를 절대 값으로 보내지 않고 ack된 값에서 변경된 값만 전송하여 패킷의 사이즈를 줄일 수 있음).

Message Objects

메세지는 패킷보다 작고 직렬화(serialize)되는 방법이 동반되어 있는 객체들 입니다. 이 시스템에서는 통합된 serialize 함수를 사용해서 직렬화를 수행합니다. 직렬화 함수는 템플릿화 되어있기 때문에 한번 작성하면 읽기, 쓰기, 측정이 가능합니다.

제가 제안하는 측정 방법은 MeasureStream이라는 더미 스트림 클래스를 만들어서 이 스트림을 인자로 직렬화 함수를 호출하면 실제 직렬화를 하지 않고 기록될 비트수만 측정하는 것 입니다. 이 방법은 메세지를 패킷에 넣기 전에 사이즈를 알아내는 데에 유용합니다.

struct TestMessage : public Message
{
    uint32_t a,b,c;
 
    TestMessage()
    {
        a = 0;
        b = 0;
        c = 0;
    }
 
    template <typename Stream> bool Serialize( Stream & stream )
    {
        serialize_bits( stream, a, 32 );
        serialize_bits( stream, b, 32 );
        serialize_bits( stream, c, 32 );
        return true;
    }
 
    virtual SerializeInternal( WriteStream & stream )
    {
        return Serialize( stream );
    }
 
    virtual SerializeInternal( ReadStream & stream )
    {
        return Serialize( stream );
    }
 
    virtual SerializeInternal( MeasureStream & stream )
    {
        return Serialize( stream );       
    }
};

여기서 trick은 스트림 타입 별 가상 함수에 통합 템플릿 직렬화 함수를 연결하는 것 입니다. 이제 base 메세지 포인터가 있으면 다음 동작을 수행할 수 있습니다.

Message * message = CreateSomeMessage();
message->SerializeInternal( stream );

다른 대안은 각 타입에 대한 직렬화 함수를 호출하기 전에 올바른 메세지 타입으로 캐스팅하는 커다란 switch문을 구현하는 것 입니다. 과거에 PS3에서 이런 방식으로 작업했지만, 요즘(2016년)에는 virtual table의 성능상 오버헤드에 대해서는 무시할 수준이라 이러한 작업이 필요하지 않습니다.

공통의 base 클래스에서 파생된 메세지는 직렬화, 메세지 타입 쿼리, 참조 카운팅과 같은 공통의 인터페이스를 제공합니다. 참조 카운팅은 메시지가 포인터로 전달되고 ack를 받을 때까지 메시지 전송 큐에 저장될 뿐만 아니라 C++ structure인 outgoing 패킷에도 저장되기 때문에 필요합니다.

이것은 메세지와 패킷에 포인터 형식으로 전달하여 데이터 복사를 방지하는 전략입니다. 다른 스레드로 전달 될 때는 직렬화된 버퍼 형식으로 전달됩니다. 전송 큐에 있는 메세지가 참조되지 않고(ack가 확인됨) 해당 메세지를 참조하는 패킷이 없으면 메세지가 소멸됩니다.

메세지를 생성하는 메서드도 필요합니다. 타입 별로 메시지를 생성하기 위해 재정의된 가상 함수가 있는 메시지 팩토리 클래스로 이 작업을 수행합니다. 패킷 팩토리가 전체 메시지 타입을 알고 있어야 합니다. 따라서 네트워크를 통해 받은 버퍼를 해석하여 엄격하게 메시지 타입을 직렬화하고 유효한 범위를 벗어난 메시지 타입 값을 가진 악성 패킷을 버릴 수 있습니다.

enum TestMessageTypes
{
    TEST_MESSAGE_A,
    TEST_MESSAGE_B,
    TEST_MESSAGE_C,
    TEST_MESSAGE_NUM_TYPES
};
 
// message definitions omitted
 
class TestMessageFactory : public MessageFactory
{
public:
 
    Message * Create( int type )
    {
        switch ( type )
        {
            case TEST_MESSAGE_A: return new TestMessageA();
            case TEST_MESSAGE_B: return new TestMessageB();
            case TEST_MESSAGE_C: return new TestMessageC();
        }
    }
 
    virtual int GetNumTypes() const
    {
        return TEST_MESSAGE_NUM_TYPES;
    }
};

이러한 코드는 보통 매크로로 단순화 시키게 됩니다.

Reliable Ordered Message Algorithm

reliable-ordered 메세지를 전송하기 위한 알고리즘 입니다.

메세지를 보낼 때

  1. MasureStream을 사용하여 메세지를 직렬화하기 위해 몇 bit가 필요한지 계산
  2. 메세지 ID를 키값으로 메세지 포인터와 bit수를 sequence 버퍼에 넣음. 메세지가 마지막으로 전송된 시간을 -1로 세팅
  3. 전송 메세지 ID를 증가

패킷을 보낼 때

  1. 전송 메세지 sequence 버퍼 안에서 가장 오래된 ack되지 않은 메세지 ID와 가장 최근에 추가된 메세지 id사이를 id가 증가하는 방향으로 iterate 합니다.
  2. 수신자가 저장할 수 없는 메세지 id를 보내면 ack 시스템을 깨뜨리게 됩니다(해당 메세지는 저장되지 않지만 메세지를 포함한 패킷은 ack 될 것이기 때문에 재전송하지 않음). 가장 오래된 ack 되지 않은 메세지 id에 수신 버퍼의 크기를 더한 것과 같거나 더 큰 메세지 ID를 가진 메세지를 보내면 안됩니다.
  3. 지난 0.1초 동안 전송되지 않았고 패킷의 남은 공간에 맞는 크기의 메세지는 전송될 목록에 추가 합니다. 메세지 ID가 작을(오래될) 수록 우선 순위가 높습니다.
  4. 전송될 패킷에 메세지를 추가하고 참조 카운팅을 증가 시킵니다. 패킷 소멸자가 포함된 메세지들의 참조 카운팅을 감소 시키는지 확인 하세요.
  5. 패킷의 메시지 수와 패킷에 포함된 메시지 ID 배열을 전송 패킷 sequence 번호를 키값으로 sequence 버퍼에 저장하여 패킷 레벨 ack를 해당 패킷에 포함된 메시지 집합에 매핑하는 데 사용할 수 있습니다.
  6. 패킷을 패킷 전송 큐에 추가 합니다.

패킷을 받을 때

  1. 패킷에 포함된 메세지들을 iterate하면서 메세지 ID를 키값으로 수신 메세지 sequence 버퍼에 저장합니다.
  2. ack 시스템은 자동으로 방금 수신한 패킷의 sequence 번호를 ack 합니다.

패킷을 ack 할 때

  1. 패킷에 포함된 메세지 ID를 sequence 번호로 조회합니다.
  2. 메세지가 이미 존재하는 경우 해당 메세지를 메세지 전송 큐에서 제거하고 참조 카운팅을 감소 시킵니다.
  3. 전송 메세지 sequence 버퍼에서 이전에 ack되지 않은 것 중에서 valid 한 것이 확인된 메세지 ID와 현재 전송 메세지 ID, 둘 중에 먼저 도달한 것으로 최근 ack되지 않은 메세지 ID를 업데이트 합니다.

메세지를 받을 때

  1. 현재 수신한 메세지 ID에 해당하는 메세지가 있는지 확인 하려면 수신 메세지 sequence 버퍼를 확인합니다.
  2. 메세지가 있으면 수신 메세지 sequence 버퍼에서 제거하고 수신 메세지 ID를 증가 시킨 후에 메세지에 대한 포인터를 return합니다.
  3. 메세지가 없으면 수신할 메세지가 없는 것 입니다. NULL을 return 합니다.

요약하자면 패킷이 메세지 ack 정보를 포함해서 전송하기 전에는 그 메세지는 계속 패킷에 포함됩니다. ack를 보내기 위해 송신자 측의 데이터 structure를 활용하여 패킷 sequence 번호를 메세지 ID set에 매핑합니다. ack를 받으면 메세지를 전송 큐에서 제거합니다. 수신 측에서는 순서대로 도착하지 않은 메세지들은 메세지 ID를 키값으로 sequence 버퍼에 저장하기 때문에 보낸 순서대로 받을 수 있게 됩니다.

The End Result

보내는 측의 사용자 인터페이스

TestMessage * message = (TestMessage*) factory.Create( TEST_MESSAGE );
if ( message )
{
    message->a = 1;
    message->b = 2;
    message->c = 3;
    connection.SendMessage( message );
}

받는 측의 사용자 인터페이스

while ( true )
{
    Message * message = connection.ReceiveMessage();
    if ( !message )
        break;
 
    if ( message->GetType() == TEST_MESSAGE )
    {
        TestMessage * testMessage = (TestMessage*) message;
        // process test message
    }
 
    factory.Release( message );
}

flexible하기 때문에 원하는 기능을 추가하기에 충분합니다.

profile
게임 개발자

0개의 댓글