Introducing - KIKI

최혜성·3일 전
0

KIKI

NAT Packet Tunneling Program

화면 캡처 2024-06-27 153250

What?

NAT (Network Address Translation)에 의해 외부 클라이언트에서 NAT 내부 서버에 접속하지 못하는 문제를 리버스 프록시를 이용해 접속할 수 있도록 도와주는 프로그램입니다.

대중적인 프로그램으로는 Ngrok가 있습니다.

키키는 지브리의 애니메이션에서 따온 이름입니다. 중계 서버가 마치 터널링 서버에 패킷을 '배달'해주는 것에서 따왔습니당

How?

터널링 될 서버는 NAT에 의해 포트포워딩, DMZ가 전부 적용되지 않아 외부에서 NAT을 우회하고 들어올 방법이 없습니다. 따라서 '서버'가 아닌 '클라이언트'로써 연결하는것 밖에는 안됩니다.

위 사진처럼, 각 클라이언트가 외부 서버에 요청할경우, NAT Router는 요청자의 ip를 자신의 공인 ip로 변경하고, 서버에게 응답받은 패킷을 다시 NAT Table을 참조해 내부 IP로 변경하여 통신을 수행합니다.

하지만, 만약 NAT 내부에서 서버를 연다면 어떻게 될까요?
우선, 포트포워딩, DMZ가 설정되어 있지 않다면, NAT Table이 구성되어 있지 않기 때문에 내부에 있는 목적지 주소 (내부 IP)중 어디로 패킷을 전달해야할지 모릅니다.
만약 내부 ip 192.168.0.4:8080에서 서버를 열었는데 외부 ip 200.14.132.15:8080으로 접속한다면 어느 내부 ip로 연결시켜줘야 할지 모른다는거죠. 192.168.0.5가 될수도 있고, 192.168.0.7이 될수 있고..

그래서 이걸 연결시킬 수 있도록 mac주소와 함께 내 아이피(내 맥주소)로 포트를 연결시켜줘라~ 하는게 포트포워딩 (광범위한 경우 DMZ)가 되는데, 이게 잘 작동하면 상관이 없는데, 가끔 네트워크 구성이 복잡하게 되어 있어, NAT안에 NAT이 구성되어 있는경우 작동하지 않습니다.
(구성 잘못한거긴 한데.. 늘상 있는일 - 모뎀안에 공유기를 물리는경우 NAT 중첩등)

이런경우 눈물을 머금고 VPN을 결제해서 가상 망 내에서 접속하거나, 유지비가 많이 나가는 서버를 사용하기도 합니다.

하지만, NAT이 있더라도 '클라이언트'로써 연결하는것은 자유롭다는것에 착안해, 외부 접속이 가능한 서버 'B'를 생성하고, 터널링될 서버('A')를 클라이언트로써 연결한 뒤, 실제 외부에서 접속할땐 B에 연결해 데이터를 보내고, B는 A와 서버 - 클라이언트 관계를 통해 패킷을 주고 받는 방법을 통해 우회 (터널링)할 수 있습니다.

그걸 어떻게?

리버스 프록시라는 개념이 있습니다. Spring Cloud Gateway, Nginx(무중단 배포시 사용), BungeeCord(MC)등에서 사용되며, 분명 클라이언트는 B라는 서버에 연결했는데, 접속해보니 A서버에 접속한것과 동일한 경험을 얻을 수 있었습니다.

이 처럼, 프록시가 클라이언트 앞단이 아닌, 서버 앞단에 위치하여, 해당 프록시로 들어오는 패킷을 적절한 '서버'에 매핑시켜주는 기법을 의미합니다.

여기에서는 리버스 프록시 개념을 응용해, B서버로 접속시 A와 연결된 소켓으로 입력값을 보내주고, 그에 따른 출력값을 A로부터 받아와 클라이언트에 반환하게 됩니다.

A는 B에게 받은 입력값에 따라 연결이 필요한 내부 서버와 연결하는 소켓을 생성하고, 입력값을 보내고 출력값을 B서버로 보내게 됩니다.

이처럼 B 서버는 패킷을 '중계'하는 역할을 맡고 있습니다.

이게 어떻게 작동되는건가? 하고 의문이 들 수 있는데, '이론상' input에 적절한 값을 전달하고, output으로 적절하게 값을 읽어온다면 직접 소켓을 연결한것과 동일하게 동작할 수 있습니다.
따라서, 클라이언트가 B서버로 보내는 패킷을 '완벽하게' 읽어와서 A에게 전달해주고, A의 내부 소켓으로 입력값을 '완벽하게' 전달한다면 문제가 없습니다.
데이터 수신도 역순으로 '완벽하게 A에서 읽어서 B로 전달하고, 이를 Client로 전달하면 가능합니다.

