데이터의 연속적인 흐름을 추상화한 개념이다.
대용량 데이터를 청크(chunk) 단위로 나누어 순차적으로 처리하는 방식을 제공한다.
청크(Chunks): 데이터는 청크라는 작은 단위로 나누어져 순차적으로 읽힌다. 청크는 1바이트일 수도 있고, 특정 크기의 타입 배열일 수도 있다.
백프레셔(Backpressure): 내부 큐 매커니즘을 통해 데이터 생산과 소비 속도의 불균형을 조절할 수 있다.(버퍼가 비워졌을 때 추가 데이터를 쓸 수 있도록 설정)
Web Streams API
ReadableStream
, WritableStream
, TransformStream
을 제공한다.Node.js Stream API
ReadableStream
, WritableStream
, DuplexStream
, TransformStream
을 제공한다.💡 EventEmitter란?
이벤트 기반 프로그래밍의 핵심 요소로, 비동기적인 이벤트 처리를 가능하게 하는 클래스이다.
- 특정 이벤트에 대한 리스너(콜백 함수)를 등록하고, 필요할 때 해당 이벤트를 발생시킬 수 있다.
- 하나의 이벤트에 여러 개의 리스너를 등록할 수 있어, 유연한 이벤트 처리가 가능하다.
- 이벤트 리스너는 비동기적으로 실행되어, Node.js의 논블로킹 I/O 모델과 잘 어울린다.
Node.js 버전(v18 이상)에서는 Web Streams API를 기본적으로 지원한다.
데이터를 읽을 수 있는 소스를 나타낸다. 이는 파일 시스템, 네트워크 연결 등 다양한 소스로부터 데이터를 효율적으로 읽을 수 있게 해준다.
data
이벤트로 전달된다.read()
메서드를 통해 명시적으로 데이터를 읽어야 한다.read([size])
: 지정된 크기만큼 데이터를 읽는다. 인자를 생략하면 내부 버퍼의 반환한다.pause()
: 흐름 모드의 스트림을 일시 정지 모드로 전환한다.resume()
: 일시 정지 모드의 스트림을 흐름 모드로 전환한다.pipe(destination[, options])
: 읽은 데이터를 Writable Stream으로 전달한다.unpipe([destination])
: pipe()
로 연결된 스트림을 해제한다.data
: 데이터 청크가 사용 가능할 때 발생한다.(흐름 모드에서만 해당)end
: 더 이상 읽을 데이터가 없을 때 발생한다.error
: 데이터 읽기 중 오류가 발생했을 때 발생한다.close
: 스트림이 완전히 닫혔을 때 발생한다.readable
: 데이터를 읽을 수 있을 때 발생한다.const fs = require("fs");
const readableStream = fs.createReadStream("example.txt", {
encoding: "utf8",
highWaterMark: 64, // 64 바이트 단위로 읽기
});
// 흐름 모드 사용
readableStream.on("data", (chunk) => {
console.log(`데이터의 총 바이트는 ${chunk.length}입니다.`);
console.log(chunk);
});
readableStream.on("end", () => {
console.log("데이터 읽기 끝");
});
readableStream.on("error", (error) => {
console.error("에러 발생:", error);
});
// 일시 정지 모드 사용
readableStream.pause();
readableStream.on("readable", () => {
let chunk;
while (null !== (chunk = readableStream.read())) {
console.log(`데이터의 총 바이트는 ${chunk.length}입니다.`);
console.log(chunk);
}
});
데이터를 쓸 수 있는 대상을 추상화한다. 이는 파일, 네트워크 소켓, 다른 출력 대상에 데이터를 순차적으로 쓸 수 있게 해준다.
write()
메서드의 반환 값을 통해 백프레셔를 수동으로 관리할 수 있다.pipe()
메서드를 지원한다.write(chunk[, encoding][, callback])
: 데이터를 스트림에 쓴다.end([chunk][, encoding][, callback])
: 스트림에 더 이상 쓸 데이터가 없음을 알린다.cork()
: 데이터를 강제로 버퍼링한다.uncork()
: 버퍼링된 데이터를 모두 플러시한다.drain
: 버퍼가 비워져 더 많은 데이터를 쓸 수 있을 때 발생한다.finish
: 모든 데이터가 시스템에 플러시되었을 때 발생한다.error
: 쓰기 작업 중 오류가 발생했을 때 발생한다.close
: 스트림이 완전히 닫혔을 때 발생한다.const fs = require("fs");
const writableStream = fs.createWriteStream("output.txt");
writableStream.write("안녕하세요 ");
writableStream.write("프론트엔드 개발자 이우혁입니다.");
writableStream.end("\n쓰기 작업 끝.");
writableStream.on("finish", () => {
console.log("데이터 쓰기 끝");
});
writableStream.on("error", (error) => {
console.error("에러 발생:", error);
});
데이터를 읽을 수 있는 소스를 추상화한다. 이는 파일, 네트워크 요청, 실시간으로 생성되는 데이터 등 다양한 소스로부터 데이터를 순차적으로 읽을 수 있게 해준다.
getReader()
: 스트림을 읽기 위한 reader 객체를 생성한다.tee()
: 스트림은 두 개의 동일한 스트림으로 분기한다.read()
: 스트림에서 다음 청크를 읽는다. Promise를 반환하며, { done, value }
객체로 resolve 된다.cancel()
: 스트림 읽기를 취소한다.const stream = new ReadableStream({
start(controller) {
console.log("start");
let num = 0;
const interval = setInterval(() => {
controller.enqueue(num++);
if (num === 10) {
controller.close();
clearInterval(interval);
}
}, 1000);
},
});
const reader = stream.getReader();
const readChunks = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value);
}
};
readChunks();
React Server Components(RSC)에서는 주로 ReadableStream을 사용하여 서버에서 클라이언트로 데이터를 스트리밍한다. Writable Stream은 주로 서버 측에서 응답을 구성할 때 사용된다.
<서버 측 처리>
RSC Payload 생성: 서버 컴포넌트는 RSC Payload라는 특별한 형식의 데이터를 생성한다.
Readable Stream 생성: 생성된 RSC Payload를 Readable Stream으로 변환한다.
WritableStream 설정: renderToPipeableStream
함수를 사용하여 WritableStream을 설정한다. 이 스트림은 렌더링된 HTML을 HTTP 응답으로 전송하는 데 사용된다.
응답 스트리밍: 생성된 Readable Stream은 HTTP 응답을 통해 직접 클라이언트로 스트리밍된다. 동시에, WritableStream을 통해 렌더링된 HTML이 응답으로 전송된다.
<클라이언트 측 처리>
Web Stream API 사용: 클라이언트는 받은 데이터를 Web Stream API인 ReadableStream으로 처리한다.
스트림 읽기: 클라이언트는 ReadableStream.getReader()
메서드를 사용하여 스크림리더를 얻고, read()
메서드를 통해 데이터를 청크 단위로 읽는다.
RSC Payload 처리: 클라이언트는 받은 RSC Payload를 해석하여 서버 컴포넌트의 결과를 클라이언트 컴포넌트와 결합한다.
이러한 방식으로 서버와 클라이언트 간의 효율적인 데이터 스트리밍이 가능해진다.
<초기 요청 단계>
<스트림 변환 단계>
Readable.fromWeb(response.body)
를 통해 Node.js의 Readable Stream으로 변환된다.fs.createWriteStream(outputPath)
를 통해 파일 쓰기를 위한 Writable Stream이 생성된다.<파일 쓰기 단계>
stream.pipeline()
을 통해 Readable Stream과 Writable Stream이 연결된다.<완료 및 에러 처리>
finish
이벤트가 발생하며 파일 다운로드가 완료된다.error
이벤트가 발생하여 에러를 처리할 수 있다.Node.js Stream 당신이 알아야할 모든 것 1편
Node.js Stream 당신이 알아야할 모든 것 2편
Node.js Stream 당신이 알아야할 모든 것 3편
WritableStream - MDN 문서
ReadableStream - MDN 문서
스트림 데이터 사용을 위한 Stream 모듈 사용하기
자바스크립트에서 데이터 스트림 읽기 (ReadableStream)
Node.js Readable streams distilled
Readable Streams in Node.js