Reliable UDP를 Go-Back-N으로 구현을 해보았다.
사용 방법은 송신자측에서는 send 이후 wait로 패킷이 다 보내지길 기다리면 되고 수신자는 계속 receive를 하면된다.
문제는 경악할만한 속도. 윈도우 사이즈는 지금 4(...) 인데 이때가 가장 빨랐다..
3.5mb 전송에 20초..
그리고 Go-Back-N 특성상 순서가 하나만 잘못되도 이전에 보낸걸 다 버리는데, 이 비율이 40% 는 된다. 따라서 불필요하게 패킷을 너무 보내기도 하고, 윈도우 사이즈도 작아서 대역폭도 못쓰고..
다음은 Selective Repeat로 구현해봐야겠다.
UDP로 파일공유를 하겠다는 목표를 가지고 달리는중.. 속도는 지금보다 25배정도 빨라저야한다.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace FileSender
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ReliableUdp
{
class RUdpClient
{
protected class RUdpPacket : IEquatable<RUdpPacket>
{
public long time { get; set; }
public byte[] data { get; set; }
public IPEndPoint point { get; }
public ushort sequence { get; }
public RUdpPacket(long time, byte[] data, IPEndPoint point)
{
this.time = time;
this.data = data;
this.point = point;
sequence = BitConverter.ToUInt16(data);
}
public bool Equals(RUdpPacket other)
{
if (other.sequence == sequence)
return true;
else
return false;
}
}
private UdpClient client;
private Stopwatch timer;
public int resendCount = 0;
private IAsyncResult asyncResult;
private ushort packetNumber = 0;
private bool isFirst = true;
private ushort timerTargetSeq = 0;
private uint receivedNumber = 0;
private int sendBase = 0;
private int lastReceivedAck = -1;
private bool isReceiving = false;
private bool receiveStrange = false;
private long lastSendTime = 0;
private int windowSize { get; set; }
public byte maxReceiveCount { get; set; }
public int timeOut { get; set; }
private List<RUdpPacket> packetList; //유저가 보내고자 하는 패킷들
private List<RUdpPacket> waitForAckList; //유저가 보내서 ack를 기다리는 패킷들
private ConcurrentQueue<RUdpPacket> userPacketList; //실제로 사용자에게 반환되는 받은 패킷
private CancellationTokenSource tokenSource;
private CancellationToken messageLoopToken;
public RUdpClient(int port) //1대1 통신을 한다고 가정함. 1대 다수 통신의 경우 지원하지 않음.
{
windowSize = 4; //기본 윈도우 사이즈는 1024
timeOut = 500; //default timeout=500ms
client = new UdpClient(port);
packetList = new List<RUdpPacket>();
waitForAckList = new List<RUdpPacket>();
userPacketList = new ConcurrentQueue<RUdpPacket>();
tokenSource = new CancellationTokenSource();
messageLoopToken = tokenSource.Token;
timer = new Stopwatch();
Task.Run(new Action(() =>
{
while (!messageLoopToken.IsCancellationRequested)
messageLoop();
}), messageLoopToken);
}
~RUdpClient()
{
tokenSource.Cancel(); //Task 종료
}
public void messageLoop() //Go-Back -N ARQ 구현
{
if (client.Available != 0)
{
IPEndPoint other = new IPEndPoint(IPAddress.Loopback, 8888);
byte[] dgram = client.Receive(ref other);
ushort receivedPacketSeq = BitConverter.ToUInt16(dgram);
if (dgram.Length == 2) //ack 패킷
{
int targetIdx;
if (lastReceivedAck == receivedPacketSeq) //동일한 ack가 중복으로 왔을 때
{
//Console.WriteLine("중복된 ack 패킷 수신 : {0}", receivedPacketSeq);
targetIdx = packetList.IndexOf(new RUdpPacket(0, BitConverter.GetBytes(receivedPacketSeq), null));
if (targetIdx != -1)
{
timer.Stop();
sendBase = 0;
}
else
Console.WriteLine("target index is -1!");
resendCount++;
}
else
{
//Console.WriteLine("ack 패킷 수신 : {0}", receivedPacketSeq);
targetIdx = packetList.IndexOf(new RUdpPacket(0, BitConverter.GetBytes(receivedPacketSeq), null));
if (targetIdx != -1)
{
lock (packetList)
packetList.RemoveRange(0, targetIdx + 1); //해당 패킷까지 ack 처리함
sendBase -= targetIdx + 1;
if (sendBase < 0)
throw new Exception();
}
else
Console.WriteLine("target index is -1!");
}
lastReceivedAck = receivedPacketSeq;
}
else
{
ushort convertedSeq;
if (receivedPacketSeq == receivedNumber)
{
// Console.WriteLine("정상 패킷 수신 : {0}", receivedPacketSeq);
userPacketList.Enqueue(new RUdpPacket(0, dgram, other));
receiveStrange = true;
convertedSeq = (ushort)receivedNumber;
byte[] ack = BitConverter.GetBytes(convertedSeq); //마지막으로 수신한 seq 반환
client.Send(ack, 2, other);
Interlocked.Increment(ref receivedNumber);
//Console.WriteLine("ack 전송 : {0}", receivedPacketSeq);
if (receivedNumber == windowSize)
receivedNumber = 0;
}
else //잘못된 패킷이 왔을 때
{
//Console.WriteLine("비정상 패킷 수신 : {0}", receivedPacketSeq);
if (receivedNumber == 0)
convertedSeq = (ushort)(windowSize - 1);
else
convertedSeq = (ushort)(receivedNumber - 1);
byte[] ack = BitConverter.GetBytes(convertedSeq); //마지막으로 수신한 seq 반환
if (!receiveStrange)
{
client.Send(ack, 2, other);
receiveStrange = false;
}
//Console.WriteLine("ack 전송 : {0}", convertedSeq);
}
}
}
if (lastSendTime + timeOut <= timer.ElapsedMilliseconds && timer.IsRunning) //timeout
{
//Console.WriteLine("Timeout");
if (packetList.Count != 0)
{
if (packetList[0].sequence == timerTargetSeq)
{
//Console.WriteLine("패킷 {0} 타임아웃 ", timerTargetSeq);
timer.Stop();
resendCount++;
sendBase = 0;
}
else //타이머 타겟 패킷이 이미 ack 처리된 경우
{
lastSendTime = timer.ElapsedMilliseconds;
timerTargetSeq = packetList[0].sequence;
}
}
}
int packetCount = 0;
lock (packetList)
packetCount = packetList.Count;
if (packetCount != 0 && sendBase < windowSize && sendBase < packetCount) //보낼 패킷이 있고, 보낼 수 있을 때
{
if (!timer.IsRunning)
{
timer.Restart();
timerTargetSeq = packetList[0].sequence;
lastSendTime = timer.ElapsedMilliseconds;
}
while (sendBase < packetCount && sendBase < windowSize-1)
{
client.Send(packetList[sendBase].data, packetList[sendBase].data.Length, packetList[sendBase].point);
//Console.WriteLine("패킷 {0} 송신", BitConverter.ToUInt16(packetList[sendBase].data));
sendBase++;
}
}
}
public void Wait()
{
while (packetList.Count != 0) ;
}
public byte[] Receive(ref IPEndPoint ipEndPoint)
{
RUdpPacket packet = new RUdpPacket(0, BitConverter.GetBytes(0), null);
while (!userPacketList.TryDequeue(out packet)) ; //큐에 데이터가 있을때까지 blocking
byte[] data = new byte[packet.data.Length - 2];
Array.Copy(packet.data, 2, data, 0, data.Length); //패킷넘버 지우고 반환
return data;
}
public void Send(byte[] data, int length, IPEndPoint address)
{
while (packetList.Count > 256) ;
byte[] packedData = new byte[length + 2];
Array.Copy(data, 0, packedData, 2, length);
Array.Copy(BitConverter.GetBytes(packetNumber++ % windowSize), packedData, 2); //앞에 2바이트의 sequence 넘버 붙여줌
RUdpPacket packet = new RUdpPacket(0, packedData, address);
lock (packetList)
packetList.Add(packet); //패킷을 큐에 넣는다.
}
}
}
}