SendBuffer & PacketSession

Eunho Bae·2022년 5월 5일
0

코드

ServerCore

Session.cs

namespace ServerCore
{
	public abstract class PacketSession : Session
	{
		public static readonly int HeaderSize = 2;

		// size = size 포함한 전체 패킷 크기
		// [size(2)][packetId(2)][ ... ][size(2)][packetId(2)][ ... ]
		public sealed override int OnRecv(ArraySegment<byte> buffer) // 오버라이드 다시 불가
		{
			int processLen = 0;

			while (true)
			{
				// 최소한 헤더는 파싱할 수 있는지 확인
				if (buffer.Count < HeaderSize)
					break;

				// 패킷이 완전체로 도착했는지 확인
				ushort dataSize = BitConverter.ToUInt16(buffer.Array, buffer.Offset); // ushort
				if (buffer.Count < dataSize)
					break;

				// 여기까지 왔으면 패킷 조립 가능. new 사용했다고 힙에다 할당해주는게 아니라 스택 복사
				OnRecvPacket(new ArraySegment<byte>(buffer.Array, buffer.Offset, dataSize));
				
				processLen += dataSize;
				// [size(2)][packetId(2)][ ... ] 다음 부분으로 위치 변경
				buffer = new ArraySegment<byte>(buffer.Array, buffer.Offset + dataSize, buffer.Count - dataSize);
			}

			return processLen;
		}

		public abstract void OnRecvPacket(ArraySegment<byte> buffer);
	}

	public abstract class Session
	{
		Socket _socket;
		int _disconnected = 0;

		RecvBuffer _recvBuffer = new RecvBuffer(1024);

		object _lock = new object();
		Queue<ArraySegment<byte>> _sendQueue = new Queue<ArraySegment<byte>>();
		List<ArraySegment<byte>> _pendingList = new List<ArraySegment<byte>>();
		SocketAsyncEventArgs _sendArgs = new SocketAsyncEventArgs();
		SocketAsyncEventArgs _recvArgs = new SocketAsyncEventArgs();

		public abstract void OnConnected(EndPoint endPoint);
		public abstract int  OnRecv(ArraySegment<byte> buffer); // 얼마만큼 데이터를 처리했는지 리턴
		public abstract void OnSend(int numOfBytes);
		public abstract void OnDisconnected(EndPoint endPoint);

		public void Start(Socket socket)
		{
			_socket = socket;

			_recvArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnRecvCompleted);
			_sendArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendCompleted);

			RegisterRecv();
		}

		public void Send(ArraySegment<byte> sendBuff)
		{
			lock (_lock)
			{
				_sendQueue.Enqueue(sendBuff);
				if (_pendingList.Count == 0)
					RegisterSend();
			}
		}

		public void Disconnect()
		{
			if (Interlocked.Exchange(ref _disconnected, 1) == 1)
				return;

			OnDisconnected(_socket.RemoteEndPoint);
			_socket.Shutdown(SocketShutdown.Both);
			_socket.Close();
		}

		#region 네트워크 통신

		void RegisterSend()
		{
			while (_sendQueue.Count > 0)
			{
				ArraySegment<byte> buff = _sendQueue.Dequeue();
				_pendingList.Add(buff);
			}
			_sendArgs.BufferList = _pendingList;

			bool pending = _socket.SendAsync(_sendArgs);
			if (pending == false)
				OnSendCompleted(null, _sendArgs);
		}

		void OnSendCompleted(object sender, SocketAsyncEventArgs args)
		{
			lock (_lock)
			{
				if (args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
				{
					try
					{
						_sendArgs.BufferList = null;
						_pendingList.Clear();

						OnSend(_sendArgs.BytesTransferred);

						if (_sendQueue.Count > 0)
							RegisterSend();
					}
					catch (Exception e)
					{
						Console.WriteLine($"OnSendCompleted Failed {e}");
					}
				}
				else
				{
					Disconnect();
				}
			}
		}

		void RegisterRecv()
		{
			_recvBuffer.Clean(); // 커서가 너무 뒤로 가있는 상태 방지
			// 유효한 범위 설정. 다음으로 버퍼를 받을 공간 Set.
			ArraySegment<byte> segment = _recvBuffer.WriteSegment;
			_recvArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); // count=freesize, 이만큼 받을 수 있다.

			bool pending = _socket.ReceiveAsync(_recvArgs);
			if (pending == false)
				OnRecvCompleted(null, _recvArgs);
		}

		void OnRecvCompleted(object sender, SocketAsyncEventArgs args)
		{
			if (args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
			{
				try
				{
					// Write 커서 이동
					if (_recvBuffer.OnWrite(args.BytesTransferred) == false)
					{
						Disconnect();
						return;
					}

					// 컨텐츠 쪽으로 데이터를 넘겨주고 얼마나 처리했는지 받는다
					int processLen = OnRecv(_recvBuffer.ReadSegment);
					if (processLen < 0 || _recvBuffer.DataSize < processLen)
					{
						Disconnect();
						return;
					}

					// Read 커서 이동
					if (_recvBuffer.OnRead(processLen) == false)
					{
						Disconnect();
						return;
					}

					RegisterRecv();
				}
				catch (Exception e)
				{
					Console.WriteLine($"OnRecvCompleted Failed {e}");
				}
			}
			else
			{
				Disconnect();
			}
		}

		#endregion
	}
}

