Duplex 스트림은 Readable, Writable의 성질을 동시에 가지는 스트림으로, 내부적으로 _read(), _write() 모두 구현이 필요하다.
Duplex 스트림의 경우에는 Read, Write 스트림 사이에 어떠한 직접적인 관계가 없다. 하지만 Transform 스트림의 경우에는, Writable에서 받은 데이터를 가공하여 Readable에서 사용할 수 있도록 한다.
Transform 스트림에서는 _transform(), _flush()를 구현해야 한다.
const stream = require('stream');
const { StringDecoder } = require('string_decoder');
/**
 * Transform stream that replaces every occurrence of `searchString`
 * with `replaceString` in the streamed text, even when a match spans
 * a chunk boundary.
 *
 * Fixes over the naive version:
 *  - Uses a StringDecoder so multi-byte UTF-8 characters split across
 *    chunk boundaries are not mangled by an implicit toString().
 *  - Handles searchString of length 1 explicitly instead of relying on
 *    the `slice(-0) === slice(0)` quirk.
 *  - _flush no longer pushes an empty string when nothing is buffered.
 */
class ReplaceStream extends stream.Transform {
  /**
   * @param {string} searchString - literal text to search for
   * @param {string} replaceString - replacement text
   */
  constructor(searchString, replaceString) {
    super();
    this.searchString = searchString;
    this.replaceString = replaceString;
    // Holds the last (searchString.length - 1) chars of the previous
    // chunk, since a match could start there and finish in the next chunk.
    this.tailPiece = '';
    // Safely reassembles multi-byte characters split across chunks.
    this.decoder = new StringDecoder('utf8');
  }

  _transform(chunk, encoding, callback) {
    // Chunks arrive as Buffers (decodeStrings defaults to true);
    // the decoder buffers any trailing partial character.
    const text = this.tailPiece +
      (typeof chunk === 'string' ? chunk : this.decoder.write(chunk));
    const pieces = text.split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    const keepLen = this.searchString.length - 1;
    if (keepLen > 0) {
      // Buffer the tail that could be the start of a cross-chunk match.
      this.tailPiece = lastPiece.slice(-keepLen);
      pieces[pieces.length - 1] = lastPiece.slice(0, -keepLen);
    } else {
      // A 1-char search string can never span chunks; emit everything.
      this.tailPiece = '';
    }
    this.push(pieces.join(this.replaceString));
    callback();
  }

  _flush(callback) {
    // Emit any buffered tail plus any partial character left in the decoder.
    const rest = this.tailPiece + this.decoder.end();
    if (rest) {
      this.push(rest);
    }
    callback();
  }
}
module.exports = ReplaceStream;
const ReplaceStream = require('./replaceStream');

// Usage: node replace <searchString> <replaceString>
// Streams stdin through the replacer and writes the result to stdout.
const [, , searchString, replaceString] = process.argv;
process.stdin
  .pipe(new ReplaceStream(searchString, replaceString))
  .pipe(process.stdout);
// concatFiles.js
const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');
/**
 * Sequentially concatenates `files` into `destination`.
 *
 * Fix: the original silently swallowed read/pipeline errors and never
 * invoked `callback` on failure. Errors from each source stream and from
 * the through stream are now propagated, and the callback is guaranteed
 * to fire exactly once.
 *
 * @param {string} destination - path of the output file
 * @param {string[]} files - paths of the input files, in order
 * @param {function(Error=)} callback - invoked once on completion or first error
 */
function concatFiles(destination, files, callback) {
  const destStream = fs.createWriteStream(destination);
  let finished = false;
  // Guard so an error followed by 'finish' cannot fire the callback twice.
  const finishOnce = (err) => {
    if (finished) {
      return;
    }
    finished = true;
    destStream.end();
    callback(err);
  };
  fromArray.obj(files)
    .pipe(through.obj((file, enc, done) => {
      const src = fs.createReadStream(file);
      // {end: false} keeps destStream open for the next file.
      src.pipe(destStream, {end: false});
      // Propagate read errors into the through stream via done(err).
      src.on('error', done);
      src.on('end', done);
    }))
    .on('error', finishOnce)
    .on('finish', () => finishOnce());
}
module.exports = concatFiles;
// concat.js
const concatFiles = require('./concatFiles');

// Usage: node concat <destination> <file1> [file2 ...]
const [, , destination, ...sourceFiles] = process.argv;
concatFiles(destination, sourceFiles, () => {
  console.log('Files concatenated successfully');
});
// parallelStream.js
const stream = require('stream');
/**
 * Transform stream that runs `userTransform` on each chunk without
 * waiting for the previous one to finish — chunks are processed with
 * unbounded parallelism, so output order is NOT guaranteed.
 *
 * The stream only finishes once every in-flight task has signalled
 * completion via the `done` callback handed to `userTransform`.
 */
class ParallelStream extends stream.Transform {
  /**
   * @param {function(chunk, enc, done, push)} userTransform
   *   async worker; must call `done([err])` exactly once and may call
   *   `push(data)` any number of times before that.
   */
  constructor(userTransform) {
    super({objectMode: true});
    this.userTransform = userTransform;
    this.running = 0;          // number of in-flight userTransform calls
    this.terminateCallback = null; // _flush's done, parked until running hits 0
  }

  _transform(chunk, enc, done) {
    this.running += 1;
    // Forward completion and push through arrows; calling done()
    // immediately is what lets the next chunk start in parallel.
    this.userTransform(
      chunk,
      enc,
      (err) => this._onComplete(err),
      (...args) => this.push(...args)
    );
    done();
  }

  _flush(done) {
    // Nothing in flight: we can finish right away.
    if (this.running === 0) {
      done();
      return;
    }
    // Otherwise park the callback; _onComplete fires it later.
    this.terminateCallback = done;
  }

  _onComplete(err) {
    this.running -= 1;
    if (err) {
      this.emit('error', err);
      return;
    }
    if (this.running === 0 && this.terminateCallback) {
      this.terminateCallback();
    }
  }
}
module.exports = ParallelStream;
// checkUrls.js
const fs = require('fs');
const split = require('split');
const request = require('request');
const ParallelStream = require('./parallelStream');
// Reads a file of URLs (one per line) and checks each in parallel,
// writing "<url> is up/down" lines to results.txt. Output order follows
// completion order, not input order.
const checkUrl = (url, enc, done, push) => {
  // split() emits an empty string for blank/trailing lines — skip them.
  if (!url) {
    return done();
  }
  request.head(url, (err, response) => {
    push(url + ' is ' + (err ? 'down' : 'up') + '\n');
    done();
  });
};

fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(new ParallelStream(checkUrl))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', () => console.log('All urls were checked'));