[소켓 프로그래밍 - Listen]에서 Accept의 블로킹을 해결하기 위해 비동기로 클라이언트의 요청을 받는 Listen Class를 구현했는데 Receive와 Send에 대한 블로킹을 해결해야할 필요가 있다.
Listen Class로부터 생성되는 Socket에 대해서 비동기적인 Send, Receive를 구현하기위한 Session Class의 인터페이스는 다음과 같다.
Session Class의 비동기 구조는 아래와 같다. Listen Class와 작동방식은 비슷하며 Listen Class는 콜백함수로 Action.Invoke()를 해주었지만 Session Class는 Send, Receive하는 데이터를 출력하는 작업을 진행한다.
Session Class의 초기화는 Start에서 할 것이고 처음에는 클라이언트와 연결된 Socket만 가지고 있으며 이후에 기능들을 추가하는 예제를 구현할 것이다.
class Session
{
Socket _socket;
public void Start(Socket socket)
{
_socket = socket;
}
}
기존 서버의 Disconnect 코드는 아래와 같다.
public void Disconnect()
{
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
하지만 실행환경에서 Disconnect가 연속 2번 호출된다면 충돌이 일어나면서 오류가 발생할 것이다. 이를 위해 Socket의 연결이 끊겼는지에 대한 flag
변수를 둬야할 필요가 있다.
int _disconnected = 0;
flag
변수를 두었으니 Disconnect함수내에 Socket.Close()를 하기 전, 이미 끊긴 상태인지 확인 후 연결을 끊어야 한다. int형 flag
변수의 검사와 변경을 동시에 하는 것은 Interlocked
의 함수를 이용한다.
int _disconnected = 0;
public void Disconnect()
{
if (Interlocked.Exchange(ref _disconnected, 1) == 1)
return;
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
위의 코드로 Disconnect를 구현하면 멀티쓰레드환경에서 연속된 Disconnect 호출이 발생해도 안전하게 연결을 끊을 수 있다.
기존의 Socket.Receive()는 상대로부터 데이터가 올때까지 블로킹이 발생하는데 이를 위해 Socket.ReceiveAsync()를 이용한 비동기 Receive방식을 사용한다.
Socket.ReceiveAsync()
의 호출을 위해 RegisterRecv()
를 호출한다. public void Start(Socket socket)
{
SocketAsyncEventArgs recvArgs = new SocketAsyncEventArgs();
recvArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnRecvCompleted);
recvArgs.SetBuffer(new byte[1024], 0, 1024);
RegisterRecv(recvArgs);
}
이벤트 핸들러로 Receive가 발생하면 OnRecvCompleted
를 실행하도록 지정하고 버퍼의 크기는 기존 예제와 같은 1024로 설정하였다.
Socket.ReceiveAsync()
를 호출하는 함수로 비동기 Receive를 실행하지만 클라이언트로부터 바로 데이터가 도착했을 때에 대한 처리로 pending
을 사용해 직접 콜백함수를 실행한다. void RegisterRecv(SocketAsyncEventArgs args)
{
bool pending = _socket.ReceiveAsync(args);
if (pending == false)
OnRecvCompleted(null, args);
}
void OnRecvCompleted(object sender, SocketAsyncEventArgs args)
{
if (args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
{
try
{
string recvData = Encoding.UTF8.GetString(args.Buffer, args.Offset, args.BytesTransferred);
Console.WriteLine($"[From Client] {recvData}");
RegisterRecv(args);
}
catch (Exception e)
{
Console.WriteLine($"OnRecvCompleted Failed {e}");
}
}
else
{
Disconnect();
}
}
만약 수신된 데이터의 크기가 0이거나 에러가 발생했다면 연결을 종료하도록 한다.
Send는 Receive와 다르게 추가적인 작업이 필요하다.
Send에 위와 같은 추가작업을 더하는 이유는 다음과 같다.
object _lock = new object();
Queue<byte[]> _sendQueue = new Queue<byte[]>();
List<ArraySegment<byte>> _pendingList = new List<ArraySegment<byte>>();
SocketAsyncEventArgs _sendArgs = new SocketAsyncEventArgs();
비동기 통신 정보를 다루기 위한 SocketAsyncEventArgs
외에 Send메세지를 관리하기 위한 Queue
, 전송 버퍼 List를 관리하기 위한 pendingList
들이 필요하다.
멀티쓰레드환경에서 Send함수는 여러 쓰레드에 동시에 접근가능 하기 때문에 sendQueue
와 pendingList
의 접근에 대한 lock이 필요하다.
public void Send(byte[] sendBuff)
{
lock (_lock)
{
_sendQueue.Enqueue(sendBuff);
if (_pendingList.Count == 0)
RegisterSend();
}
}
보내고자하는 데이터를 Buffer에 담아 Send의 매개변수로 넘겨준다. Send를 실행한 쓰레드는 sendQueue
에 메세지를 넣고 만약 pendingList
가 비어있다면 SendAsync를 예약한다.
void RegisterSend()
{
while (_sendQueue.Count > 0)
{
byte[] buff = _sendQueue.Dequeue();
_pendingList.Add(new ArraySegment<byte>(buff, 0, buff.Length));
}
_sendArgs.BufferList = _pendingList;
bool pending = _socket.SendAsync(_sendArgs);
if (pending == false)
OnSendCompleted(null, _sendArgs);
}
SendAsync를 실행하는 Register함수로 Queue에 있는 모든 메세지들을 pendingList에 추가한 뒤 SocketAsyncEventArgs
를 설정한다.
SendAsync으로 비동기Send를 호출하고 만약 시간내에 완료되었다면 콜백함수를 직접 실행한다.
void OnSendCompleted(object sender, SocketAsyncEventArgs args)
{
lock (_lock)
{
if (args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
{
try
{
_sendArgs.BufferList = null;
_pendingList.Clear();
// Doing
if (_sendQueue.Count > 0)
RegisterSend();
}
catch (Exception e)
{
Console.WriteLine($"OnSendCompleted Failed {e}");
}
}
}
}
Send가 완료되었을때 실행할 함수로 SocketAsyncEventArgs
의 버퍼지우기, pendingList 비우기, 추가연산을 진행할 수 있다.
위 부분을 실행하는동안 새로운 메세지가 Queue에 들어왔을 수도 있기 때문에 Queue가 비어있지 않다면 다시 SendAsync를 예약한다.
만약 특정 기능을 수행할 때 연산할 작업들(위 예제에서는 CMD 출력)을 이후에 원하는대로 바꾸고 싶다면 아래와 같은 추상 메소드를 추가하고 상세한 구현은 서브클래스에서 하도록 할 수 있다. 추상클래스는 구현을 강제하기 때문에 Session Class를 상속받은 클래스는 아래의 함수를 무조건 구현해야 한다.
public abstract void OnConnected(EndPoint endPoint);
public abstract void OnRecv(ArraySegment<byte> buffer);
public abstract void OnSend(int numOfBytes);
public abstract void OnDisconnected(EndPoint endPoint);
기본골격에 특정 메소드를 자식클래스에서 구현한다. 템플릿 메소드 패턴을 참고하자!
개발자로서 배울 점이 많은 글이었습니다. 감사합니다.