[Node.js 디자인패턴] 5주차

김련호·2021년 6월 16일
0

5.1 스트림의 중요성

Node.js와 같은 이벤트 기반 플랫폼에서 I/O를 처리하는 가장 효율적인 방법은 입력과 출력을 실시간으로 처리하는 것임.

5.1.1 버퍼링 VS 스트리밍

지금까지 다루었던 거의 모든 비동기 API는 버퍼 모드를 사용했음

  • 버퍼 모드: 입력 조작의 경우 버퍼 모드는 리소스로부터 오는 모든 데이터를 버퍼에 수집하고, 모두 읽은 후에 콜백을 통하여 전달한다.
  • 스트리밍: 리소스로부터 데이터 덩어리(chunk)가 수신되면 즉시 리스너(데이터 요청자)에게 전달한다.

스트리밍을 통하여 아래와 같은 이점을 얻을 수 있다.

공간 효율성
시간 표율성

5.1.2 공간 효율성

버퍼링하여 데이터를 처리할 때, 데이터의 크기만큼 메모리 공간이 필요하지만 스트리밍은 훨씬 적은 공간을 필요로 한다. V8 엔진 버퍼는 1GB보다 클 수 없는 문제가 있다.

// 버퍼링을 이용한 파일 압축
// node index.js [filename]
const fs = require('fs');
const zlib = require('zlib');

const file = process.argv[2];

fs.readFile(file, (err, buffer) => { // 파일 데이터 전체 메모리에 load
  zlib.gzip(buffer, (err, buffer) => { // 버퍼 압축
    fs.writeFile(file + '.gz', buffer, err => { // 압축된 데이터를 disk에 write
      console.log('File successfully compressed');
    });
  });
});


// 스트리밍을 이용한 파일 압축
// node index.js [filename]
const fs = require('fs');
const zlib = require('zlib');

const file = process.argv[2];

fs.createReadStream(file) // 파일 읽기 스트림 생성
  .pipe(zlib.createGzip()) // 데이터 압축
  .pipe(fs.createWriteStream(file + '.gz')) // 데이터 쓰기
  .on('finish', () => console.log('File successfully compressed'))
;
  • 코드가 깨끗하고 우아함
  • 다양한 크기의 파일에 대해 항상 일정한 메모리 사용률을 유지함

5.1.3 시간 효율성

파일을 압축하고 서버에 업로드하는 어플리케이션이 있다고 가정하였을 때,

  • 버퍼링의 경우에는 클라이언트에서 전체 파일을 모두 읽어 압축이 완료된 시점에 업로드를 시작할 수 있고, 서버의 경우에도 모든 데이터가 서버에 수신된 경우에만 압축해제 후 서버의 디스크에 저장할 수 있다.
  • 스트리밍의 경우에는 파일의 일부 데이터를 읽는 즉시 압축을 시작하여 압축된 부분을 먼저 전송할 수 있으며, 서버의 경우에도 수신되는 부분을 압축 해제 하여 저장할 수 있다.
// node gzipReceive.js
const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(zlib.createGunzip()) // req 객체 자체가 readable, 압축 해제에 함수에 연결
    .pipe(fs.createWriteStream(filename)) // 파일 write 스트림에 연결
    .on('finish', () => { // 완료 시 response 처리
      res.writeHead(201, {'Content-Type': 'text/plain'});
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));


// node gzipSend.js [file path] localhost
const fs = require('fs');
const zlib = require('zlib');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
  hostname: server,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    filename: path.basename(file),
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip'
  }
};

