CongWind
객체의 패킷 리스트에 저장한다.CongWind
객체 내 재전송 할 패킷을 모아두는 Queue에 저장한다.Sender
객체는 전송할 때 마다 CongWind
객체 내 Queue를 검사한다.public class Packet {
private int number;
private int timer;
private int ackCount;
private boolean duplicatedAck;
private boolean resendByDuplicatedAck;
private boolean timeout;
public Packet(int number) {
this.number = number;
reset();
}
public int getNumber() {
return number;
}
public void ack() {
this.ackCount++;
if (ackCount%3 == 0) {
duplicatedAck = true;
}
}
public void accumulateAck() {
this.ackCount = 1;
}
public void setResendByDuplicatedAck() {
this.resendByDuplicatedAck = true;
}
public void reset() {
this.timer = 0;
this.ackCount = 0;
this.duplicatedAck = false;
this.resendByDuplicatedAck = false;
this.timeout = false;
}
public boolean isResendByDuplicatedAck() {
return resendByDuplicatedAck;
}
public void increaseTimer() {
if (isAcked() || isDuplicatedAck()) {
return;
}
if (timer == 2) {
timeout = true;
timer = 0;
return;
}
timer++;
}
public boolean isDuplicatedAck() {
return duplicatedAck;
}
public boolean isAcked() {
return ackCount == 1;
}
public boolean isTimeout() {
return timeout;
}
public void setDuplicatedAck() {
this.duplicatedAck = false;
}
public void timerReset() {
this.timer = 0;
}
}
int number;
int timer;
int ackCount;
boolean duplicatedAck;
boolean resendByDuplicatedAck;
boolean timeout;
ack()
duplicatedAck
를 true를 설정한다.accumulateAck()
reset()
increaseTimer()
tiemr
를 증가시키는 메서드timer
가 2일경우 timeout
필드를 true를 설정public class CongWind {
private List<Packet> packetList = new ArrayList<>();
private Queue<Packet> timeoutList = new LinkedList<>();
private int size;
private int criticalPoint;
private int count = 1;
public CongWind() {
this.size = 9;
this.criticalPoint = 0;
}
public int getCriticalPoint() {
return criticalPoint;
}
public void linearIncrease() {
size++;
}
public void exponentialIncrease() {
size += Math.pow(2, count++);
}
public int getCount() {
return count;
}
// timeOut 발생시
public void slowStart() {
criticalPoint = (int)Math.ceil((double)size/2);
size = 1;
count = 1;
}
// 3Dup 발생시
public void fastRecovery() {
criticalPoint = (int)Math.ceil((double)size/2);
size = criticalPoint + 3;
count = 1;
}
// Packet을 추가
public Packet addPacket(int number) {
Packet packet = new Packet(number);
packetList.add(packet);
return packet;
}
public boolean isAckedPacket(int number) {
Packet packet = packetList.stream()
.filter((p) -> number == p.getNumber())
.findFirst().orElse(addPacket(number));
accumulateAck(number);
packet.ack();
if (packet.isDuplicatedAck()) {
packet.setDuplicatedAck();
Packet resendPacket = packetList.stream()
.filter((p) -> number < p.getNumber())
.findFirst().get();
resendPacket.setResendByDuplicatedAck();
timeoutList.add(resendPacket);
}
return packet.isAcked();
}
public void accumulateAck(int number) {
packetList.stream()
.filter((p) -> number > p.getNumber())
.forEach(Packet::accumulateAck);
}
public void increaseTimer() {
for (Packet packet : packetList) {
if (packet.isTimeout()) {
timeoutList.add(packet);
return;
}
packet.increaseTimer();
}
}
public int getSize() {
return size;
}
public Queue<Packet> getTimeoutList() {
return timeoutList;
}
public void timerReset() {
packetList.stream()
.forEach(Packet::timerReset);
}
public void reset() {
timerReset();
packetList = new ArrayList<>();
}
}
List<Packet> packetList
Queue<Packet> timeoutList
size
criticalPoint
count
void linearIncrease()
void exponentialIncrease()
void slowStart()
size
와 count
를 1로 설정void fastRecovery()
size
는 임계점 + 3으로 설정count
는 1로 설정Packet addPacket(int number)
number
에 해당하는 패킷을 생성해서 packtetList
에 추가void accumulateAck(int number)
number
의 패킷과 그 이전 패킷에 ACK 를 표시boolean isAckedPacket(int number)
accumulateAck
에 number
를 전달duplicatedAck
를 true로 표시void increaseTimer()
packetList
내 모든 패킷의 timer
를 증가시킨다.pakcetList
내부에 timeout
이 ture
된 패킷이 존재하면 timeoutList
에 추가한다.void timerReset()
packetList
의 모든 패킷의 타이머를 초기화시킨다.void reset()
public class Sender {
private static final String RECEIVER_HOST = "localhost";
private static final int RECEIVER_PORT = 8002;
private static final int SENDER_PORT = 8001;
public static void main(String[] args) {
CongWind congWind = new CongWind();
Queue<Packet> timeoutList;
int latestSendNumber = 99;
int latestAckNumber = 99;
int ack;
try {
while (true) {
timeoutList = congWind.getTimeoutList();
if (!timeoutList.isEmpty()) {
Packet packet = timeoutList.poll();
congWind.reset();
if (packet.isResendByDuplicatedAck()) {
congWind.fastRecovery();
System.out.println("<<3-Dup ACK 사건 발생>>");
System.out.println("Fast Recovery 실행--> cwind : " + congWind.getSize());
} else if (packet.isTimeout()) {
congWind.slowStart();
System.out.println("<<타임아웃 사건 발생>>");
System.out.println("Slow Start 실행 --> cwind : " + congWind.getSize());
}
latestSendNumber = latestAckNumber + congWind.getSize();
congWind.addPacket(latestSendNumber);
sendDate(latestSendNumber);
System.out.println("임계값 : " + congWind.getCriticalPoint() + "로 설정");
System.out.println("----------> 패킷 " + latestSendNumber + " 재전송");
} else {
latestSendNumber += congWind.getSize();
congWind.addPacket(latestSendNumber);
sendDate(latestSendNumber);
System.out.println("----------> 패킷 " + latestSendNumber + " 송신");
}
ack = receiveDate();
if (ack == -1) {
congWind.increaseTimer();
} else if (congWind.isAckedPacket(ack)) {
congWind.timerReset();
latestAckNumber = ack;
if (congWind.getSize() > congWind.getCriticalPoint()) {
congWind.linearIncrease();
System.out.println(
"<---ACK" + ack + " 수신 => cwin " + 1 + " 증가" + "(" + congWind.getSize()
+ ")");
} else if (congWind.getSize() < congWind.getCriticalPoint()) {
congWind.exponentialIncrease();
System.out.println(
"<---ACK" + ack + " 수신 = >cwin " + Math.pow(2, congWind.getCount()-1) + " 증가" + "("
+ congWind.getSize() + ")");
}
} else {
System.out.println("<---ACK" + ack + " 수신");
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static int receiveDate() {
try (DatagramSocket socket = new DatagramSocket(SENDER_PORT)) {
socket.setSoTimeout(1000);
byte[] receiveData = new byte[1024];
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
socket.receive(receivePacket);
String receivedMessage = new String(receivePacket.getData(), 0, receivePacket.getLength());
return Integer.parseInt(receivedMessage);
} catch (IOException e) {
return -1;
}
}
private static void sendDate(int dataToSend) throws IOException {
try (DatagramSocket socket = new DatagramSocket()) {
byte[] sendData = Integer.toString(dataToSend).getBytes();
InetAddress receiverAddress = InetAddress.getByName(RECEIVER_HOST);
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, receiverAddress, RECEIVER_PORT);
socket.send(sendPacket);
}
}
}
CongWind
객체에서 timeoutList
를 받아온다.timeoutList
에 값이 있다는 것은 재전송해야할 패킷이 존재한다는 의미이다.timeoutList
큐에서 poll()
을 통해 패킷을 가져온다.timeoutList
가 비어있다면 재전송해야 할 패킷이 없다는 의미로, 혼잡 윈도우의 크기만큼 패킷을 전송한다.receiveDate()
메서드를 통해 ACK응답번호
를 전송 받는다.ACK응답번호
가 수신됐을 경우, 해당 응답번호의 패킷이 응답 되었다고 표시하고, 누적 ACK에 의해 이전 패킷들의 타이머를 초기화한다.ackCount
를 증가시킨다.public class Receiver {
private static final String SENDER_HOST = "localhost";
private static final Integer SENDER_PORT = 8001;
private static final Integer RECEIVER_PORT = 8002;
private static final Integer BUFFER_SIZE = 20;
public static void main(String[] args) {
int rcvBase = 99;
try {
while (true) {
int firstPacket = receiveDate();
System.out.println("----------> 패킷 " + firstPacket + " 수신");
int secondPacket = receiveDate();
System.out.println("----------> 패킷 " + secondPacket + " 수신");
if (rcvBase + BUFFER_SIZE > firstPacket) {
rcvBase = firstPacket;
if (rcvBase + BUFFER_SIZE > secondPacket) {
rcvBase = secondPacket;
System.out.println("<--- ACK" + rcvBase + " 송신");
sendData(rcvBase);
} else {
sendData(rcvBase);
System.out.println("<--- ACK" + rcvBase + " 송신");
}
} else {
System.out.println("== 패킷" + firstPacket + "은 rcvWind 범위 밖 ==");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void sendData(int data) throws IOException {
String dataInteger = Integer.toString(data);
// DatagramSocket 생성
DatagramSocket socket = new DatagramSocket();
// 보낼 데이터를 바이트 배열로 변환
byte[] sendData = dataInteger.getBytes();
// 상대방의 주소 설정
InetAddress senderAddress = InetAddress.getByName(SENDER_HOST);
// DatagramPacket 생성
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, senderAddress, SENDER_PORT);
// 데이터 전송
socket.send(sendPacket);
// 소켓 닫기
socket.close();
}
private static int receiveDate() throws IOException {
int receivedMessageInt;
// DatagramSocket 생성
DatagramSocket socket = new DatagramSocket(RECEIVER_PORT);
// 수신용 바이트 배열 생성
byte[] receiveData = new byte[1024];
// DatagramPacket 생성
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
// 데이터 수신
socket.receive(receivePacket);
// 수신한 데이터 출력
String receivedMessage = new String(receivePacket.getData(), 0, receivePacket.getLength());
receivedMessageInt = Integer.parseInt(receivedMessage);
// 소켓 닫기
socket.close();
return receivedMessageInt;
}
}
rcvBase + 버퍼의 크기
를 초과하는지 검사한다.첫 번째 패킷 + 버퍼의 크기
를 초과했는지 검사한다.Reciver
측에 버퍼 크기를 초과한 패킷이 수신될 경우 ACK를 전송하지 않는다.Sender
는 ACK407
이후 ACK를 수신받지 못하기 때문에 타임아웃이 발생한다.ACK108
을 3번 수신받은 이후, Fast Recovery를 실행한다.108 + 혼잡윈도우 크기
만큼의 패킷을 재전송한다.https://github.com/JunRain2/computer-network