2024.11.29 Bull Queue(JS), 트러블 슈팅2

장재영·2024년 11월 29일
0

Bull Queue

  1. bull 라이브러리 설치
  2. 세팅
	const queue = new Queue('itemQueue', {
  		redis: {
    		host: '127.0.0.1',
    		port: 6666,
            password: 1234,
  		},
	});
  1. 큐에 작업 추가: Queue.add
	await queue
    .add(
      // 작업에 사용될 데이터 => ex: job.data.userId
      { userId: socket.userId, itemId, newInventorySlot },
      { jobId: `getItem:${itemId}`, removeOnComplete: true },// 불큐 옵션
    );

옵션 종류
- jobId: 작업의 고유id로 동일한 id의 작업이 큐에 이미 있다면 새 작업이 추가되지 않음
- priority: 작업의 우선순위, 숫자가 낮을수록 높은 우선순위
- delay: 지정된 시간(ms) 이후 작업실행
- attempts: 작업 실패 시 재시도(횟수 지정)
- removeOnComplete: 작업 완료 후 큐에서 제거 여부
=> 기본값이 false라 true로 지정하지 않으면 jobId가 제거되지 않아 같은 jobId의 작업을 실행하지 않음
- removeOnFail: 작업 실패 시 큐에서 제거 여부

  1. 큐의 작업 지시: Queue.process(concurrency, (job) => {})
    await queue.process(4, async (job) => {
      //작업에 사용될 데이터 구조 분해 할당
      const { userId, inventorySlot, itemId } = job.data;
      
      //데이터를 활용한 작업 실행
    });
  • concurrency는 값을 설정하지 않으면 1
    위의 코드는 4로 지정
  1. 그 외 실패할경우 실행될 이벤트 등 지정할 수 있음.

동시성 제어(트러블슈팅2)

  • 원하는 것 A, B, C구간 에서 B구간을 큐에서 작업하고 C구간을실행
  • 동시성 제어로 동일한 작업이 B구간 진입 시 C구간 작업을 못하게 함
    => 동시에 아이템을 잡을 경우 한명한테는 인벤토리에 아이템이 들어가게 다른 한명은 아무 일도 없게
  • 코드 원본
export const itemGetRequestHandler = async ({ socket, payload }) => {
//--A구간
  const { itemId, inventorySlot } = payload;

  const user = getUserById(socket.userId);
  if (!user) {
    throw new CustomError(ErrorCodesMaps.USER_NOT_FOUND);
  }

  const gameSession = getGameSessionById(user.gameId);
  if (!gameSession) {
    throw new CustomError(ErrorCodesMaps.GAME_NOT_FOUND);
  }

  const item = gameSession.getItem(itemId);

  if (!item.mapOn) {
    return;
  }
  const [bool, newInventorySlot] = await checkSetInventorySlotRedis(
    socket.userId,
    inventorySlot,
  );

  // 모든 슬롯에 아이템이 있을경우 처리 중지
  if (!bool) {
    return;
  }
//--A구간

//--B구간
  const time = 640;

  const key = `${config.redis.user_set}:${userId}:${newInventorySlot}`;

  await redisManager.getClient().set(key, itemId, 'EX', time);

  user.character.itemCount++;
//--B구간

//--C구간

  // 응답 보내주기
  itemGetResponse(user.socket, itemId, newInventorySlot);

  // 손에 들어주기
  itemChangeNotification(gameSession, socket.userId, itemId);
//--C구간
};

1차 시기

  • 처음 A구간 B구간 C구간이 나누어져 있을 때 B구간만 불큐에 넣어 작업하는 형태로 만들었다.
//--A구간
~~~
//--A구간
//--B구간
await itemQueueManager
.getQueue()
.add(
  { userId: socket.userId, itemId, newInventorySlot },
  { jobId: `getItem:${itemId}`, removeOnComplete: true },
);
//--B구간
//--C구간
~~~
//--C구간

문제점

  • 거의 동시에 B구간 진입 시 한명은 B구간에서 제외가 되는 것은 확인
  • 하지만 C구간은 B구간과 상관없이 그대로 진행됨

2차 시기

  • B구간에서 getState()로 제어를 시도

    getState는 현재 진행중이 작업의 진행도를 나타냄

    • wait: 작업이 대기열에 추가되었지만 아직 처리되지 않은 상태
    • active: 작업이 현재 처리 중
    • completed: 작업이 성공적으로 완료
    • failed: 작업 실행 중 오류가 발생하여 실패
    • delayed: 작업이 특정 시간만큼 지연되도록 설정된 상태
    • paused: 큐 자체가 일시 중지되어 작업이 대기 상태
    • stuck: 예상치 못한 이유로 진행 중이거나 처리되지 않은 상태로 멈춘 경우
//--B구간
    job = await queueManager.getItemQueue().add(
        { userId: socket.userId, itemId, newInventorySlot },
        { jobId: `getItem:${itemId}` },
     )
    const state = await job.getState();
    if (state !== 'wait') {
        console.log(`Duplicate job detected for item ${itemId}.`);
        return; // 중복 작업인 경우 중단
    }
//--B구간

문제점

  • state가 같은작업의 첫 작업에서 wait이 나올 가능성이 높지만 100퍼는 아님
  • 좀더 확실한 처리를 원하기에 폐기

3차 시기

  • B구간이 완료된 이후에 C구간을 처리하는 로직을 찾아보고 싶었음
  • 결론만 따지면 불가능, 아시는분 있으면 알려주셨으면...

문제점

  • job.finished()를 이용해 보려 했으나 전혀 관련없던 기능

    await job.finished(): job이 종료될 때 까지 기다림
    => 만약 동시에 두 작업이 들어오면 한 작업은 add로 큐에 추가되고 완료를 기다리고 두번째 작업은 add로 큐에 추가되지는 않지만 첫번째 작업이 완료될때 까지 기다리는 기현상이 일어난다.


4차 시기

  • ABC구간을 섞어서 C구간 일부를 최상위에 두고 전부 queue.process에 때려밖음

결과

  • 동시성 제어는 성공 했으나 3차구간에서 원하던 바를 얻지못해 아쉬움.
export const itemGetRequestHandler = async ({ socket, payload }) => {
  const { itemId, inventorySlot } = payload;

  const [bool, newInventorySlot] = await checkSetInventorySlotRedis(
    socket.userId,
    inventorySlot,
  );

  // 모든 슬롯에 아이템이 있을경우 처리 중지
  if (!bool) {
    return;
  }

  // 동시성 제어1
  // 불큐
  // 실질적인 아이템 저장 메서드
  await itemQueueManager
    .getQueue()
    .add(
      { userId: socket.userId, itemId, newInventorySlot },
      { jobId: `getItem:${itemId}`, removeOnComplete: true },
    );
};
profile
개발 하고 싶은 비버

0개의 댓글