Java와 UDP 를 이용해 TCP 혼잡제어 구현

이준우·2023년 12월 21일
0

구현 계기

  • 학교에서 배우는 컴퓨터 네트워크 과목의 설계과제로 UDP 소켓을 이용하여 TCP의 혼잡제어를 구현하는 것이 과제 내용이였다.
  • 내 생각에 UDP로 구현해야하는 이유는 몇몇 언어의 TCP 소켓에서는 혼잡제어에 대한 라이브러리를 제공해주기 때문인 것 같다.
  • 언어의 제한은 없기 때문에 가장 익숙한 Java를 사용하여 구현하였다.

설계 과제 요구사항

  • 타임아웃 발생, 3개의 중복 ACK 시나리오
  • 혼잡 윈도우의 크기 조절
    • 3개의 중복 ACK 발생시, Fast Recovery 적용
    • 타임아웃 발생시, Slow Start 적용
  • 수신측에서는 누적 ACK 방식과 지연된 ACK 방식을 사용
  • Selective repeat 사용
    • 누적 ACK인데 Selective repeat를 사용?
      • 교수님 : 누적 ACK는 Go-Back-N 이지만 각 패킷에 Timeout을 적용하는 정도로 구현해라!
  • 송신측은 패킷의 크기는 일정하며, 혼잡 윈도우는 패킷 단위로 표시되고 처리된다.
    • 혼잡 윈도우 크기만큼의 패킷을 ACK 없이 전송할 수 있다.

선행 지식

Selective repeat

  • 선택적 반복
  • 송신자는 최대 N개 만큼의 확인응답이 안된 패킷을 보낼 수 있다.
  • 수신자는 각각의 패킷에 대해 확인 응답을 보낸다.
    • [rcvbase, rcvbase+N-1] 범위의 패킷
      • ACK(n)을 보냄
      • 순서가 맞지 않으면 : 버퍼에 저장
      • 순서가 맞으면 : 위의 계층으로 보내고, 윈도우를 다음의 아직 받지 못한 패킷으로 이동
    • [rcvbase-N, rcvbase -1] 범위의 패킷
      • ACK(n) 를 보냄
    • 그렇지 않으면
      • 무시한다.
  • 송신자는 각각의 확인 응답이 안된 패킷에 대한 Timer를 가진다.
    • Timer가 초과되면, 확인 응답이 안된 패킷만을 전송한다.

지연된 ACK 전송

  • 누적 ACK에서 사용하는 기법으로, 송신자측에서 ACK를 늦게 수신하는 방법으로 트래픽이 쌓이는 것을 방지하는 기술이다.
    • 누적 ACK에 의해 마지막으로 보낸 ACK를 기준으로 수신측에서 응답 처리하기 때문
    • 누적 ACK란, 수신측에서 응답된 ACK 번호 이하 패킷들은 모두 응답처리하는 것이다.

혼잡제어

  • 너무 빠르고 많은 데이터를 너무 많은 송신자들에게 전송할 경우 혼잡 현상이 발생한다.
  • 수신측의 윈도루를 혼잡 윈도우(Cong Wind)라고 한다.
  • 임계점은 손실이 발생한 혼잡 윈도우의 1/2 를 의미한다.
  • AIMD
    • 가법 증가, 승법 감소
    • 손실이 발생할 때 까지, 사용가능한 대역폭을 탐지해 송신률을 증가시킨다.
    • ACK 응답이 타이머 내에 돌아오지 않아 발생하는 Timeout 발생 시 Slow Start를 실행
      • Slow Start 란 Cong Wind를 1로 설정하고 임계점 까지는 지수적으로 증가하다가 임계점을 넘으면 선형적으로 증가하는 것을 의미한다.
    • 중간에 패킷이 손실되어 같은 ACK를 3번 보내면 중복된 ACK 신호라 하며 Fast Recovery를 실행한다.
      • Fast Recovery란 Cong Wind를 임계점 +3 으로 설정하고 Cong Wind의 크기가 선형적으로 증가하는 것을 의미한다.

설계 아이디어

  • 패킷을 전송하면 CongWind 객체의 패킷 리스트에 저장한다.
  • 타임아웃이나 중복된 ACK가 감지된 패킷은 CongWind 객체 내 재전송 할 패킷을 모아두는 Queue에 저장한다.
  • Sender 객체는 전송할 때 마다 CongWind 객체 내 Queue를 검사한다.
  • Queue 내부에 패킷이 존재한다면 해당 패킷이 Timeout 이라면 Slow Start를 중복된 ACK라면 Fast Recovery를 실행한다.

