[Node.js] Stream

Vorhandenheit ·2022년 9월 26일
0

JS/Node 

목록 보기
61/63

Stream

  • Readable
  • Writable
  • Duplex
  • Transform

1. 스트림

(1) Readable 스트림

데이터를 수신하는 방법에 non-flowing 과 flowing 두 가지 방법이 있습니다.

Non-flowing 모드

  • 작동 : readable 이벤트에 대해 리스너를 연결한 후, 다음 루프에서 내부 버퍼가 비워질 때까지 데이터를 계속 읽습니다. 이는 read함수를 통해 가능합니다.
    read함수는 Readble 스트림 내부 버퍼에서 데이터 청크를 가져오는 동기작업입니다. 반환된 청크는 Buffer 객체입니다.
process.stdin
.on('readable', () => {
    let chunk
    while ((chunk = process.stdin.read()) !== null) { }
})
.on('end', () => console.log('end of stream'))

Flowing 모드

스트림에서 읽는 또 다른 방법은 데이터 이벤트에 리스너를 연결하는 겁니다. 이렇게하면 flowing모드를 사용하도록 전환합니다.

process.stdin
.on('data', (chunk) => {

})
.on('end', () => console.log('End of Stream'))

데이터 이벤트에 리스너를 연결합니다. 도착하자마자 바로 리스너로 전달됩니다.

단순화된 생성자

const chance = new Chance()
let emittedBytes = 0
const randomStream = new Readable({
    read (size) {
        const chunk = chance.string({ length : size})
        this.push(chunk, 'utf-8')
        emittedBytes += chunk.length
        if (chance.bool({likelihood : 5})) {
            this.push(null)
        }
    }
})

(2) Writable 스트림

writable.write(chucnk, [encoding], [callback])

writable.end([chunk], [encoding], [callback])

배압

스트림이 소비하는 것보다 더 빨리 데이터가 기록되는 병목현상 => 버퍼링
이를 방지하기위해 writable.write() 내부 버퍼가 highWaterMark 제한을 초과하면 false를 반환
=> 버퍼가 비워지면 drain 이벤트 발생 다시 써도 안전함을 알림
이러한 매커니즘을 배압 이라고 합니다.

const server = createServer((req, res) => {
    res.writeHead(200, {'Content-type': 'text/plain'})

    function generateMore () {
        while (chance.bool({ likelihood : 95})) {
            const randomChunk = chance.string({
                length : (16 * 1024) -1
            })
            const shouldContinue = res.write()
            ir (!shouldContinue) {
                return res.once('drain', generateMore)
            }
        }
        res.end()
    }
    generateMore()
    res.on('finish', () => {})
})

단순화된 생성자

const tfs = new WritableStream({
    objectMode : true,
    write (chunk, encoding, cb) {
        mkdirp(dirname(chunk.path))
        .then(() => fs.writeFile(chunk.path, chunk.content))
        .then(() => cb())
        .catch(cb)
    }
})

(3) Duplex 스트림

읽기 및 쓰기가 가능한 스트림, 네트워크 소켓과 같이 데이터 소스이자 데이터 목적지인 엔티티를 설명하려는 경우 유용

(4) Transfrom 스트림

데이터 변환ㄴ을 처리하도록 설계된 특수한 종류의 Duplex 스트림

export class ReplcaeStream extends Transform {
    constructor (searchStr, replaceStr, options) {
        super({...options})
        this.searchStr = searchStr
        this.replaceStr = replaceStr
        this.tail = ''
    }
    _transform (chunk, encoding, callback) {
        const pieces = (this.tail + chunk).split(this.searchStr)
        const lastPiece = pieces[pieces.length -1]
        const tailLen = this.searchStr.length -1
        this.tail = lastPiece.slice(-tailLen)
        pieces[pieces.length -1] = lastPiece.slice(0, -tailLen)

        this.push(pieces.join(this.replaceStr))
        callback()
    }
    _flush (callback) {
        this.push(this.tail)
        callback()
    }
}

단순화된 생성자

const replaceStream = new Transform({
    defaultEncoding = 'utf8',

    transform (chunk, encoding, cb) {
    const pieces = (tail + chunk).split(searchStr)
    const lastPiece = pieces[pieces.length -1]
    const tailLen = searchStr.length -1
    tail = lastPiece.slice(-tailLen)
    pieces[pieces.length -1] = lastPiece.slice(0, -tailLen)
    this.push(pieces.join(replaceStr))
    cb()  
    },

    flush (cb) {
        this.push(tail)
        cb()
    }
})

(5) Passthrough

변환을 적용하지않고 모든 데ㅣ터 청크를 출력하는 특수한 유형의 변환