SendBuffer

namespace ServerCore
{
	public class SendBufferHelper
	{
		// 전역이지만 내 쓰레드에서만 사용할 수 있음.
		public static ThreadLocal<SendBuffer> CurrentBuffer = new ThreadLocal<SendBuffer>(() => { return null; }); // 처음 만들어질때 null만 리턴하도록

		public static int ChunkSize { get; set; } = 4096 * 100;

		public static ArraySegment<byte> Open(int reserveSize)
		{
			if (CurrentBuffer.Value == null) // SendBuffer 한번도 사용 안한 상태
				CurrentBuffer.Value = new SendBuffer(ChunkSize);

			if (CurrentBuffer.Value.FreeSize < reserveSize)
				CurrentBuffer.Value = new SendBuffer(ChunkSize); // 기존 청크 없앤 후 새롭게 할당

			return CurrentBuffer.Value.Open(reserveSize);
		}

		public static ArraySegment<byte> Close(int usedSize)
		{
			return CurrentBuffer.Value.Close(usedSize);
		}
	}

	public class SendBuffer
	{
		// RecvBuffer처럼 Clean이 없는 이유는 내가 사용이 끝난 부분 이전 부분을 다른 세션에서 Session클래스의 _sendQueue에 넣어논 상태일 수 있기 때문에, 즉 누군가 앞부분을 참조 중인 상태일 수 있기 때문에 재위치 시킬 수 없다.
		// 패킷에 가변인자 들어가면 크기 예측하는게 까다롭기 때문에, 크게 할당 받아놓고 자르는 방식
		// [][][][][][][][][u][] : ChunkSize = 4096 * 100
		byte[] _buffer;
		int _usedSize = 0;

		public int FreeSize { get { return _buffer.Length - _usedSize; } }

		public SendBuffer(int chunkSize)
		{
			_buffer = new byte[chunkSize];
		}

		public ArraySegment<byte> Open(int reserveSize) // 요구할 예약 공간
		{
			if (reserveSize > FreeSize)
				return null;

			// 예약공간이기 때문에 usedSize 이동 x
			return new ArraySegment<byte>(_buffer, _usedSize, reserveSize); // usedSize 위치 포함해서 reserveSize만큼
		}

		public ArraySegment<byte> Close(int usedSize) // 예약 사이즈는 3이라도 실제로 2개 사용되었다면 2만큼 범위 리턴
		{
			ArraySegment<byte> segment = new ArraySegment<byte>(_buffer, _usedSize, usedSize);
			_usedSize += usedSize;
			return segment;
		}
	}
}

Server

ClientSession.cs

namespace Server
{
	class Packet
	{
		public ushort size;
		public ushort packetId;
	}

	class PlayerInfoReq : Packet
	{
		public long playerId;
	}

	class PlayerInfoOk : Packet
	{
		public int hp;
		public int attack;
	}