코드

Packet

  • 패킷 클래스
public class Packet {
	private int number;
	private int timer;
	private int ackCount;
	private boolean duplicatedAck;
	private boolean resendByDuplicatedAck;
	private boolean timeout;

	public Packet(int number) {
		this.number = number;
		reset();
	}

	public int getNumber() {
		return number;
	}

	public void ack() {
		this.ackCount++;
		if (ackCount%3 == 0) {
			duplicatedAck = true;
		}
	}

	public void accumulateAck() {
		this.ackCount = 1;
	}

	public void setResendByDuplicatedAck() {
		this.resendByDuplicatedAck = true;
	}

	public void reset() {
		this.timer = 0;
		this.ackCount = 0;
		this.duplicatedAck = false;
		this.resendByDuplicatedAck = false;
		this.timeout = false;
	}

	public boolean isResendByDuplicatedAck() {
		return resendByDuplicatedAck;
	}

	public void increaseTimer() {
		if (isAcked() || isDuplicatedAck()) {
			return;
		}
		if (timer == 2) {
			timeout = true;
			timer = 0;
			return;
		}
		timer++;
	}

	public boolean isDuplicatedAck() {
		return duplicatedAck;
	}

	public boolean isAcked() {
		return ackCount == 1;
	}

	public boolean isTimeout() {
		return timeout;
	}

	public void setDuplicatedAck() {
		this.duplicatedAck = false;
	}

	public void timerReset() {
		this.timer = 0;
	}
}
  • 변수
    • int number;
      • 패킷의 번호
    • int timer;
      • 패킷 타이머
    • int ackCount;
      • 중복 ACK를 감지하기 위해 ACK를 카운트
    • boolean duplicatedAck;
      • 해당 패킷이 중복된 ACK가 발생 했으면 true를 표시
    • boolean resendByDuplicatedAck;
      • true이면 이전 패킷의 중복된 ACK에 의해, 재전송해야 할 패킷임을 의미
    • boolean timeout;
      • 타임아웃이 발생 했으면 true를 설정
      • true이면 재전송 해야할 패킷임을 의미
  • 메서드
    • ack()
      • 패킷에 대한 ACK 가 수신됐을 때 호출
      • 호출 때 마다 ackCount 를 1씩 증가시킨다
      • ackCount가 3이 되면 duplicatedAck 를 true를 설정한다.
    • accumulateAck()
      • 누적 ACK 때문에, ackCount를 1로 설정하여 응답이 완료된 패킷임을 나타낸다.
    • reset()
      • 패킷이 재전송 큐에 들어갔을 때, 숫자를 제외한 필드들을 초기값으로 초기화한다.
    • increaseTimer()
      • tiemr 를 증가시키는 메서드
      • ACK 응답이 왔거나 중복된 ACK 발생한 패킷일 경우 타이머를 증가시키지 않는다.
      • timer가 2일경우 timeout 필드를 true를 설정

CongWind

  • 혼잡 윈도우 클래스
public class CongWind {
	private List<Packet> packetList = new ArrayList<>();
	private Queue<Packet> timeoutList = new LinkedList<>();
	private int size;
	private int criticalPoint;
	private int count = 1;

	public CongWind() {
		this.size = 9;
		this.criticalPoint = 0;
	}

	public int getCriticalPoint() {
		return criticalPoint;
	}

	public void linearIncrease() {
		size++;
	}

	public void exponentialIncrease() {
		size += Math.pow(2, count++);
	}

	public int getCount() {
		return count;
	}

	// timeOut 발생시
	public void slowStart() {
		criticalPoint = (int)Math.ceil((double)size/2);
		size = 1;
		count = 1;
	}

	// 3Dup 발생시
	public void fastRecovery() {
		criticalPoint = (int)Math.ceil((double)size/2);
		size = criticalPoint + 3;
		count = 1;
	}

	// Packet을 추가
	public Packet addPacket(int number) {
		Packet packet = new Packet(number);
		packetList.add(packet);
		return packet;
	}

	public boolean isAckedPacket(int number) {
		Packet packet = packetList.stream()
			.filter((p) -> number == p.getNumber())
			.findFirst().orElse(addPacket(number));

		accumulateAck(number);
		packet.ack();

		if (packet.isDuplicatedAck()) {
			packet.setDuplicatedAck();
			Packet resendPacket = packetList.stream()
				.filter((p) -> number < p.getNumber())
				.findFirst().get();

			resendPacket.setResendByDuplicatedAck();
			timeoutList.add(resendPacket);
		}

		return packet.isAcked();
	}

