Node 내장 모듈 (4) - Worker-threads

김무연·2023년 12월 12일

Backend

목록 보기
19/49

Worker-threads

Node에서 멀티 스레드 방식으로 작업할 수 있는 방법

하지만 Node에서 멀티 스레드 방식은 극히 드뭄, cpu를 많이 써야되는 암호화나, 압축같은 경우를 직접 구현할 때 사용하거나 함

보통은 싱글 스레드로 작업하는 것으로 생각하고, Worker-threads방식이 메인이 되면 안됨 ( 너무 복잡하기 때문)

  • isMainThread : 현재 코드가 메인 스레드에서 실행되는지, 워커 스레드에서 실행되는지 구분
  • 메인 스레드에서는 new Worker를 통해 현재 파일(__filename)을 워커 스레드에서 실행시킴
  • worker.postMessage로 부모에서 워커로 데이터를 보냄
  • parentPort.on('message')로 부모로부터 데이터를 받고, postMessage로 데이터를 보냄
const {
  Worker, isMainThread, parentPort,
} = require('worker_threads');

if (isMainThread) { // 메인 스레드
  const worker = new Worker(__filename);
  worker.on('message', message => console.log('from worker', message));
  worker.on('exit', () => console.log('worker exit'));
  worker.postMessage('ping');
} else { // 워커 스레드
  parentPort.on('message', (value) => {
    console.log('from parent', value);
    parentPort.postMessage('pong');
    parentPort.close();
  });
}

  • 메인스레드가 실행되서 메인스레드 안에서 워커스레드들을 실행되고 분배한 뒤, 워커스레드 들이 일을 마치면 다시 메인 스레드로 보낸 후 메인스레드에서 합친 후 return을 해주는 방식으로 작동
  • 자동으로 일이 분배가 되는 것이 아닌 우리가 직접 일을 분배해주고, 합쳐서 return해줘야 되서 구현하기 복잡합니다
  • 위 코드에서는 워커 스레드가 하나 밖에 없어서 비교적 간단해보이지만 실제로는 여러개의 워크 스레드를 이용하기 때문에 코드가 복잡해짐

예시

const min = 2;
const max = 10_000_000;
const primes = [];

function generatePrimes(start, range) {
  let isPrime = true;
  const end = start + range;
  for (let i = start; i < end; i++) {
    for (let j = min; j < Math.sqrt(end); j++) {
      if (i !== j && i % j === 0) {
        isPrime = false;
        break;
      }
    }
    if (isPrime) {
      primes.push(i);
    }
    isPrime = true;
  }
}

console.time('prime');
generatePrimes(min, max);
console.timeEnd('prime');
console.log(primes.length);
  • 위 코드로 2, 10,000,000 까지의 소수를 에라토스테네의 체를 통해서 구하려고 하면 싱글스레드 일시 3.582s가 걸린다.

  • 이를 워커 스레드를 통해 멀티 스레드로 계산하게 되면 아래와 같다
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

const min = 2;
let primes = [];

function findPrimes(start, range) {
  let isPrime = true;
  const end = start + range;
  for (let i = start; i < end; i++) {
    for (let j = min; j < Math.sqrt(end); j++) {
      if (i !== j && i % j === 0) {
        isPrime = false;
        break;
      }
    }
    if (isPrime) {
      primes.push(i);
    }
    isPrime = true;
  }
}

if (isMainThread) {
  const max = 10000000;
  const threadCount = 8;
  const threads = new Set();
  const range = Math.floor((max - min) / threadCount);
  let start = min;
  console.time('prime');
  for (let i = 0; i < threadCount - 1; i++) {
    const wStart = start;
    threads.add(new Worker(__filename, { workerData: { start: wStart, range } }));
    start += range;
  }
  threads.add(new Worker(__filename, { workerData: { start, range: max - start } }));
  for (let worker of threads) {
    worker.on('error', (err) => {
      throw err;
    });
    worker.on('exit', () => {
      threads.delete(worker);
      if (threads.size === 0) {
        console.timeEnd('prime');
        console.log(primes.length);
      }
    });
    worker.on('message', (msg) => {
      primes = primes.concat(msg);
    });
  }
} else {
  findPrimes(workerData.start, workerData.range);
  parentPort.postMessage(primes);
}

  • 시간이 줄어든 모습을 볼 수 있다
profile
Notion에 정리된 공부한 글을 옮겨오는 중입니다... (진행중)

0개의 댓글