// http reqeust 객체 생성
const req = http.request(options, res => {
  console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file) // 파일 읽기 스트림 생성
  .pipe(zlib.createGzip()) // 압축 스트림에 연결
  .pipe(req) // http request 스트림에 연결
  .on('finish', () => {
    console.log('File successfully sent');
  })
;

스트림을 통하여 데이터 전달 시에 고려해야하는 사항은, 스트림 내에 흐르고 있는 데이터 덩어리(chunk)의 순서를 보장해야 원래의 데이터 형태가 보존될 것임.
(이것은 Nodejs에서 알아서 처리해준다 !!!!)

5.1.4 결합성

pipe() 메소드를 통해 API를 연결하는 것임. 파이프라인의 다음 스트림이 이전 스트림에 의해 생성되어 전달된 데이터 타입을 지원해야함.

// gzipReceive.js
const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(crypto.createDecipher('aes192', 'a_shared_secret')) // 수신 후 암호화 해제
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {'Content-Type': 'text/plain'});
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    })
  ;
});

server.listen(3000, () => console.log('Listening'));


// gzipSend.js
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
  hostname: server,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    filename: path.basename(file),
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip'
  }
};

const req = http.request(options, res => {
  console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_shared_secret')) // 보내기 전에 암호화
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent');
  })
;

5.2 스트림

5.2.1 스트림의 구조

Node.js에서 모든 스트림은 스트림 코어 모듈에서 사용할 수 있는 4가지 추상 클래스 중 하나의 구현임.

  • stream.Readable
  • stream.Writable
  • stream.Duplex
  • stream.Transform

각 스트림 클래스는 EventEmitter의 인스턴스이고, 각 스트림은 어떤 상황이 발생했을 때 이벤트를 발생 시킨다. (예. Readable에서 읽기가 끝나면 end, 오류 발생 시 error 이벤트 발생)

스트림은 아래의 데이터를 처리할 수 있음(거의 모든 데이터 형태를 지원)

  • 바이너리 모드: 데이터가 버퍼, 문자열 같은 chunk 형태로 스트리밍되는 모드
  • 객체 모드: 스트리밍되는 데이터가 별도 객체로 취급되는 모드

5.2.2 Readable

  • non-flowing 모드
    readable 이벤트에 대하여 listener를 등록하고, readable 이벤트가 발생할 시에 read() 메소드를 이용하여 내부 버퍼에서 chunk를 수신한다.
process.stdin
  .on('readable', () => { // readable 이벤트
    let chunk;
    console.log('New data available');
    while((chunk = process.stdin.read()) !== null) { // read()
      console.log(
        `Chunk read: (${chunk.length}) "${chunk.toString()}"`
      );
    }
  })
  .on('end', () => process.stdout.write('End of stream'))
;

read() 메소드는 Readable 스트림 내부 버퍼에서 데이터를 읽어들이는 동기 작업임

  • flowing 모드
    data 이벤트에 listener를 등록하는 모드임. read() 메소드를 사용하지 않고, 데이터가 도착하자마자(data 이벤트가 발생하자마자) listener를 통해 핸들러에 전달된다.
process.stdin 
  .on('data', chunk => {  // data 이벤트
    console.log('New data available'); 
    console.log( 
      `Chunk read: (${chunk.length}) "${chunk.toString()}"` 
    ); 
  }) 
  .on('end', () => process.stdout.write('End of stream')) 
;
  • flowing 모드는 이전 버전의 스트림 인터페이스(Stream1)의 상속을 받아서 구현된 모드이며, 데이터 흐름을 제어하기 위한 유연성이 떨어짐
  • Stream2 인터페이스의 도입 후 flowing 모드는 기본 모드가 아님
  • flowing 모드를 사용하려면, 위의 예와 같이 data 이벤트를 listener에 등록하거나, resume() 메소드를 명시적으로 호출해야함.

Readable 스트림 구현

  • stream.Readable을 상속받아 구현 필요
  • _read() 메소드 구현 필요

    read()는 API 사용자에 의해 호출되는 메소드이고, _read()는 stream 클래스의 서브 클래스에서 구현되어야 하며 직접 호출하는 함수가 아님

// randomStream.js
const stream = require('stream');
const Chance = require('chance');

const chance = new Chance();

