bullmq 2

김민재·2024년 11월 16일
0

어제 말했던데로 기존에 구현했던 bullmq에는 이제 들어온 작업들을 테스크 큐(임시)에 저장해서 한번에 flush하는 기능이 없었고 오늘은 그 기능에 대해서 구현하면서 생긴 에러와 그 에러를 해결한 방법에 대해서 알아보고자 한다.

bullmq 클래스 구현


class bullmqClass {
  constructor(queueName, maxSize) {
    this.queue = new Queue(queueName, {
      connection: new ioredis(process.env.REDIS_URL, redisOptions),
    });
    this.jobs = [];
    this.maxSize = maxSize;
  }

  async addJob(infos) {
    this.jobs.push(infos);
    console.log(`job added ${infos}`);

    if (this.jobs.length >= this.maxSize) {
      await this.flush();
    }
  }

  async flush() {
    if (this.jobs.length === 0) return;

    this.jobs.forEach(async (job) => {
      console.log(`job`, job);
      const { name, socketId, data } = job; // 꼭 나눠야 함
      await this.queue.add(name, { socketId, data });
    });
    this.jobs = [];
  }
}

먼저 클래스를 선언하여 내부에 jobs라는 임시 테스크 큐를 만들어서 maxSize보다 같거나 많아질시에 한번에 flush하도록 구현하였다.

트러블 슈팅

기본적으로 bullmq에서 제공하는 add 함수의 인자는 add(name: NameType, data: DataType, opts?: JobsOptions)
이름과 데이터와 추가 옵션을 설정하게끔 되어있는데 나는 이것을 {이름, 데이터} = job으로써 합쳐서 queue에 추가를 보내고있었다.

async flush() {
    if (this.jobs.length === 0) return;

    this.jobs.forEach(async (job) => {
      console.log(`job`, job);
      const { name, socketId, data } = job;
      await this.queue.add(name, { socketId, data });
    });
    this.jobs = [];
  }

이렇게 job을 넣을 때 이름과 데이터를 분리해서 넣으니까 제대로 작동하는 모습을 보여줬다.

worker.on 추가 기능

worker.on('completed', (job) => console.log(`${job.id} completed`));
worker.on('failed', (job, error, prev) => console.log(`${job.id} failed , ${error}`));

worker.on에 인자로써 job말고도 error나 prev(string)와 같은 인자들이 있어서 해당하는 작업들을 추적하는 기능이 있다.
작업 진행(process),실패(failed),완료(completed)와 같이 다양한 기능들을 제공하고 있어서 작업이 제대로 완료되는지 확인하는데 정말 유용하다.

피드백

이렇게 실제로 bullmq를 통해서 직접 job queue를 구현해보았다.
이걸 좀 응용을한다면 priority를 구현해서 특정 이벤트에서 높은 우선순위를 줌으로써 먼저 처리되어야할 이벤트 => 죽고나서 싱크 맞추기와 같은 것들이 가능해지지 않을까 라고 생각하고 있다.

profile
ㅇㅇ

0개의 댓글