	public void accumulateAck(int number) {
		packetList.stream()
			.filter((p) -> number > p.getNumber())
			.forEach(Packet::accumulateAck);
	}

	public void increaseTimer() {
		for (Packet packet : packetList) {
			if (packet.isTimeout()) {
				timeoutList.add(packet);
				return;
			}

			packet.increaseTimer();
		}
	}

	public int getSize() {
		return size;
	}

	public Queue<Packet> getTimeoutList() {
		return timeoutList;
	}

	public void timerReset() {
		packetList.stream()
			.forEach(Packet::timerReset);
	}

	public void reset() {
		timerReset();
		packetList = new ArrayList<>();
	}
}
  • 변수
    • List<Packet> packetList
      • 전송한 packet들의 리스트
    • Queue<Packet> timeoutList
      • 재전송해야할 packet들의 큐
    • size
      • 혼잡 윈도우의 사이즈
    • criticalPoint
      • 임계점
    • count
      • 지수적 증가에서 사용할 카운트
  • 메서드
    • void linearIncrease()
      • Slow Start 시 사용할 선형적 증가
    • void exponentialIncrease()
      • Fast Recovery에서 사용할 지수적 증가
    • void slowStart()
      • 타임아웃 발생시 실행할 메서드
      • 임계점을 손실이 발생한 혼잡 윈도우의 절반으로 설정
      • sizecount 를 1로 설정
    • void fastRecovery()
      • 중복된 ACK 발생시 실행할 메서드
      • 임계점을 손실이 발생한 혼잡 윈도우의 절반으로 설정
      • size 는 임계점 + 3으로 설정
      • count 는 1로 설정
    • Packet addPacket(int number)
      • 패킷을 전송할 때 실행하는 메서드
      • number에 해당하는 패킷을 생성해서 packtetList에 추가
    • void accumulateAck(int number)
      • 누적된 ACK로 인해 number 의 패킷과 그 이전 패킷에 ACK 를 표시
    • boolean isAckedPacket(int number)
      • accumulateAcknumber 를 전달
      • 만약 해당 패킷이 중복된 ACK일 경우, 해당 패킷의 다음 패킷을 재전송하기 위해 duplicatedAck 를 true로 표시
    • void increaseTimer()
      • 지정된 시간동안 ACK가 도착하지 않으면 packetList 내 모든 패킷의 timer 를 증가시킨다.
      • pakcetList 내부에 timeoutture 된 패킷이 존재하면 timeoutList에 추가한다.
    • void timerReset()
      • packetList 의 모든 패킷의 타이머를 초기화시킨다.
    • void reset()
      • ACK가 올 경우, 누적 ACK에 의해 이전에 전송되었던 패킷들을 삭제하고 새로운 리스트를 할당

Sender

  • 송신측 역할
public class Sender {
	private static final String RECEIVER_HOST = "localhost";
	private static final int RECEIVER_PORT = 8002;
	private static final int SENDER_PORT = 8001;

