각 핸들러에서 sendPacket을 넣어줄 때 연속적으로 패킷을 넣으면 패킷이 정상 발송은 되나, 서버 로그상에는 에러가 생기는
문제가 발생했다.
deQueueSend() 부분에서 값을 제대로 가져오지 못해 구조분해 할당이 되지 않는 상황이다.
위 메서드의 경우 뮤텍스 락을 이용하여 동시성 제어가 된 메서드인데, 비동기 처리가 되어있어 원래는 대기하고 꺼내서
가져오는게 맞다고 생각했다.
나는 공유자산인 큐의 접근만 막을 생각을 했지, 이를 이용하는 메서드들에 대한 동시성은 생각을 하지 못했던 것이다.
processQueue 의 진입 조건이 queue의 크기가 1이상일 때 인데,
빠르게 sendPacket을 연속 실행 시 processSendPacket 메서드 2개 이상 실행되면 수행 절차는 아래와 같다.
async enQueueSend(socket, packet) {
await this.sendLock.runExclusive(() => {
this.sendQueue.push({ socket, packet });
this.processSendPacket();
});
}
async deQueueSend() {
return await this.sendLock.runExclusive(() => {
return this.sendQueue.shift();
});
}
async processSendPacket() {
if (!this.sendQueue.length) return;
try {
while (this.sendQueue.length) {
const { socket, packet } = await this.deQueueSend();
if (!socket || !packet) throw new Error('패킷 보내기 오류');
await socket.write(packet);
}
} catch (err) {
handleErr(null, err);
}
}
위의 코드를 보면 enQueueSend
가 실행되었을 때와 deQueueSend
가 실행되었을 때 뮤텍스 락을 실행한다.
만약 enQueSend
에 있는 .processSendPacket
에 await
를 처리하게 된다면 처리 과정은 다음과 같다.
async processSendPacket() {
if (!this.sendQueue.length || this.sendProcessing) return;
this.sendProcessing = true;
try {
while (this.sendQueue.length) {
const { socket, packet } = await this.deQueueSend();
if (!socket || !packet) throw new Error('패킷 보내기 오류');
await socket.write(packet);
}
} catch (err) {
handleErr(null, err);
} finally {
this.sendProcessing = false;
}
}
processSendPacket이 실행중일 경우 return 시키는 플래그를 하나 세워서 이미 처리중이면 접근하지 않도록 처리하였다.