Detail

* Server

우선 터널링 될 서버랑 Connection을 맺을 소켓 서버를 구성합니다. 해당 포트를 통해 터널링될 패킷의 정보가 전달됩니다.

val serverSocket = SocketServer(6960) 

이후, 터널링 서버(클라이언트)와 소켓 연결이 성사된경우, 해당 클라이언트에게 전달할 패킷을 받을 서버를 구성합니다.
이때, 0번 포트로 서버를 열어 해당 클라이언트 전용으로 서버가 동작하게 설정합니다. (specific client)

val tunnelingClient = serverSocket.accept()
val tunnelingServer = ServerSocket(0) //random port

이후 해당 서버로 외부에서 접속할 클라이언트가 접속합니다. 이때, 각 클라이언트 별로 고유한 id를 부여해, 하나의 서버 커넥션으로 전달하더라도, 적절하게 패킷을 처리해야하는 클라이언트를 찾는 디멀티플렉싱을 수행합니다.

그런데, 외부 클라이언트가 접속했다는 사실은 현재 터널링 클라이언트 (A)가 알 수 없습니다. accept가 성공적으로 이루어졌으나, 현재 A랑은 소켓으로 연결되어 있기 때문에 알 수 없는 문제가 있습니다.
따라서, 연결됐다는 패킷을 A에게 전달합니다.

패킷 객체는 JSON으로 변환되어 String 형식으로 서버를 통해 전달됩니다. ACCEPT 패킷은 다음과 같이 전달될것입니다.

{"id" : id, "packetType": "ACCEPT", "isSuccess" : true, "data" : "", "exception" : null}

해당 패킷의 구조는 추후 Common에서 설명드리겠습니다.