class RandomStream extends stream.Readable {
  constructor(options) {
    super(options); // stream 생성자에 options 인자 그대로 전달
    // encoding, objectMode, highWatermak
  }

  _read(size) {
    const chunk = chance.string();         // 랜덤 스트링 생성
    console.log(`Pushing chunk of size: ${chunk.length}`);
    this.push(chunk, 'utf8');             // 생성된 데이터를 stream 내부 버퍼에 밀어 넣음. 생성된 데이터가 string 타입이라 utf8로 지정
    if(chance.bool({likelihood: 5})) {    // 5%의 확률로 true 반환
      this.push(null); // EOF
    }
  }
}

module.exports = RandomStream;


// generateRandom.js
const RandomStream = require('./randomStream');
const randomStream = new RandomStream();

randomStream.on('readable', () => {
  let chunk;
  while((chunk = randomStream.read()) !== null) {
    console.log(`Chunk received: ${chunk.toString()}`);
  }
});

5.2.3 Writable

스트림에 쓰기

구현되어 있는 Writable 스트림에 write() 메소드를 사용하면 된다.
그리고 더 이상 기록할 데이터가 없다면 end() 메소드를 호출하면 된다.

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {'Content-Type': 'text/plain'});   // writable 스트림의 메소드가 아님, httpServerResponse 클래스의 기능
  while(chance.bool({likelihood: 95})) {       // 95% 확률로 true
    res.write(chance.string() + '\n');         // 랜덤 스트링 생성하여 writable 스트림에 씀
  }
  res.end('\nThe end...\n');           // 마지막 데이터를 쓰고 stream 종료. 마지막 데이터가 없다면 res.end() 로 대체 가능  
  res.on('finish', () => console.log('All data was sent'));   // 정상 종료에 대한 listener 등록. 모든 데이터가 하위 소켓에 플러시 될 때 호출 
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

Back-pressure

내부 버퍼가 비워지기 전에 데이터가 버퍼의 크기를 초과하여 수신되는 경우 병목 발생하며, 이런 경우 write()는 false를 반환한다. application에서는 write()가 false를 반환하는 경우에 스트림에 데이터를 쓰지 않아야 한다. 버퍼가 비워지면 drain 이벤트가 발생하며, 이 때부터 다시 데이터를 쓰기 시작하면 된다. 이러한 메커니즘을 백프레셔 라고 한다.

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {'Content-Type': 'text/plain'});

  function generateMore() {             // 함수로 생성
    while(chance.bool({likelihood: 95})) {
      const shouldContinue = res.write(
        chance.string({length: (16 * 1024) - 1})     // 랜덤 데이터의 크기롤 크게 highWatermark에 가까운 크기
    );
      if(!shouldContinue) {             // 내부 버퍼가 가득찼는지 체크
        console.log('Backpressure');
        return res.once('drain', generateMore); // drain listener 등록
      }
    }
    res.end('\nThe end...\n',() => console.log('All data was sent'));
  }
  generateMore();
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

Writable 구현

stream.Writable 상속 받아서 구현. _write()를 구현해야함.

// toFileStream.js
const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');

class ToFileStream extends stream.Writable {
  constructor() {
    super({objectMode: true});
    // highWaterMark, decodeStrings(object 모드에서 무시), objectMode
  }

  _write (chunk, encoding, callback) {
    mkdirp(path.dirname(chunk.path), err => { // objectMode: true 이기 때문에 chunk 객체의 내부 속성에 접근 가능
      if (err) {
        return callback(err);
      }
      fs.writeFile(chunk.path, chunk.content, callback);
    });
  }
}
module.exports = ToFileStream;


// writeToFile.js
const ToFileStream = require('./toFileStream.js');
const tfs = new ToFileStream();

tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(() => console.log("All files created"));
profile
기본이지만 잘 모르고 있던 것들, 모호한 것들을 기록합니다.

0개의 댓글