	public enum PacketID
	{
		PlayerInfoReq = 1,
		PlayerInfoOk = 2,
	}

	class ClientSession : PacketSession
	{
		public override void OnConnected(EndPoint endPoint)
		{
			Console.WriteLine($"OnConnected : {endPoint}");
			Thread.Sleep(5000);
			Disconnect();
		}

		public override void OnRecvPacket(ArraySegment<byte> buffer)
		{
			int pos = 0;

			ushort size = BitConverter.ToUInt16(buffer.Array, buffer.Offset); 
			pos += 2;
			ushort id = BitConverter.ToUInt16(buffer.Array, buffer.Offset + pos);
			pos += 2;

			// TODO
			switch ((PacketID)id)
			{
				case PacketID.PlayerInfoReq: // required
					{
						long playerId = BitConverter.ToInt64(buffer.Array, buffer.Offset + pos);
						pos += 8;
					}
					break;
				case PacketID.PlayerInfoOk:
					{
						int hp = BitConverter.ToInt32(buffer.Array, buffer.Offset + pos);
						pos += 4;
						int attack = BitConverter.ToInt32(buffer.Array, buffer.Offset + pos);
						pos += 4;
					}
					//Handle_PlayerInfoOk();
					break;
				default:
					break;
			}

			Console.WriteLine($"RecvPacketId: {id}, Size {size}");
		}

		// TEMP
		public void Handle_PlayerInfoOk(ArraySegment<byte> buffer)
		{

		}
        
        ...
}

DummyClient

ServerSession.cs

namespace DummyClient
{
	class Packet
	{
		public ushort size;
		public ushort packetId;
	}

	class PlayerInfoReq : Packet
	{
		public long playerId;
	}

	class PlayerInfoOk : Packet
	{
		public int hp;
		public int attack;
	}

	public enum PacketID
	{
		PlayerInfoReq = 1,
		PlayerInfoOk = 2,
	}

	class ServerSession : Session
	{
		// unsafe : 포인터 조작. 속도가 빠른 장점
		static unsafe void ToBytes(byte[] array, int offset, ulong value) 
		{
			fixed (byte* ptr = &array[offset])
				*(ulong*)ptr = value;
		}

		static unsafe void ToBytes<T>(byte[] array, int offset, T value) where T : unmanaged
		{
			fixed (byte* ptr = &array[offset])
				*(T*)ptr = value;
		}

		public override void OnConnected(EndPoint endPoint)
		{
			Console.WriteLine($"OnConnected : {endPoint}");
			
			// 패킷의 크기는 아래에서 정해짐
			PlayerInfoReq packet = new PlayerInfoReq() { size = 4, packetId = (ushort)PacketID.PlayerInfoReq, playerId = 1001 };


			// 보낸다
			for (int i = 0; i < 5; i++)
			{
				ArraySegment<byte> s = SendBufferHelper.Open(4096);
				//byte[] size = BitConverter.GetBytes(packet.size);
				//byte[] packetId = BitConverter.GetBytes(packet.packetId);
				//byte[] playerId = BitConverter.GetBytes(packet.playerId);

				ushort size = 0; // int로 만들면 TryWriteBytes의 버전 중 int 버전으로 넘겨줌. ToUInt16
				bool success = true;
			
				// destination(Span<byte>)의 공간보다 value의 크기가 크다면 실패
				// offset+size부터 offset-size만큼의 범위
				size += 2;
				success &= BitConverter.TryWriteBytes(new Span<byte>(s.Array, s.Offset + size, s.Count - size), packet.packetId);
				size += 2;
				success &= BitConverter.TryWriteBytes(new Span<byte>(s.Array, s.Offset + size, s.Count - size), packet.playerId);
				size += 8;
				success &= BitConverter.TryWriteBytes(new Span<byte>(s.Array, s.Offset, s.Count), size);

				ArraySegment<byte> sendBuff = SendBufferHelper.Close(size); // 실질적으로 보내줄 버퍼.

				if (success)
					Send(sendBuff);
			}
		}
        
        ...
}
profile
개인 공부 정리

0개의 댓글