How to increase mongodb data processing performance

azi_zero·2021년 7월 27일
0

leave a mark 🐾

목록 보기
4/6
post-thumbnail

🐥 주절주절 🐥

회사에 들어와서 처음 맡게 된 업무가 DB에서 데이터를 추출해서
원하는 형태에 맞게 가공한 후 csv파일로 만드는 것이었다.

관련 스크립트에 대해서는 이미 내부에서 만들어 진게 있어서 처음에는 그걸 사용해서 다듬는 작업을 했다.

하지만 그 코드는 읽으면 읽을 수록
가독성이 떨어지고, 보수가 필요한 부분이 많이 보여서
거의 다 새로 만들었고, 현재까지 csv 추출 모듈로 잘 사용되고 있다.

중간중간에 오류도 많이 났고, 문제점도 많이 생겨서 고군분투했던 순간이 많이 기억난다.

다행히도 공부할 때는 자존심이 센 스타일이라 오기로 구글링하고 공부하며
처음에는 데이터 양이 너무 많아서 서버가 터지거나 몇십분이 걸리던 것을
이제는 ✨ 어지간해도 2분도 채 안걸리는 수준✨ 까지 끌어올렸다.

만족해서는 안되지만 뿌듯한 지금이기에, 그동안의 코드 history를 공유하고자한다.



history

1. Performance에 관심이 없던 시절

가장 처음에 사용되었던 코드는 아래와 같다.

const data = db.collection('COLLECTION_NAME').find({}).toArray();

위의 방법은 mongodb 공식문서에도 알려주는,
DB에서 가져온 데이터를 array형태로 바로 볼 수 있게 해주는 코드이다.

참고로 이때는 query속도에 대해서 1도 관심이 없던 시절이었다..

하나씩 살펴보면,

코드 분석

Collection.find({})
  • parameter
    -query object를 넣어야 한다.
    -보통은 아래와 같은 형태로 들어간다.
const query = {date: {$gte: start, $lte: end}};
(mongodb 공식 문서로 가면 더 다양한 query 방법을 볼 수 있다)
  • return
    -query filtering을 통해 filtering 된 documents들을 순회하는 cursor(포인터 개념)을 반환한다.
    -cursor를 활용하는 방법은 굉장히 많다.

Cursor.toArray();

cursor를 활용하는 여러가지 방법들 중, 가장 흔한 방법이다.

  • return
    -query filtering을 통해 걸러진 모든 데이터들을 array안에 넣어서 반환해준다.
    -각각의 document들은 object형태로 존재하며, 반환되는 array는 모든 documents(objects)를 담고있다.

문제점

위의 코드를 그대로 사용할 경우, 모든 데이터를 다 받아오고 그 많은 document들을 돌면서 하나의 array안으로 넣는 작업이 이루어 지면서 시간이 진----짜 오래 걸린다!!

만약 전체 document의 갯수도 적고, 하나의 document가 갖고있는 데이터도 많지 않다면 그대로 써도 되지만 그게 아니라면 다른 방법을 생각해봐야 한다.

필자의 경우 분단위 심박수 데이터를 추출하는 상황이었다.
하루치 심박수 데이터가 하나의 document에 들어가있는데, 
하루치 심박수는 분단위로 한다고 해도 1440개가 있고, 
1분당 데이터 안에도 심박수 수치나 다른 요소들이 많았기 때문에 저 방법은 무조건 피해야 했다. 

2. covered query 시행

속도를 어떻게 향상 시켜야 하는지 계속 공부하다가, 아래의 문서를 발견했다.
Performance Best Practices: Indexing
혹시나 아직 index, query에 관해서 잘 모르는 사람들은 꼭! 한번 읽어봤으면 좋겠다.

db.collection('COLLECTION_NAME')
  .find({
     date: {$gte:start, $lte:end},
     user: {$in:target_users},
  })
  .forEach(data=>{})

query에 관해서 공부하면서 진짜 크게 느낀 점은
내가 DB를 만들어 쓰는 것도 아니고, DB에서 데이터를 긁어와서 쓰는거면
💡 잘 만들어진 시스템을 아주 말라 비틀어질 때까지 짜서 써야한다 💡는 것이었다.