	public static void main(String[] args) {
		CongWind congWind = new CongWind();
		Queue<Packet> timeoutList;
		int latestSendNumber = 99;
		int latestAckNumber = 99;
		int ack;

		try {
			while (true) {
				timeoutList = congWind.getTimeoutList();

				if (!timeoutList.isEmpty()) {
					Packet packet = timeoutList.poll();
					congWind.reset();

					if (packet.isResendByDuplicatedAck()) {
						congWind.fastRecovery();
						System.out.println("<<3-Dup ACK 사건 발생>>");
						System.out.println("Fast Recovery 실행--> cwind : " + congWind.getSize());
					} else if (packet.isTimeout()) {
						congWind.slowStart();
						System.out.println("<<타임아웃 사건 발생>>");
						System.out.println("Slow Start 실행 --> cwind : " + congWind.getSize());
					}
					latestSendNumber = latestAckNumber + congWind.getSize();

					congWind.addPacket(latestSendNumber);
					sendDate(latestSendNumber);
					System.out.println("임계값 : " + congWind.getCriticalPoint() + "로 설정");
					System.out.println("----------> 패킷 " + latestSendNumber + " 재전송");
				} else {
					latestSendNumber += congWind.getSize();
					congWind.addPacket(latestSendNumber);
					sendDate(latestSendNumber);
					System.out.println("----------> 패킷 " + latestSendNumber + " 송신");
				}

				ack = receiveDate();

				if (ack == -1) {
					congWind.increaseTimer();
				} else if (congWind.isAckedPacket(ack)) {
					congWind.timerReset();
					latestAckNumber = ack;
					if (congWind.getSize() > congWind.getCriticalPoint()) {
						congWind.linearIncrease();
						System.out.println(
							"<---ACK" + ack + " 수신 => cwin " + 1 + " 증가" + "(" + congWind.getSize()
								+ ")");
					} else if (congWind.getSize() < congWind.getCriticalPoint()) {
						congWind.exponentialIncrease();
						System.out.println(
							"<---ACK" + ack + " 수신 = >cwin " + Math.pow(2, congWind.getCount()-1) + " 증가" + "("
								+ congWind.getSize() + ")");
					}

				} else {
					System.out.println("<---ACK" + ack + " 수신");
				}

			}
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

	private static int receiveDate() {
		try (DatagramSocket socket = new DatagramSocket(SENDER_PORT)) {
			socket.setSoTimeout(1000);
			byte[] receiveData = new byte[1024];
			DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
			socket.receive(receivePacket);
			String receivedMessage = new String(receivePacket.getData(), 0, receivePacket.getLength());
			return Integer.parseInt(receivedMessage);
		} catch (IOException e) {
			return -1;
		}
	}

	private static void sendDate(int dataToSend) throws IOException {
		try (DatagramSocket socket = new DatagramSocket()) {
			byte[] sendData = Integer.toString(dataToSend).getBytes();
			InetAddress receiverAddress = InetAddress.getByName(RECEIVER_HOST);
			DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, receiverAddress, RECEIVER_PORT);
			socket.send(sendPacket);
		}
	}
}
  • 매 루프마다 CongWind 객체에서 timeoutList 를 받아온다.
  • timeoutList 에 값이 있다는 것은 재전송해야할 패킷이 존재한다는 의미이다.
    • timeoutList 큐에서 poll() 을 통해 패킷을 가져온다.
    • 혼잡 윈도우를 초기화해 기존에 ACK를 받은 패킷들을 제거한다.
    • 해당 패킷이 중복된 ACK 이 발생했다면 Fast Recovert를 실행한다.
    • 해당 패킷이 timeout이 발생했다면 Slow Strat를 실행한다.
    • 해당 패킷 + 혼잡 윈도우의 크기만큼의 패킷을 재전송한다.
  • timeoutList 가 비어있다면 재전송해야 할 패킷이 없다는 의미로, 혼잡 윈도우의 크기만큼 패킷을 전송한다.
  • receiveDate() 메서드를 통해 ACK응답번호 를 전송 받는다.
    • 만약 1초 이내에 ACK 가 수신되지 않을 경우 혼잡 윈도우의 패킷의 타이머를 증가시킨다.
    • ACK응답번호 가 수신됐을 경우, 해당 응답번호의 패킷이 응답 되었다고 표시하고, 누적 ACK에 의해 이전 패킷들의 타이머를 초기화한다.
    • 이미 전송 받았던 ACK를 전송 받았을 경우 혼잡 윈도우는 증가시키지 않고, 해당 패킷의 ackCount를 증가시킨다.
    • 임계점보다 혼잡 윈도우 크기가 클 경우 혼잡 윈도우를 선형적으로 증가시킨다.
    • 임계점보다 혼잡 윈도우의 크기가 작을 경우 혼잡 윈도우를 지수적으로 증가시킨다.

Reciver

  • 수신측 역할
public class Receiver {
	private static final String SENDER_HOST = "localhost";
	private static final Integer SENDER_PORT = 8001;
	private static final Integer RECEIVER_PORT = 8002;
	private static final Integer BUFFER_SIZE = 20;