val realClient = tunnelingServer.accept()
sendPacket(Packet(id, PacketType.ACCEPT, PayloadResult.success(""))

이제 A도 내부 Socket을 형성해 Destination Server와 연결이 이루어졌습니다. 따라서 중계를 시작합니다.

while(isRunning) {
	// read logic - 터널링 서버에서 읽어온 값을 실제 클라이언트에게 전달 
    val input = tunnelingClient.inputStream().bufferedReader().readLine() ~
    val packet = PacketUtil.deserialize(input);
    realClient.outputStream.write(packet.payload.convertToByteArray) //native (byte array)로 전달
    // write logic - 클라이언트에서 읽은 값을 터널링 서버로 전달
    val clientInput = realClient.inputStream()~.readLine()
    tunnelingClient.outputStream().writeObject(Packet(id, PacketType.MESSAGE, PayloadResult.success(clientInput)) //서버로 전달할때는 json형식 Packet으로 전달
    
}

이때 과정은 다음과 같습니다.

  • 클라이언트에서 데이터를 읽은경우
    클라이언트 소켓에서 "asdf1234"라는 데이터를 읽은경우(이때 읽은값은 String이 아니라 ByteArray로 읽음. 손상 최소화) 패킷으로 가공합니다. 가공된 패킷의 json은 다음과 같습니다.
{"id" : 2, "packetType": "MESSAGE", "isSuccess" : true, "exception" : null, "data" : "adfd4564=="}

ACCEPT가 이미 수행됐으므로 다음 패킷은 MESSAGE로 전달되며, data는 기존 ByteArray를 Base64로 안전하게 인코딩한 값을 문자열로 바꿔 전달합니다.

  • 터널링 서버에서 데이터를 읽은경우
    터널링 서버가 클라이언트에게 데이터를 전달해야 하는 경우 이 또한 json형식의 Packet으로 전달됩니다. json은 다음과 같습니다.
{"id" : 3, "packetType": "MESSAGE", "isSuccess" : true, "exception" : null, "data" : "aerqwer4151dd5=="}

이러한 패킷을 받은경우, 서버에서 해당 id를 가진 소켓을 찾고, 거기로 데이터를 전달합니다.
이때, 클라이언트에게 데이터를 올바르게 전달해야 하므로, 패킷 객체가 아닌 data를 base64 decode한 값을 write합니다.

위 과정을 Rough하게 작성되어 있어, 실제 코드와는 다른 부분이 있을 수 있습니다. 실제 코드는 클라이언트 소켓 accept시 atomic하게 id를 부여하며, read와 write 모두 다른 쓰레드를 이용해 좀더 안전하고 효율적으로 처리합니다.

* Client

클라이언트는 최초에 중계할 서버의 ip와 터널링할 자신의 포트번호를 입력받습니다.

  val scanner : Scanner = Scanner(System.`in`)
  print("서버의 ip를 입력해주세요 : ")
  val ip = scanner.next()
  print("터널링 할 포트를 입력해주세요 : ")
  // 예외처리 안됨. 포트 범위 bound 체크 안됨
  val port = scanner.nextInt()
  Client(ip, port).startClient()

이때, 포트번호는 굳이 서버로 보낼 필요는 없어, 자신만 알고 있다 들어오는 패킷에 따라 내부 소켓 연결을 수립할때 사용합니다.
중요한건 포트번호가 아닌, 주고받는 데이터이기 때문에, 어떤 포트로 연결하더라도 데이터만 잘 주고받으면 잘 작동했습니다.

이제 서버와 연결을 수립합니다. 서버가 터널링 클라이언트와 연결하는 서버의 포트는 6960으로 되어 있기 때문에 연결이 수립됩니다.

val socket = Socket(ip, 6960) 

이때, 아까전에 내부 패킷을 주고 받기 위한 서버 소켓을 하나 생성했는데, 이 소켓의 포트는 0으로 되어 있어 랜덤 포트로 결정됩니다.
이 포트가 생성됐다는걸 클라이언트는 어떻게 알 수 있을까요?

  socket.getInputStream().bufferedReader().readLine().also { println(it) } //url print 하기 위해 맨처음 listen

패킷 json을 주고받기전 bufferedReader를 이용해 임시적으로 포트를 받아옵니다. (서버도 최초로 print 수행)
그러면 client는 Server Connected! - {port} 를 반환하게 됩니다. 그러면 외부 접속시 아까 입력한 ip:port형식으로 접속하면 되겠죠

이제 핵심 과정입니다.

  • ACCEPT
    외부 접속 클라이언트가 서버(B)에 연결했을때 연결됨을 알려주기 위해 ACCEPT 패킷을 전달한다 했었습니다. 이 경우 클라이언트는 다음과 같이 동작합니다.
val stream = //서버와 연결된 socket input stream 읽기)
val readPacket : Packet = PacketUtil.deserialize(stream.read~)
if (readPacket.packetType == PacketType.ACCEPT)
	createSocket(readpacket.id, Socket(127.0.0.1, port) //터널링할 포트로 내부 소켓 생성
else 
	notifyPacket(readPacket) //message인경우 notify

해당 패킷의 id를 가진 소켓을 생성하는 요청을 보냅니다. 이렇게 되면 클라이언트 자체에서 소켓을 생성하고, 연결을 수립할 수 있습니다.

이 점에서, 리버스 프록시와는 좀 다르다고 생각했습니다. 리버스 프록시는 프록시 서버 자체에서 연결을 생성하고 주고 받는데, NAT을 우회하기 위해선, 클라이언트 자체가 연결을 주도해야하기 때문에 (프록시 서버도 터널링 클라이언트에 서버로써 연결 못함) 클라이언트에서 소켓을 생성하게 되었습니다.

이렇게 생성된 소켓은 다음과 같이 수행됩니다.

while(isRunning) {
	// read logic - 터널링 서버에서 읽어온 값을 내부 소켓으로 전달 
    val input = serverSocket.inputStream().bufferedReader().readLine() ~
    val packet = PacketUtil.deserialize(input);
    internelSocket.outputStream.write(packet.payload.convertToByteArray) //native (byte array)로 전달
    // write logic - 클라이언트에서 읽은 값을 터널링 서버로 전달
    val internalInput = internalSocket.inputStream()~.readLine()
    serverSocket.outputStream().writeObject(Packet(id, PacketType.MESSAGE, PayloadResult.success(internalInput)) //서버로 전달할때는 json형식 Packet으로 전달
    
}

어.. 위에서 봤던 서버 로직에서 클라이언트가 수행하는거랑 동일하지 않나요?

맞습니다. 사실, 외부 접속 클라이언트 끝단과, 내부적으로 생성되어 터널링 클라이언트에서 동작하는 소켓의 로직은 동일합니다.
둘다 ByteArray로 입력받은 값을 Packet의 형태로 json화 시켜 전달하고, 입력받은 Packet을 역직렬화 해서 ByteArray로 decode 후 전달하는것은 같습니다.
이 점을 들어 Client와 Server모두 ClientSocket이라는 동일한 클래스를 활용하고 있습니다.

  • MESSAGE
    그러면 소켓 생성은 그렇다 하면, 패킷 전달은?
<notifyPacket>
socketQueue.stream().forEach {it.handlePacket}

fun handlePacket(packet : Packet) {
	if (!isHandleAble(packet)) - {this.id == packet.id} 비교
    	return //early return
    
    //위에서 봤던 packet Write 로직
}

다음과 같이 옵저버 패턴?? 을 이용해서 연결된 내부 소켓이 핸들링 되는 패킷이면 전달, 아닌경우 return을 수행해서 다음 소켓에게 넘기는 과정을 수행합니다.

Common

  • Packet
/**
 * Client - Server에서 처리하는 패킷 클래스
 * @property id 연결된 소켓의 id. packetType이 ACCEPT일경우 해당 id로 된 소켓 연결을 생성함.
 * @property packetType 패킷의 타입 (최초 연결여부, 메시지 구분)
 * @property payload 패킷의 데이터. 만약 exception이 발생했을경우 exception이 포함될 수 있음. Base64 인코딩
 */
class Packet(
    val id : Int,
    val packetType: PacketType,
    val payload : PayloadResult<String>
) {
    constructor(id : Int, packetType: PacketType, byteArray: ByteArray) : this(id, packetType, PayloadResult.success(Base64.getEncoder().encodeToString(byteArray)))
}

// bytearray로 변환된 payload 얻거나 null반환
fun PayloadResult<String>.getBytePayloadOrNull() : ByteArray? {
    return this.getOrNull()?.let {
        Base64.getDecoder().decode(it)
    }
}

서버와 클라이언트 간의 소켓을 통해 전달되는 패킷입니다. id로 소켓을 구분하고, packetType으로 연결인지, 메시지인지 구분, payload로 메시지를 전달합니다. 이때 payload에 Exception이 있는경우 소켓 연결에 문제가 있는것으로 판단, 연결을 끊습니다.

공용 모듈입니다. Client와 Server가 활용하고 있습니다.

  • PacketUtil
fun serialize(packet : Packet) : ByteArray

fun deserialize(byteArray : ByteArray) : Packet

말 그대로 packet 객체를 ByteArray로 만들어 소켓을 통해 전달할 수 있게 하거나, 입력받은 ByteArray를 패킷으로 역직렬화 하는 경우 사용됩니다.

  • AbstractSocket - ClientSocket
/**
 * end 단에서 처리할 소켓 추상 클래스
 * @property id 각 소켓의 고유한 id입니다.
 * @property outBoundQueue 서버에 정보를 전달하기 위한 패킷을 담을 큐입니다.
 * @property socket 현재 연결된 소켓입니다. (끝단)
 */
abstract class AbstractSocket(val id: Int, private val outBoundQueue : PacketQueue, protected val socket: Socket) :
    Runnable {

    /**
     * 처리 가능한 패킷인지 확인
     */
    private fun isHandleable(packet: Packet): Boolean 

    /**
     * 패킷 핸들링하기. 만약 자신의 패킷인경우 전송
     */
    fun handlePacket(packet: Packet) 
    /**
     * 연결된 서버로 패킷 보내는 큐에 패킷 담기
     */
    fun sendPacket(packet: Packet) 
}

class ClientSocket(id : Int, outBoundQueue : PacketQueue, socket : Socket) : AbstractSocket(id, outBoundQueue, socket) {

    @Volatile
    private var isRunning: Boolean = false
    private lateinit var thread: Thread

    override fun run() {
        isRunning = true
        thread = readSocket() //내부 쓰레드
    }


    // 연결된 소켓에서 읽어서 - 서버로 전달
    private fun readSocket() : Thread

    //소켓 작동 중단
    fun stopSocket() 
}

위에서 설명드린것 처럼 양 끝단 (터널링 클라이언트 내부 소켓, 서버(B) - 실제 클라이언트)이 처리하는 로직은 동일합니다. 따라서 공용 소켓을 통해 묶었습니다.

여기서 outBoundQueue가 있는데, 이는 동시성 문제 해결을 위해 BlockingQueue를 통해 서버로 패킷을 전달하기 위해 제작된것입니다. 다음 차례인 Concurrency에서 자세히 알 수 있습니다.

  • SocketPool
    이 소켓들을 처리하기 위해선 각 쓰레드별로 처리하지 않으면 한번에 한개밖에 처리할 수 없습니다. 따라서 Executors를 이용해 쓰레드풀을 생성하고 이를 처리하게 되었습니다.
    그런데, 이 소켓풀 또한 ClientSocket을 관리하므로 동일한 로직이 중복될 수 있습니다. 따라서 추상 클래스로 선언하고, Client와 서버에서 이를 활용하도록 했습니다.
    @Volatile
    protected var isRunning: Boolean = false
    private val atomicInteger : AtomicInteger = AtomicInteger(1) //소켓 id 부여하는 atomic Integer
    protected val packetQueue : PacketQueue = PacketQueue(socket) //전송될 패킷이 대기하는 큐. (등록시 패킷 전송 됨)
    private val socketPool: Queue<ClientSocket> = ConcurrentLinkedQueue() //연결된 소켓 담는 큐
    private val threadPoolExecutor: ExecutorService = Executors.newVirtualThreadPerTaskExecutor() //쓰레드 풀


    // 해당 풀 시작하는 메소드 - init 호출 필요
    abstract fun startPool()

    // 해당 풀 종료하는 메소드 - close 호출 필요
    abstract fun stopPool()

    // 패킷 전송, 수신 관리 init 메소드
    fun init() {
        isRunning = true
        requestPacketRead()
        packetQueue.startHandle {
            stopPool() //에러시 풀 중단
        }
    }

    fun close() {
        isRunning = false
        threadPoolExecutor.shutdown()
        socketPool.forEach { it.stopSocket() }
        packetQueue.stopHandle()
    }

    /**
     * 소켓풀에 소켓을 추가하는데, 내부 카운터를 이용해 쓰레드 안전하게 추가합니다.
     * @param clientSocket 추가할 소켓입니다.
     */
    fun addSocketToPool(clientSocket: Socket) : ClientSocket 

    /**
     * 소켓 풀에 소켓 추가, id값을 직접 지정 가능한 메소드. 주의! addSocketToPool 메소드의 내부 카운터와 동기화 되지 않습니다. (3으로 WithId 호출후 addSocketToPool했을때 4 들어가는거 X)
     * @param id 해당 소켓의 id를 지정할 수 있습니다. 이미 지정된 소켓의 id인경우 문제가 발생할 수 있습니다.
     * @param clientSocket 추가할 소켓의 id입니다.
     * @throws IllegalArgumentException 이미 풀에 등록된 소켓의 id를 지정할경우 발생합니다.
     */
    protected fun addSocketToPoolWithId(id : Int, clientSocket: Socket) : ClientSocket {


    //서버로부터 패킷 읽어오는 작업 요청
    private fun requestPacketRead() 

    /**
     * 입력받은 패킷 어떻게 핸들링할지 정하기
     */
    protected abstract fun handlePacket(packet : Packet)

    // 풀 내에 있는 소켓들에게 패킷 notify
    protected fun notifyPacket(packet: Packet) 

여기에서 Client와 연결된 소켓을 ClientSocket으로 가공하고 id를 부여하는데, 이때, 서버에서 실제 클라이언트와 연결할때는 addSocketToPool을 이용해 id를 부여하지만, 이 과정에서 ACCEPT로 보내진 패킷을 읽어 내부 소켓을 형성하는 터널링 클라이언트는 id 동기화가 어렵습니다. (atomicInteger 값 할당해줘야 하나?)
따라서 좀 unsafe하지만, addSocketToPoolWithId를 이용해 특정 id값을 부여하여 풀에 추가하도록 했습니다.

자세한건 github https://github.com/choi-hyeseong/kiki 에서 확인할 수 있습니다.

Concurrency

TCP 패킷은 신뢰성이 중요합니다. 순서보장, 연결지향 등등의 개념이 있죠.
하지만 Socket은 신뢰성을 '확실하게' 보장하기 어렵습니다. 동시성의 문제로 인해, InputStream이나 OutputStream을 동시에 읽거나 쓰면 문제가 발생할 수 있습니다.

따라서, 패킷을 읽는 쓰레드와 쓰는 쓰레드를 한개만 접근하도록 한정지어, 동시성 문제를 해결하였습니다.

우선 서버(터널링 클라이언트)와 연결했을때 읽고 쓰는 기능 + 내부 소켓의 읽고 쓰는기능을 담당해야 합니다.

  • 내부 소켓 읽기
    내부 소켓을 읽는것은 쓰레드풀을 통해 내부 쓰레드에게 담당시켰습니다,

     override fun run() {
        isRunning = true
        readSocket() 
    }
    
    // 연결된 소켓에서 읽어서 - 서버로 전달
    private fun readSocket()  {
        try {
            val array = ByteArray(10000)
            val reader = socket.getInputStream()
            var readByte = reader.read(array)
            while (readByte != -1 && isRunning) {
                val readPacket = array.slice(IntRange(0, readByte - 1)).toByteArray()
                sendPacket(Packet(id, PacketType.MESSAGE, readPacket))
                readByte = reader.read(array)
            }
        }
        catch (e: Exception) {
            println("Read error - $id | ${e.message}")
            socket.close()
            sendPacket(Packet(id, PacketType.MESSAGE, PayloadResult.failure(PayloadException("Connection reset")))) //에러 핸들
        }
    }
  • 내부 소켓 쓰기 - 외부 소켓(서버, 클라이언트) 읽기
    이 두가지 로직을 하나로 묶었습니다. 원래는 handlepacket에서 내부에 패킷 큐로 전달하고, 내부 소켓쓰레드에서 이 큐를 읽어오는 방식으로 했으나, 퍼포먼스가 너무 안나와서 차라리 읽는 쓰레드에서 전달하자 해서 수정하였습니다.

  //서버로부터 패킷 읽어오는 작업 요청
    private fun requestPacketRead() {
        //터널링 서버 - 서버 연결하는 스트림 읽는 메소드. 서버와 서버의 연결 (json형식이기 때문에 buffered reader 써서 string으로 읽어도 안전)
        val runnable = Runnable {
            runCatching {
                val reader = socket.getInputStream().bufferedReader()
                var input = reader.readLine()
                while (input != null && isRunning) {
                    val packet = PacketUtil.deserialize(input.toByteArray(), Packet::class.java)
                    handlePacket(packet)
                    input = reader.readLine()
                }
            }.onFailure {
                println("Can't read packet - ${it.message}")
                stopPool() //소켓 연결 자체가 끊겼으므로 중단
            }
        }
        threadPoolExecutor.submit(runnable)
    }
    
    fun handlePacket(packet : Packet) {
    	socketQueue.stream().forEach ~
    }

즉, 소켓풀에서 읽어와서 handlePacket까지 호출하므로, 해당 쓰레드 하나만 내부 소켓 쓰기와 외부 소켓 읽기를 담당합니다.
그리고 handlePacket 또한, ConcurrentQueue에 저장되어 있고, Stream 메소드는 Spliterator 구현체에 의해 쓰레드 안전하게 forEach를 수행할 수 있는 stream을 반환합니다.

Failure

처음에는 ConcurrentLinkedQueue를 이용해서, 외부 소켓 읽기를 해서 notify를 해준경우 ConcurrentQueue에 패킷을 넣어주고, 내부 소켓 하나의 쓰레드가 while loop를 돌며 큐에 값이 있으면 처리하고 null일경우 무시하는 로직으로 구성했습니다.

while(true) {
	val packet : Packet? = queue.poll()
    if (packet != null)
    	handle~
}

하지만 소켓 연결이 수십개가 되자 cpu로드율이 100%를 찍었습니다.

너무 심각해 보여 IJ Profiler를 이용해 본 결과..

네.. poll이 문제였습니다.

일단 그래서 내부 큐를 제거하고, 위 방식으로 변경하여 로드율 문제를 해결했습니다.

소켓 연결이 수십개가 되더라도 로드율에는 문제가 없는모습

  • 외부 소켓 쓰기 - 이 또한, 동시성 문제가 있었습니다.

원래는 각 내부 소켓에서 외부 소켓에 대한 참조를 가진 상황이고, 패킷 쓰기가 필요할때마다 사용했었는데, 스트레스 테스트를 하던 도중 json 역직렬화 불가 오류가 발생했습니다.
알고보니 outputStream은 쓰레드 안전하지 않아 내부에 payload가 상당히 꼬인 상황이였습니다.

그때당시..
{"id" : 7, "payload" {"id : 6, "payload": "e123sed23e12e1="....}}

json이 끝나지도 않았는데 다른 소켓이 output Stream을 사용하려다가 터져버린 상황이였습니다.

StackOverFlow도 찾아보니 low level에서도 쓰레드 안전성을 보장해주지 못해 어쩔 수 없이 패킷 큐를 제작해야 했습니다.
하지만, 위에 Failure 부분처럼 괜히 또 cpu 과부하가 터질까 무서웠는데, StackOverFlow에서 해답을 제공했습니다.
https://stackoverflow.com/questions/66500191/concurrentlinkedqueue-and-poll

즉슨, CLQ는 poll을 하더라도 없으면 null을 리턴하는데, 이러면 while loop가 계속 돌아가 cpu 퍼포먼스가 떨어진다. 차라리 LinkedBlockingQueue의 take를 사용하면 큐에 값이 들어갈때까지 대기를 하기 때문에 컨텍스트 스위칭이 발생해 다른 자원에게 쓰레드가 할당되어 질 수 있다.

그래서 BlockingQueue의 take를 이용해 작성한 결과, 동시성 문제와 퍼포먼스또한 괜찮게 되었습니다.

/**
 * 패킷 write시 안전하게 처리하기 위한 큐
 * read - handlePacket은 각 소켓마다 생성해야 하기 때문에 성능이 낮아짐 - 싱글 쓰레드 기반 처리 
 * @property socket 패킷이 전송될 소켓입니다.
 */
class PacketQueue(private val socket: Socket) {
    
    @Volatile
    private var isRunning = false //작동여부 boolean
    private var thread : Thread? = null // 패킷 처리할 쓰레드 (virtual)
    private val queue : BlockingQueue<Packet> = LinkedBlockingQueue()
    
    fun addPacket(packet: Packet) {
        queue.add(packet)
    }
    
    fun startHandle(onError : (throwable : Throwable) -> Unit) {
        isRunning = true
        val runnable = Runnable {
            try {
                while (isRunning) {
                    kotlin.runCatching {
                        val packet: Packet = queue.take() //blocking queue이므로 안전하게 대기
                        socket.getOutputStream().writeObject(packet)
                    }.onFailure {
                        println("Can't write packet - ${it.message}")
                        onError(it)
                        return@Runnable
                    }
                }
            }
            catch (e : InterruptedException) {
                return@Runnable
            }
        }
        thread = Thread.startVirtualThread(runnable)
    }
    
    fun stopHandle() {
        isRunning = false
        thread?.interrupt()
    }
    
}

위 내부 패킷 읽기에서 - sendPacket을 통해 패킷 전송을 요청하는데, 이 또한 PacketQueue의 addPacket을 호출하게 됩니다.
따라서 안전하게 패킷 전송이 이루어지게 되었습니다.

Disconnection

연결이 끊어질경우 이 또한 Packet에 Exception을 담아 전송함으로써 반대쪽 소켓도 끊어질수 있도록 하였습니다.

sendPacket(Packet(id, MEssage, PayloadResult.error(PayloadException("Can't Connect"))

Coroutine? Virtual-Thread?

https://techblog.woowahan.com/15398/
https://perfectacle.github.io/2023/07/10/java-virtual-thread-vs-kotlin-coroutine/
처음엔 코루틴이랑 버추얼 쓰레드랑 뭔 차이지? 라고 생각했었는데, 각자 장단점이 있어 이번 프로젝트에서는 코루틴 대신 쓰레드 (버추얼)을 사용했습니다.

코루틴은 비동기적 작업 (await, suspend..)에 적합해 안드로이드, 혹은 비동기적 콜백이 많은 프로젝트에 적합한데 (CPU Usage보단 IO작업등이 많은 IO Bound 프로세스 등등.. - android ui update, database getObject Callback) 이번 프로젝트는 IO작업으로 이루어져 코루틴이 적합한 패킷단이였지만, 코루틴은 while루프 베이스로 풀링해두고 사용하는 방식에는 적합하지 않아 코루틴의 비동기 처리의 장점보다는 쓰레드가 나았습니다. - 현재 ThreadPool 사용

그리고 JDK-21부터 추가된 Virtual-Thread 가상쓰레드를 도입함으로써 더 나은 퍼포먼스를 볼 수 있었습니다. (cpu usage 소폭 감소)
JVM단에서 쓰레드를 가상으로 생성해 해당 쓰레드를 효율적으로 사용함으로써 더 나은 성능을 보여준다고는 하는데.. JDK-21 이전에는 지원하지 않아서.. Backward Compatibility를 만족할지는 모르겠습니다. 이 부분은 논의가 많이 필요해보입니다.
https://d2.naver.com/helloworld/1203723
이 글을 보면 WebMVC로 WebFlux와 유사한 효율을 낼 수 있다곤 한데 상당히 혁신적으로 보이긴 합니다.

Usage

외부 접속이 가능한 서버에서 Server를 빌드한 파일을 실행합니다. 'Server is Running..'이라고 나타나면 정상적으로 실행된것입니다.

이후 터널링이 필요한 서버에서 Client를 빌드한 파일을 실행합니다. 서버의 ip는 위 Server를 실행한 서버의 ip, 포트는 터널링 (외부에서 접속해야 하는 프로그램)이 필요한 포트 (ssh 22, http 8080...)를 입력하면 됩니다.

그러면 서버에서 Opened와 함께 포트번호를 응답받을 수 있는데, 서버의 ip:응답받은 포트번호 로 해서 외부에서 접속하시면 되겠습니다.

Tested - ON 5G Network

  • HTTP (Spring Boot) - 접속 확인
    KakaoTalk_20240627_164905051_01
  • SSH - 접속 확인
    KakaoTalk_20240627_164905051_02
  • Minecraft - 접속확인
    KakaoTalk_20240627_164905051
  • FTP - 접속실패 (21번 포트만 사용하는것이 아닌, 데이터를 주고 받는 추가 포트가 있어서 사용불가.. HandShake는 정상적으로 이루어짐)

Performance

  • Ryzen 3600 (6C/12T) 16GB 기준 cpu 1% 이하 유지.

  • Minecraft 과부하 (Speed 255사용, 청크 풀 로드) - 1~3%. 패킷 전달은 잘 되나, 내부 처리 로직에 의해 moved too quickly 발생. 청크(대용량 데이터) 로딩은 잘됨.

    화면 캡처 2024-06-27 131442

  • Multiple Socket Connection - MC에서 서버 새로고침시 서버의 정보를 받아오는 소켓이 새로고침 될때마다 Establish됨.
    화면 캡처 2024-06-27 130942
    원래는 새로고침 10번정도 해서 소켓 커넥션 10개 이상 생성시 cpu usage가 100%에 도달해서 정상적인 사용이 불가능 했지만, 최적화 해서 해결하였음. 1% 미만 유지하는 모습 확인가능

  • LOIC Stress Test
    디도스 툴로 알려진 LOIC.. 말 그대로 패킷에다가 쓰레기값을 최대치로 채워서 전송하는 도구입니다.
    전 여기서 스트레스 테스트 용도로 사용해볼것입니다.. 불법으로 쓰는거 아니에요..ㅜㅜ Jmeter도 써봤는데 뭔가 너무 악의적이지 않다 해야하나..?

10초동안 수행 - 기존 방식 (Packet Queue 도입 이전)에서는 3초만 돌려도 터졌는데, 큐 도입이후 안정성이 늘어 10초가 지나도 터지지 않는 모습 (server message 수신)

아쉬운점

  • 테스트 코드 부족
    테스트 주도 개발(TDD)등을 그렇게 찬양해놓고 막상 코드 짤려니 이것만 조금더 하고 짜자.. 요것만 해결하고 짜자.. 하다가 코드를 다 짜버렸다.
    common 모듈에서 packet 부분은 테스트 수행을 했는데, Socket 부분은 막상 실제 기기에서 어떻게 동작할지 몰라 가상 서버를 만들어 놓고 런타임 테스트로 버그를 잡다보니 안짜게 된게 있긴한데.. 음 다 핑계다
    추후 보완이 가능하다면 테스트 코드를 작성하며 좀더 효율적인 코드를 작성하고 싶다.

  • Result<T> 사용불가
    https://velog.io/@choi-hyeseong/%EC%BD%94%ED%8B%80%EB%A6%B0%EC%97%90%EC%84%9C-%EC%9E%88%EC%96%B4%EB%B3%B4%EC%9D%B4%EA%B2%8C-try-catch-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0
    여기에서 그렇게 찬양하던 Result 클래스를 이번 패킷에 넣어, 패킷이 성공한 패킷인지, 오류가 있는 패킷인지 구분하려 했는데, jackson이 이를 알아채지 못했다.
    가뜩이나 generic type이라 까다로운데, 내부 객체도 Any로 받고 있어 어쩔 수 없이 exception field와 data 필드를 받고 있는 클래스를 하나 만들었다..
    추후 수정이 가능하다면 Result 를 다시 사용할 수 있으면 좋겠다.

  • Packet Enhance

    패킷 객체를 json으로 변환하고, 복화하는데 0.2초정도 지연된다는 프로파일링 결과가 있었다.
    확실히 직렬화, 역직렬화에서 오버헤드가 있긴해서.. 만약 향상시킬 수 있다면, Packet 객체를 쓴다기 보단, 진짜 패킷 다루듯이 ByteArray를 primitive하게 이용해서 맨 앞 4바이트는 id, 그다음 1바이트는 에러 유무, 그 다음은 payload.. 를 처리해서 직렬화 없이 사용하면 오버헤드가 없지 않을까 라는 생각이 들었다.

profile
KRW 채굴기

0개의 댓글