1번에서 2번으로 오기까지 사실 우여곡절도 많았고, 
query만 공부하고 바로 수정작업에 들어가면서 
toArray()가 너무나도 강력한 것(array를 만들어야 forEach를 쓸 수 있다고 생각함..) 이라고 생각했기 때문에..
cursor.forEach()가 있는 줄도 모르고 
toArray().filter()이런 식으로 
쓸데없이 데이터를 다 갖고와서 다시 filtering하는 지옥을 몇주간 사용한 적도 있었다..

코드 분석

아래의 query가 제대로 작동(collection scan되면 안됨)해야했기 때문에, compound index를 만들어서 진행했다.

Collection.find({
     date: {$gte:start, $lte:end},
     user: {$in:target_users},
  })

여기에서 눈여겨봐야 할 부분은 아래이다.

user: {$in: target_users}

user field는 ObjectId를 값으로 가지는 field이며,
target_users 변수는 필터링 할 유저들의 고유 id 담고있는 array이다.
ObjectId는 mongoDB에서 제공해준다

user와 관련된 query가 제대로 작동을 하려면
target_users속 id들이 string형태가 아니라 ObjectId형태로 존재해야한다!


cursor.forEach(el=>{})

query filtering을 통해 걸러진 documents들을 하나씩 전부다 도는 것!
Array.forEach()랑 형태가 아주 그냥 똑같지만 array가 아니라 cursor다!
arrow function의 el 변수는 document를 담고있게 된다.

개선점

  1. 첫번째에 비해서 query를 제대로 사용했고, 그 query가 covered query였기 때문에 속도가 더 빨라졌다.
  2. toArray()를 기다릴 필요없이 각 document를 돌면서 data processing을 처리할 수 있기 때문에 더 효율적이다.

문제점

여전히 심박수와 같은 대용량 데이터를 다룰 때에는 시간이 오래 걸린다.
앞에서 부터 하나씩 차례차례 진행되기 때문에 오래걸릴 수 밖에 없는 구조이다.


3 🌈 worker thread 사용

1, 2번과 달라진 세번째 방식의 가장 큰 변화는
nodejs의 worker thread를 사용한다는 것이다!!!!!!

아 솔직히 진짜 이 방법 너무 좋고,, 
속도도 진짜 엄청 빨라지고,,
이제 모든 사용자의 심박수 6개월치 데이터 정도는 그냥 뽑을 수 있게 되었다.
매번 너무 느리다, 서버 터진다, 더 빠르게는 안되나 하는 컴플레인을 듣다가
진짜 속이 뻥 뚫리는 기분이었다.
내부적으로 슈퍼컴퓨터라는 별멍도 얻었다ㅋㅋㅋㅋㅋ 
그냥 너무 뿌듯해서 자랑한번 해봤다..ㅎㅎㅎ

물론 worker thread도 한계가 있을 거고,
다른 문제는 계속 생기겠지만
현재는 이 방법이 최선이라고 생각한다.

exports.user = new Promise(async (resolve, reject) => {
  if (isMainThread) {
    const num_docs = await getNumOfDocs();
    const num_threads = getNumOfThreads(num_docs, QUERY_LIMIT);

    let collection = {};
    const threads = createThreads(path, num_threads, worker_data);
    for (let worker of threads) {
      worker.on('message', (data) => {
        //collect data in collection
      });
      worker.on('exit', () => {
        threads.delete(worker);
        if (threads.size === 0) {
          // collection variable contains all the data
          resolve(/*return data | collection*/);
        }
      });
    }
  } else {
    const {workerData, parentPort} = require('worker_threads');
    const {limit, skip} = workerData;
    let result = {};
    await db
      .collection('COLLECTION_NAME')
      .find({})
      .skip(skip)
      .limit(limit)
      .forEach((data) => {
    	//process data and push it to result object;
      	result.push(processed_data);
      });

    client.close();
    parentPort.postMessage(result);
    parentPort.close();
  }
});
const getNumOfDocs = async()=>{
  const nums = await db.collection('COLLECTION_NAME').find(query_obj).count();
  return nums;
}
const getNumOfThreads = (num_docs, limit) => {
  return Math.floor(num_docs/limit) + 1;
}
const createThreads = (path, num_threads, worker_data) => {
  /*
    worker_data[i] = {limit: number, skip: number, ...}
  */
  const threads = new Set();
  for(let i=0; i<num_threads; i++){
    threads.add(new Worker(path, {workerData: worker_data[i]}))
  }
  return threads;
}

코드 분석

