[Node.js 디자인패턴] 6주차

김련호·2021년 6월 30일
0

5.2.4 양방향(Duplex) 스트림

Readable, Writable의 성질을 동시에 가지는 스트림으로 내부적으로 _read(), _write() 모두 구현이 필요하다.

5.2.5 Transform 스트림

Duplex 스트림의 경우에는 Read, Write 스트림 사이에 어떠한 직접적인 관계가 없다. 하지만 Transform 스트림의 경우에는, Writable에서 받은 데이터를 가공하여 Readable에서 사용할 수 있도록 한다.
_transform(), _flush()를 구현해야한다.

const stream = require('stream');

class ReplaceStream extends stream.Transform {
  constructor(searchString, replaceString) {
    super();
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = '';
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tailPiece + chunk)
      .split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    const tailPieceLen = this.searchString.length - 1;

    this.tailPiece = lastPiece.slice(-tailPieceLen);
    pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen);

    this.push(pieces.join(this.replaceString));
    callback();
  }

  _flush(callback) {
    this.push(this.tailPiece);
    callback();
  }
}

module.exports = ReplaceStream;

파이프를 통한 스트림 연결

const ReplaceStream = require('./replaceStream');
process.stdin
  .pipe(new ReplaceStream(process.argv[2], process.argv[3]))
  .pipe(process.stdout);

5.3 스트림을 사용한 비동기 제어

5.3.1 순차 실행

// concatFiles.js
const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');

function concatFiles(destination, files, callback) {
  const destStream = fs.createWriteStream(destination);
  fromArray.obj(files)             
    .pipe(through.obj((file, enc, done) => {   
      const src = fs.createReadStream(file);
      src.pipe(destStream, {end: false});
      src.on('end', done); 
    }))
    .on('finish', () => {         
      destStream.end();
      callback();
    });
}

module.exports = concatFiles;

// concat.js
const concatFiles = require('./concatFiles');

concatFiles(process.argv[2], process.argv.slice(3), () => { 
  console.log('Files concatenated successfully');
});

5.3.2 비순차 병렬 실행

// parallelStream.js
const stream = require('stream');

class ParallelStream extends stream.Transform {
  constructor(userTransform) {
    super({objectMode: true});
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this));
    done();
  }

  _flush(done) {
    if(this.running > 0) {
      this.terminateCallback = done;
    } else {
      done();
    }
  }

  _onComplete(err) {
    this.running--;
    if(err) {
      return this.emit('error', err);
    }
    if(this.running === 0) {
      this.terminateCallback && this.terminateCallback();
    }
  }
}

module.exports = ParallelStream;


// checkUrls.js
const fs = require('fs');
const split = require('split');
const request = require('request');
const ParallelStream = require('./parallelStream');

fs.createReadStream(process.argv[2])         
  .pipe(split())                             
  .pipe(new ParallelStream((url, enc, done, push) => {     
    if(!url) return done();
    request.head(url, (err, response) => {
      push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))   
  .on('finish', () => console.log('All urls were checked'))
;
profile
기본이지만 잘 모르고 있던 것들, 모호한 것들을 기록합니다.

0개의 댓글