관찰 가능성

데이터의 양을 관찰하려면 데이터 이벤트 리스너를 Passthrough 인스턴스에 연결한 다음 스트림 파이프 라인의 원하는 지점에서 이 인스턴스를 파이프 라인으로 연결하여 수행할 수 있습니다.

let bytesWritten = 0
const monitor = new PassThrough()
monitor.on('data', (chunk) => {
    bytesWritten += chunk.length
})
monitor.on('finish', () => {
    console.log()
})
monitor.write()
monitor.end()

느린 파이프 연결

어떤 상황에서는 스트림을 입력 매개 변수로 받아들이는 API를 사용해야 될 수도 있음
주어진 API를 호출하고 나서 스트림을 통해 읽거나 쓰려는 데이터를 사용할 수 있게 된다면 복잡

지연 스트림

동시에 다수의 스트림을 생성해야하는 경우,

import lazyStream from 'lazyStream'
const lazyURandom = new lazyStream.Readable(functino (options) {
	return fs.createReadStream('')
})

read()가 처음 호출될 때만 팩토리 함수를 호출하여 프록시된 인스턴스를 생성하고 생성된 스트림을 PassThrougth로 파이프합니다.

파이프를 사용하여 스트림 연결하기

readable.pipe(writable, [options])

pipe()함수는 Readable 스트림에서 방출된 데이터를 가져와 Wriable 스트림에 전달합니다. Readable 스트림이 종료 이벤트를 발생시키면 Writable은 자동으로 종료됩니다.

파이프 오류 처리

stream1
.on('error', () => {})
.pipe(stream2)
.on('error', () => {})

오류가 발생할 경우 실패한 스트림은 파이프 라인에서 파이프 해제

=> pipeline()을 사용한 개선된 오류 처리

pipeline(stream1, stream2, stream3, ..., cb)

2. 스트림을 사용한 비동게 지어 흐름 패턴

(1) 순차적 실행

파일 배열을 스트림으로 변환하여 순차 반복을 구현

export function concatFiles(dest, files) {
    return new Promise((resolve, reject) => {
        const desStream = createWriteStream(dest)
        Readable.from(files)
        .pipe(new Tramsform({
            objectMode : true,
            transform (filename, enc, done) {
                const src = createReadStrem(filename)
                src.pipe(desStream, {end : false})
                src.on('error', done)
                src.on('end', done)
            }
        }))
        .on('error', reject)
        .on('finish', () => {
            desStream.end()
            resolve()
        })
    })
}

(2) 순서가 없는 병렬 실행

export class ParallelStream extends Transform {
    constructor (userTransform, opts) {
        super({ objectMode : true, ...opts})
        this.userTransform = userTransform
        this.running = 0
        this.terminateCb = null
    }

    _transform (randomChunk, enc, done) {
        this.running++
        this.userTransform(
            chunk,
            enc,
            this.push.bind(this),
            tnis._onComplete.bind(this)
        )
        done()
    }
    _flush(done) {
        if (this.running > 0) {
            this.terminateCb = done
        }        
        else {
            done()
        }
    }

    _onComplete (err) {
        this.running--
        if (err) {
            return this.emit('error', err)
        }
        if (this.running === 0) {
            this.terminateCb && this.terminateCb()
        }
    }
}

(3) 순서가 없는 제한된 병렬 실행

export class LimitParaleesStream extends Transform {
    constructor(concurrency, userTransform, opts) {
        super({...opts, objectMode : true})
        this.concurrency = concurrency
        this.userTransform = userTransform
        this.running = 0
        this.continueCb = null
        this.terminateCb = null
    }
}

_transform(chunk, enc, done) {
    this.running++
    this.userTransform(
        chunk,
        enc,
        this.push.bind(this),
        this._onComplete.bind(this)
    )
    if (this.running < this.concurrency) {
        done()
    }
    else {
        this.continueCb = done
    }
}

_onComplete (err) {
    this.running--
    if (err) {
        return this.emil('error', err)
    }
    const tmpCb = this.continueCb
    this.continueCb = null
    tmpCb && tmpCb()
    if (this.running === 0) {
        this.terminateCb && this.terminateCb()
    }
}

(4) 순서가 있는 병렬 실행

pipeline(
    createReadStrem(process.argv[2]),
    split(),
    parallelTransfrom(4, async function(url, done) {
        if (!url) {
            return done()
        }
        try {
            await request.header(url, { timeout : 5 * 1000})
            this.push()
        }
        catch (err) {
            this.push()
        }
        done()
    }),
    createWriteStream(),
    (err) => {
        if (err) {
            process.exit(1)
        }
    }
)

3. 파이핑 패턴