new Promise(async(resolve, reject)=>{});

Promise가 아니라 그냥 async()=>{} 였다면,
표면적으로 모듈 호출이 의미하는 것은
worker thread를 만들고, 각 worker thread가 어떤 코드를 실행할지 정해주고, 어떤 메세지를 주고 받을지 결정하는 것이 끝이다.

즉, 우리가 원하는 일(각각의 worker thread들이 추출한 데이터를 main thread에서 마지막에 모아서 반환하는 것)은 흐름에 맞게 이루어질 수 없다.

그렇기 때문에 promise의 resolve()를 이용해야 한다.
resolve에 각각의 worker threads를 통해서 모은 데이터를 모아서 반환해주면 된다!

 worker.on('exit', () => {
        threads.delete(worker);
        if (threads.size === 0) {
          // collection variable contains all the data
          resolve(/*return data | collection*/);
        }
      });

const worker_data = [
  {skip:0, limit:1000,},
  {skip:1000, limit: 1000,},
  ...
]

각각의 worker thread가 꼭 갖고있어야 하는 값이다.
data query를 할 때, pagination(skip, limit)을 사용하기 때문에 각 worker thread는 자신에게 맞는 값을 갖고 있어야한다.


// in MainThread
const threads = createThreads(path, num_threads, worker_data);

const createThreads = (path, num_threads, worker_data) => {
  /*
    worker_data[i] = {limit: number, skip: number, ...}
  */
  const threads = new Set();
  for(let i=0; i<num_threads; i++){
    threads.add(new Worker(path, {workerData: worker_data[i]}))
  }
  return threads;
}

각각의 worker thread는 Set 자료구조에 저장된다.
특히 path는 worker thread가 시행되는 파일의 경로를 담고 있어야하는데,
필자의 경우 main thread와 같은 모듈이 시행되어야 하기 때문에 해당 모듈로 연결해주는 파일을 만들어주고 해당 파일의 경로로 설정했다.
이렇게 하지 않으려면 ``안에 worker thread가 실행되어야하는 코드를 넣으면 되는데, 가독성이 떨어져서 사용하지 않았다.


// in MainThread
for(let worker of threads){
  worker.on('message',(data)=>{});
  worker.on('exit',()=>{
  	threads.delete(worker);
    if(threads.size === 0){}
  })
}

main thread에서 worker thread로 부터 전달받은 signal을 처리하는 코드이다.
message를 전달 받으면, 전달받은 데이터를 모으고
exit를 전달받으면, 해당 worker thread를 삭제하며 모든 worker thread가 삭제 되었을 때(작업이 다 끝났을 때) 우리가 원하는 일을 처리해주도록 했다.


// in worker thread
await db
      .collection('COLLECTION_NAME')
      .find({})
      .skip(skip)
      .limit(limit)
      .forEach((data) => {
    	//process data and push it to result object;
      	result.push(processed_data);
      });

worker thread가 수행하는 코드이다.
pagination을 수행한다. 이 부분이 가장 큰 속도 향상을 가져다 주었다.


// in worker thread
client.close();
parentPort.postMessage(result);
parentPort.close();

pagination을 통해 원하는 데이터를 query한 worker thread는
해당 데이터를 main thread로 전달해주고 종료 메세지를 남겨주면 된다.

위에 보이는 client.close()는 db연결을 종료하는 것이다.
이 작업을 꼭!!!! 해줘야한다! 그래야 worker thread가 종료될 수 있다.

개선점

thread 여러개를 동시에 사용하고, pagination을 사용하면서 속도가 엄청 빨라졌다.

문제점

현재까지 발생한 문제점은 없다.
지금은 pagination limit을 1000으로 두어서 진행하고있는데, 나중에 데이터가 더 많아져서 limit을 더 올리게 되었을 때 속도가 그대로 유지될 지는 의문이다..



conclusion

DB에서 데이터를 다룰 때 가장 중요한 부분은 Index이다.
본인의 상황에 맞게 index를 설정하고 그걸 잘 이용해서 query를 진행해야 반은 먹고 들어간다.
또한 필자의 경우 data query와 동시에 CPU사용량이 높아서 worker threads가 잘 사용되었다.
각자의 operation이 어떤 타입인지를 보고 다시 결정하면 좋을 것 같다.

profile
잘 하고 싶은 욕심을 가득 갖고 태어남

0개의 댓글