bull
라이브러리 설치 const queue = new Queue('itemQueue', {
redis: {
host: '127.0.0.1',
port: 6666,
password: 1234,
},
});
await queue
.add(
// 작업에 사용될 데이터 => ex: job.data.userId
{ userId: socket.userId, itemId, newInventorySlot },
{ jobId: `getItem:${itemId}`, removeOnComplete: true },// 불큐 옵션
);
옵션 종류
-jobId
: 작업의 고유id로 동일한 id의 작업이 큐에 이미 있다면 새 작업이 추가되지 않음
-priority
: 작업의 우선순위, 숫자가 낮을수록 높은 우선순위
-delay
: 지정된 시간(ms) 이후 작업실행
-attempts
: 작업 실패 시 재시도(횟수 지정)
-removeOnComplete
: 작업 완료 후 큐에서 제거 여부
=> 기본값이 false라 true로 지정하지 않으면 jobId가 제거되지 않아 같은 jobId의 작업을 실행하지 않음
-removeOnFail
: 작업 실패 시 큐에서 제거 여부
await queue.process(4, async (job) => {
//작업에 사용될 데이터 구조 분해 할당
const { userId, inventorySlot, itemId } = job.data;
//데이터를 활용한 작업 실행
});
- concurrency는 값을 설정하지 않으면 1
위의 코드는 4로 지정
export const itemGetRequestHandler = async ({ socket, payload }) => {
//--A구간
const { itemId, inventorySlot } = payload;
const user = getUserById(socket.userId);
if (!user) {
throw new CustomError(ErrorCodesMaps.USER_NOT_FOUND);
}
const gameSession = getGameSessionById(user.gameId);
if (!gameSession) {
throw new CustomError(ErrorCodesMaps.GAME_NOT_FOUND);
}
const item = gameSession.getItem(itemId);
if (!item.mapOn) {
return;
}
const [bool, newInventorySlot] = await checkSetInventorySlotRedis(
socket.userId,
inventorySlot,
);
// 모든 슬롯에 아이템이 있을경우 처리 중지
if (!bool) {
return;
}
//--A구간
//--B구간
const time = 640;
const key = `${config.redis.user_set}:${userId}:${newInventorySlot}`;
await redisManager.getClient().set(key, itemId, 'EX', time);
user.character.itemCount++;
//--B구간
//--C구간
// 응답 보내주기
itemGetResponse(user.socket, itemId, newInventorySlot);
// 손에 들어주기
itemChangeNotification(gameSession, socket.userId, itemId);
//--C구간
};
//--A구간
~~~
//--A구간
//--B구간
await itemQueueManager
.getQueue()
.add(
{ userId: socket.userId, itemId, newInventorySlot },
{ jobId: `getItem:${itemId}`, removeOnComplete: true },
);
//--B구간
//--C구간
~~~
//--C구간
getState는 현재 진행중이 작업의 진행도를 나타냄
wait
: 작업이 대기열에 추가되었지만 아직 처리되지 않은 상태active
: 작업이 현재 처리 중completed
: 작업이 성공적으로 완료failed
: 작업 실행 중 오류가 발생하여 실패delayed
: 작업이 특정 시간만큼 지연되도록 설정된 상태paused
: 큐 자체가 일시 중지되어 작업이 대기 상태stuck
: 예상치 못한 이유로 진행 중이거나 처리되지 않은 상태로 멈춘 경우
//--B구간
job = await queueManager.getItemQueue().add(
{ userId: socket.userId, itemId, newInventorySlot },
{ jobId: `getItem:${itemId}` },
)
const state = await job.getState();
if (state !== 'wait') {
console.log(`Duplicate job detected for item ${itemId}.`);
return; // 중복 작업인 경우 중단
}
//--B구간
await job.finished(): job이 종료될 때 까지 기다림
=> 만약 동시에 두 작업이 들어오면 한 작업은 add로 큐에 추가되고 완료를 기다리고 두번째 작업은 add로 큐에 추가되지는 않지만 첫번째 작업이 완료될때 까지 기다리는 기현상이 일어난다.
export const itemGetRequestHandler = async ({ socket, payload }) => {
const { itemId, inventorySlot } = payload;
const [bool, newInventorySlot] = await checkSetInventorySlotRedis(
socket.userId,
inventorySlot,
);
// 모든 슬롯에 아이템이 있을경우 처리 중지
if (!bool) {
return;
}
// 동시성 제어1
// 불큐
// 실질적인 아이템 저장 메서드
await itemQueueManager
.getQueue()
.add(
{ userId: socket.userId, itemId, newInventorySlot },
{ jobId: `getItem:${itemId}`, removeOnComplete: true },
);
};