서로 다른 패턴들을 조합하여 함께 파이프로 연결할 수 있습니다. 두 개의 서로 다른 스트림의 흐름을 하나로 병합하거나 한 스트림의 흐름을 둘 이상의 파이프로 분할하거나 조건에 따라 흐름을 리다이렉션 할 수 있습니다.

(1) 스트림 결합

첫 번째 스트림을 Writable 쪽에 연결 마지막 스트림을 Readable측에 연결


const streamA = createReadStrem('package.json')
const streamB = new Transform({
    transform(chunk, enc, done) {
        this.push(chunk.toString().toUpperCase())
        done()
    }
})

const streamC = createWriteStream('package-uppercase.json')

const pipelineReturn = pipeline(
    streamA,
    streamB,
    streamC,
    () => {})

=> pumpify() 라이브러리 호출해서 간단하게 할 수 도 있습니다.

const combinedStream = pumpify(streamA, streamB, streamC)

결합된 스트림 구현

  • 데이터를 암축하고 암호화 ==> 암호를 복호화하고 압축해제
export function createCompressAndEncrypt (password, iv) {
    const key = createKey(password)
    const combinedStream = pumpify(
        createGzip(),
        createCipheriv('aes192', key, iv)
    )
    combinedStream.iv = iv
    return combinedStream
}

export function createDecryptAndDecompress (password, iv) {
    const key = createKey(password)
    return pumpify(
        createDecipheriv('aes192', key, iv),
        createGunzip()
    )
}

(2) 스트림 분기

단일 Readable 스트림을 여러 Writable 스트림으로 파이핑하여 스트림 분기를 수행할 수 있습니다.

다중 체크썸 생성기 구현

const filename = process.argv[2]
const shallStream = createHash('sha1').setEncoding('hex')
const mdSStream = createHash('md5').setEncoding('hex')
const inputStream = createReadStrem(filename)

inputStrea
    .pipe(shallStream)
    .pipe(createWriteStream())

inputStrea
    .pipe(md5Stream)
    .pipe(createWriteStream())

(3) 스트림 병합

여러 스트림을 하나로 병합하는 것

텍스트 파일 병합

 import { createReadStrem, createWriteStream } from 'fs'
import split from 'split'
const dest = process.argv[2]
const sources = process.argv.slice(3)

const destStream = createWriteStream(des)

let endCount = 0
for (const source of sources) {
    const sourceStream = createReadStrem(source, {highWaterMark : 16})
    sourceStream.on('end', () => {
        if (++endCount === sources.length) {
            destStream.end()
        }
    })
    sourceStream
        .pipe(split((line) => line + '\n'))
        .pipe(destStream, {end : false})
}

(4) 멀티플렉싱 및 디멀티플렉싱

여러 스트림을 결합하는게 아니라 공유 채널을 사용하여 일련의 스트림들의 데이터들을 전달하는 특별한 변형
단일 스트림을 통한 전송을 허용하기 위해 여러 스트림을 결합하는 작업을 멀티플렉싱이라고 하며,
공유 스트림에서 수신된 데이터를 원래의 스트림으로 재구성하는 걸 디멀티플렉싱이라고 함

클라이언트 측 - 멀티플렉싱

function multiplexChannels (sources, destination) {
    let openChannels = sources.length
    for (let i = 0; i < sources.length; i++) {
        sources[i]
            .on('readable', function() {
                let chunk
                while ((chunk === this.read() !== null)) {
                    const outBuff = Buffer.alloc(1+4+ chunk.length)
                    outBuff.writeUInt(i, 0)
                    outBuff.writeUint32(chunk.length, 1)
                    chunk.copy(outBuff, 5)
                    destination.write(outBuff)
                }
            })
            .on('end', () => {
                if (--openChannels === 0) {
                    destination.end()
                }
            })
    }
}

서버측 - 디멀티플렉스

function demultyplexChannel (source, destination) {
    let currentChannel = null
    let currentLength = null

    source
        .on('readable', () => {
            let chunk
            if (currentChannel === null) {
                chunk = source.read(1)
                currentChannel = chunk && chunk.readUInt8(0)   
            }

            if (currentLength === null) {
                chunk = source.read(4)
                currentLength = chunk && chunk.readUInt32BE(0)
                if (currentLength === null) {
                    return null
                }
            }
            chunk = source.read(currentLength)
            if (chunk === null) {
                return null
            }

            destinations[currentChannel].write(chunk)
            currentChannel = null
            currentLength = null
        })
        .on('end',  () => {
            destinations.forEach(destination => destination.end())
        })
}
profile
읽고 기록하고 고민하고 사용하고 개발하자!

0개의 댓글