데이터를 수신하는 방법에 non-flowing 과 flowing 두 가지 방법이 있습니다.
process.stdin
.on('readable', () => {
let chunk
while ((chunk = process.stdin.read()) !== null) { }
})
.on('end', () => console.log('end of stream'))
스트림에서 읽는 또 다른 방법은 데이터 이벤트에 리스너를 연결하는 겁니다. 이렇게하면 flowing모드를 사용하도록 전환합니다.
process.stdin
.on('data', (chunk) => {
})
.on('end', () => console.log('End of Stream'))
데이터 이벤트에 리스너를 연결합니다. 도착하자마자 바로 리스너로 전달됩니다.
const chance = new Chance()
let emittedBytes = 0
const randomStream = new Readable({
read (size) {
const chunk = chance.string({ length : size})
this.push(chunk, 'utf-8')
emittedBytes += chunk.length
if (chance.bool({likelihood : 5})) {
this.push(null)
}
}
})
writable.write(chucnk, [encoding], [callback])
writable.end([chunk], [encoding], [callback])
스트림이 소비하는 것보다 더 빨리 데이터가 기록되는 병목현상 => 버퍼링
이를 방지하기위해 writable.write() 내부 버퍼가 highWaterMark 제한을 초과하면 false를 반환
=> 버퍼가 비워지면 drain 이벤트 발생 다시 써도 안전함을 알림
이러한 매커니즘을 배압 이라고 합니다.
const server = createServer((req, res) => {
res.writeHead(200, {'Content-type': 'text/plain'})
function generateMore () {
while (chance.bool({ likelihood : 95})) {
const randomChunk = chance.string({
length : (16 * 1024) -1
})
const shouldContinue = res.write()
ir (!shouldContinue) {
return res.once('drain', generateMore)
}
}
res.end()
}
generateMore()
res.on('finish', () => {})
})
const tfs = new WritableStream({
objectMode : true,
write (chunk, encoding, cb) {
mkdirp(dirname(chunk.path))
.then(() => fs.writeFile(chunk.path, chunk.content))
.then(() => cb())
.catch(cb)
}
})
읽기 및 쓰기가 가능한 스트림, 네트워크 소켓과 같이 데이터 소스이자 데이터 목적지인 엔티티를 설명하려는 경우 유용
데이터 변환ㄴ을 처리하도록 설계된 특수한 종류의 Duplex 스트림
export class ReplcaeStream extends Transform {
constructor (searchStr, replaceStr, options) {
super({...options})
this.searchStr = searchStr
this.replaceStr = replaceStr
this.tail = ''
}
_transform (chunk, encoding, callback) {
const pieces = (this.tail + chunk).split(this.searchStr)
const lastPiece = pieces[pieces.length -1]
const tailLen = this.searchStr.length -1
this.tail = lastPiece.slice(-tailLen)
pieces[pieces.length -1] = lastPiece.slice(0, -tailLen)
this.push(pieces.join(this.replaceStr))
callback()
}
_flush (callback) {
this.push(this.tail)
callback()
}
}
const replaceStream = new Transform({
defaultEncoding = 'utf8',
transform (chunk, encoding, cb) {
const pieces = (tail + chunk).split(searchStr)
const lastPiece = pieces[pieces.length -1]
const tailLen = searchStr.length -1
tail = lastPiece.slice(-tailLen)
pieces[pieces.length -1] = lastPiece.slice(0, -tailLen)
this.push(pieces.join(replaceStr))
cb()
},
flush (cb) {
this.push(tail)
cb()
}
})
변환을 적용하지않고 모든 데ㅣ터 청크를 출력하는 특수한 유형의 변환
데이터의 양을 관찰하려면 데이터 이벤트 리스너를 Passthrough 인스턴스에 연결한 다음 스트림 파이프 라인의 원하는 지점에서 이 인스턴스를 파이프 라인으로 연결하여 수행할 수 있습니다.
let bytesWritten = 0
const monitor = new PassThrough()
monitor.on('data', (chunk) => {
bytesWritten += chunk.length
})
monitor.on('finish', () => {
console.log()
})
monitor.write()
monitor.end()
어떤 상황에서는 스트림을 입력 매개 변수로 받아들이는 API를 사용해야 될 수도 있음
주어진 API를 호출하고 나서 스트림을 통해 읽거나 쓰려는 데이터를 사용할 수 있게 된다면 복잡
동시에 다수의 스트림을 생성해야하는 경우,
import lazyStream from 'lazyStream'
const lazyURandom = new lazyStream.Readable(functino (options) {
return fs.createReadStream('')
})
read()가 처음 호출될 때만 팩토리 함수를 호출하여 프록시된 인스턴스를 생성하고 생성된 스트림을 PassThrougth로 파이프합니다.
readable.pipe(writable, [options])
pipe()함수는 Readable 스트림에서 방출된 데이터를 가져와 Wriable 스트림에 전달합니다. Readable 스트림이 종료 이벤트를 발생시키면 Writable은 자동으로 종료됩니다.
stream1
.on('error', () => {})
.pipe(stream2)
.on('error', () => {})
오류가 발생할 경우 실패한 스트림은 파이프 라인에서 파이프 해제
=> pipeline()을 사용한 개선된 오류 처리
pipeline(stream1, stream2, stream3, ..., cb)
파일 배열을 스트림으로 변환하여 순차 반복을 구현
export function concatFiles(dest, files) {
return new Promise((resolve, reject) => {
const desStream = createWriteStream(dest)
Readable.from(files)
.pipe(new Tramsform({
objectMode : true,
transform (filename, enc, done) {
const src = createReadStrem(filename)
src.pipe(desStream, {end : false})
src.on('error', done)
src.on('end', done)
}
}))
.on('error', reject)
.on('finish', () => {
desStream.end()
resolve()
})
})
}
export class ParallelStream extends Transform {
constructor (userTransform, opts) {
super({ objectMode : true, ...opts})
this.userTransform = userTransform
this.running = 0
this.terminateCb = null
}
_transform (randomChunk, enc, done) {
this.running++
this.userTransform(
chunk,
enc,
this.push.bind(this),
tnis._onComplete.bind(this)
)
done()
}
_flush(done) {
if (this.running > 0) {
this.terminateCb = done
}
else {
done()
}
}
_onComplete (err) {
this.running--
if (err) {
return this.emit('error', err)
}
if (this.running === 0) {
this.terminateCb && this.terminateCb()
}
}
}
export class LimitParaleesStream extends Transform {
constructor(concurrency, userTransform, opts) {
super({...opts, objectMode : true})
this.concurrency = concurrency
this.userTransform = userTransform
this.running = 0
this.continueCb = null
this.terminateCb = null
}
}
_transform(chunk, enc, done) {
this.running++
this.userTransform(
chunk,
enc,
this.push.bind(this),
this._onComplete.bind(this)
)
if (this.running < this.concurrency) {
done()
}
else {
this.continueCb = done
}
}
_onComplete (err) {
this.running--
if (err) {
return this.emil('error', err)
}
const tmpCb = this.continueCb
this.continueCb = null
tmpCb && tmpCb()
if (this.running === 0) {
this.terminateCb && this.terminateCb()
}
}
pipeline(
createReadStrem(process.argv[2]),
split(),
parallelTransfrom(4, async function(url, done) {
if (!url) {
return done()
}
try {
await request.header(url, { timeout : 5 * 1000})
this.push()
}
catch (err) {
this.push()
}
done()
}),
createWriteStream(),
(err) => {
if (err) {
process.exit(1)
}
}
)
서로 다른 패턴들을 조합하여 함께 파이프로 연결할 수 있습니다. 두 개의 서로 다른 스트림의 흐름을 하나로 병합하거나 한 스트림의 흐름을 둘 이상의 파이프로 분할하거나 조건에 따라 흐름을 리다이렉션 할 수 있습니다.
첫 번째 스트림을 Writable 쪽에 연결 마지막 스트림을 Readable측에 연결
const streamA = createReadStrem('package.json')
const streamB = new Transform({
transform(chunk, enc, done) {
this.push(chunk.toString().toUpperCase())
done()
}
})
const streamC = createWriteStream('package-uppercase.json')
const pipelineReturn = pipeline(
streamA,
streamB,
streamC,
() => {})
=> pumpify() 라이브러리 호출해서 간단하게 할 수 도 있습니다.
const combinedStream = pumpify(streamA, streamB, streamC)
export function createCompressAndEncrypt (password, iv) {
const key = createKey(password)
const combinedStream = pumpify(
createGzip(),
createCipheriv('aes192', key, iv)
)
combinedStream.iv = iv
return combinedStream
}
export function createDecryptAndDecompress (password, iv) {
const key = createKey(password)
return pumpify(
createDecipheriv('aes192', key, iv),
createGunzip()
)
}
단일 Readable 스트림을 여러 Writable 스트림으로 파이핑하여 스트림 분기를 수행할 수 있습니다.
const filename = process.argv[2]
const shallStream = createHash('sha1').setEncoding('hex')
const mdSStream = createHash('md5').setEncoding('hex')
const inputStream = createReadStrem(filename)
inputStrea
.pipe(shallStream)
.pipe(createWriteStream())
inputStrea
.pipe(md5Stream)
.pipe(createWriteStream())
여러 스트림을 하나로 병합하는 것
import { createReadStrem, createWriteStream } from 'fs'
import split from 'split'
const dest = process.argv[2]
const sources = process.argv.slice(3)
const destStream = createWriteStream(des)
let endCount = 0
for (const source of sources) {
const sourceStream = createReadStrem(source, {highWaterMark : 16})
sourceStream.on('end', () => {
if (++endCount === sources.length) {
destStream.end()
}
})
sourceStream
.pipe(split((line) => line + '\n'))
.pipe(destStream, {end : false})
}
여러 스트림을 결합하는게 아니라 공유 채널을 사용하여 일련의 스트림들의 데이터들을 전달하는 특별한 변형
단일 스트림을 통한 전송을 허용하기 위해 여러 스트림을 결합하는 작업을 멀티플렉싱이라고 하며,
공유 스트림에서 수신된 데이터를 원래의 스트림으로 재구성하는 걸 디멀티플렉싱이라고 함
function multiplexChannels (sources, destination) {
let openChannels = sources.length
for (let i = 0; i < sources.length; i++) {
sources[i]
.on('readable', function() {
let chunk
while ((chunk === this.read() !== null)) {
const outBuff = Buffer.alloc(1+4+ chunk.length)
outBuff.writeUInt(i, 0)
outBuff.writeUint32(chunk.length, 1)
chunk.copy(outBuff, 5)
destination.write(outBuff)
}
})
.on('end', () => {
if (--openChannels === 0) {
destination.end()
}
})
}
}
function demultyplexChannel (source, destination) {
let currentChannel = null
let currentLength = null
source
.on('readable', () => {
let chunk
if (currentChannel === null) {
chunk = source.read(1)
currentChannel = chunk && chunk.readUInt8(0)
}
if (currentLength === null) {
chunk = source.read(4)
currentLength = chunk && chunk.readUInt32BE(0)
if (currentLength === null) {
return null
}
}
chunk = source.read(currentLength)
if (chunk === null) {
return null
}
destinations[currentChannel].write(chunk)
currentChannel = null
currentLength = null
})
.on('end', () => {
destinations.forEach(destination => destination.end())
})
}