	public static void main(String[] args) {
		int rcvBase = 99;

		try {
			while (true) {
				int firstPacket = receiveDate();
				System.out.println("----------> 패킷 " + firstPacket + " 수신");

				int secondPacket = receiveDate();
				System.out.println("----------> 패킷 " + secondPacket + " 수신");

				if (rcvBase + BUFFER_SIZE > firstPacket) {
					rcvBase = firstPacket;

					if (rcvBase + BUFFER_SIZE > secondPacket) {
						rcvBase = secondPacket;
						System.out.println("<--- ACK" + rcvBase + " 송신");
						sendData(rcvBase);
					} else {
						sendData(rcvBase);
						System.out.println("<--- ACK" + rcvBase + " 송신");
					}
				} else {
					System.out.println("== 패킷" + firstPacket + "은 rcvWind 범위 밖 ==");
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private static void sendData(int data) throws IOException {
		String dataInteger = Integer.toString(data);

		// DatagramSocket 생성
		DatagramSocket socket = new DatagramSocket();

		// 보낼 데이터를 바이트 배열로 변환
		byte[] sendData = dataInteger.getBytes();

		// 상대방의 주소 설정
		InetAddress senderAddress = InetAddress.getByName(SENDER_HOST);

		// DatagramPacket 생성
		DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, senderAddress, SENDER_PORT);

		// 데이터 전송
		socket.send(sendPacket);

		// 소켓 닫기
		socket.close();
	}

	private static int receiveDate() throws IOException {
		int receivedMessageInt;

		// DatagramSocket 생성
		DatagramSocket socket = new DatagramSocket(RECEIVER_PORT);

		// 수신용 바이트 배열 생성
		byte[] receiveData = new byte[1024];

		// DatagramPacket 생성
		DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);

		// 데이터 수신
		socket.receive(receivePacket);

		// 수신한 데이터 출력
		String receivedMessage = new String(receivePacket.getData(), 0, receivePacket.getLength());

		receivedMessageInt = Integer.parseInt(receivedMessage);

		// 소켓 닫기
		socket.close();

		return receivedMessageInt;
	}
}
  • 지연된 ACK를 구현하기 위해, 패킷을 연속 2번 전송 받는다.
  • 첫 번째 패킷이 rcvBase + 버퍼의 크기 를 초과하는지 검사한다.
    • 초과할 경우 아무것도 하지 않는다.
    • 수신측에서 타임아웃이 발생
    • 초과하지 않을경우, 두번째 패킷이 첫 번째 패킷 + 버퍼의 크기 를 초과했는지 검사한다.
      • 초과할 경우 첫 번째 패킷의 번호를 수신측으로 응답한다.
      • 초과하지 않을 경우 두 번째 패킷의 번호를 수신측으로 응답한다.

시나리오

일반적인 경우

  • 혼잡 윈도우가 선형적으로 증가한다.

  • 선형적으로 증가하다가 Reciver 측에 버퍼 크기를 초과한 패킷이 수신될 경우 ACK를 전송하지 않는다.

  • SenderACK407 이후 ACK를 수신받지 못하기 때문에 타임아웃이 발생한다.
  • 따라서 Slow Start를 실행하고 임계점까지 지수적으로 증가하다가 임계점을 넘은 이후 선형적으로 증가한다.

중복된 ACK

  • 강제로 중복된 ACK를 발생시킨 시나리오
  • ACK108 을 3번 수신받은 이후, Fast Recovery를 실행한다.
  • 108 + 혼잡윈도우 크기 만큼의 패킷을 재전송한다.
  • 이후 혼잡윈도우의 크기는 선형적으로 증가한다.

Git 주소

https://github.com/JunRain2/computer-network


소감

  • 이론으로만 배웠던 Selective repeat와 혼잡제어를 구현해봄으로써, 내가 어떤 디테일을 놓치고 있었는지 확인할 수 있었던 좋은 시간이었다.
  • 구현을 통해 기존에 애매하게 잡혀있던 개념을 확실하게 잡고 갈 수 있어서 좋았던 것 같다.
  • 처음 구현을 시작할 때는 어떤 식으로 설계해야할지 감도 잡히지 않아서 막막해서 그냥 생각나는대로 코드를 작성하는 것부터 시작했다.
  • 코드를 작성하면서 점점 과제를 어떤 식으로 해결하면 될지 감을 잡았고, 해당 코드를 완성할 수 있었다.
  • 여기서 느낀점은 책을 읽는 것만으로는 내것으로 만들 수 없고, 스스로 코드를 구현하면서 내 것으로 만들 필요가 있다는 것을 느꼇다.
  • 아쉬운점은 코드를 처음부터 설계하고 들어간 것이 아니라 중간에 수정해야 하는 부분이 많아서 쓸모없는 메서드나 변수가 존재하게 되었다는 것이다.
  • 또 객체지향적으로 탄탄하게 설계한 것이 아닌, 그냥 돌아가주라는 생각으로 작성한 코드이기 때문에 한눈에 보기에도 깔끔하지가 않다.
  • 그래서 추후 객체지향을 연습하기 위해, 해당 코드를 리팩토링할 생각이다.
profile
잘 살고 싶은 사람

0개의 댓글