
fe assistant로 일하고 있는 회사에서 Rag 기반 LLM 챗봇을 만드는 프로젝트를 진행하게 되었다!
이 때 LLM agent의 실시간 스트리밍 응답을 처리하기 위해 브라우저의 Streams API을 사용했다
나는 이 API를 처음 사용해봤기 때문에 이에 대해서 공부한 내용을 기록하려고 한다!!!!
ndjson 형식 (json 본문을
\n단위로 나누어 한줄씩 보내주는 형식) 스트리밍 데이터를 처리하는 과정이니 참고하시길,,,
ex){ event: example, data: 10tacion}\n{ e...
스트리밍이란 네트워크를 통해 수신하려는 리소스를 작은 bit 단위로 나누어 순차적으로 처리하는 것을 말한다. 브라우저는 영상과 같은 미디어 자산을 수신할 때 이러한 작업을 수행한다.

이사를 한다고 생각해보자. 이사를 할 때 모든 짐을 한번에 옮긴다면, 모든 짐이 도착한다음에 짐을 정리할 수 있으므로 짐 정리 시간이 오래걸릴 것이다. 반면 짐을 여러개의 박스로 포장한 뒤 한 박스 씩 나누어서 옮긴다면 한 박스씩 받아서 짐을 조금씩 정리해나갈 수 있기 때문에 짐 정리를 점진적으로 이어나갈수 있다.
이렇게 짐(데이터)을 박스 단위(청크)로 나누어 옮기는 방식이 스트리밍 방식이다.

웹 개발을 할 때 react를 주로 사용한다. 주요한 데이터 페칭 라이브러리로는 ky와 axios 등이 있다. ky는 fetch 기반, axios(브라우저 기준)는 XHR기반이다.
결론 부터 말하자면 스트리밍 데이터를 처리하기 위해서는 fetch를 사용해야한다! 왜일까?
XHR은 스트림을 처리하기 위한 형식(타입)의 응답 본문을 제공하지 않는다. 항상 응답을 완성된 데이터 형식으로 받은 뒤에 사용할수있다
반면 fetch를 사용하게 되면 응답 본문이 ReadbleStream으로 온다. (Stream API의 인터페이스). 따라서 이 객체를 사용하여 스트리밍 데이터를 처리할 수 있다.
const { body } = await ky.get(`API_주소_${id}`); // readableStream
const { body } = await fetch(`API_주소_${id}`);// readableStream
우리가 ky(fetch) 를 사용하여 정적인 데이터를 받을 때 response.json() 을 호출해 주는데
이 메서드가 ReadableStream 으로 받은 응답을 끝까지 읽어 JSON 형식으로 반환해주는것
자 위 예시에서 fetch를 통해 ReadableStream를 받아왔다.
이 스트리밍 데이터를 어떻게 읽을까...?
이 데이터는 그냥 스트리밍 형식이 아니다. Uint8bitArray(부호없는 8비트 정수들의 배열)형식의 스트리밍 데이터이다. 즉 ReadableStream<Uint8bitArray> 타입이다.
내가 진행한 프로젝트에서는 서버측에서 ndjson 형식으로 청크를 보내주고 있는데, 이 데이터는 UTF-8 형식으로 인코딩이 되어있다. 따라서 이를 UTF-8 형식으로 다시 디코딩을 해주어야한다.
따라서 총 두가지 과정을 거쳐야 한다.
const response = await ky.get(`API_주소_${jobId}`);
const reader = response.body?.getReader(); // ReadableStreamDefaultReader
ReadableStream(응답 본문 객체)에서 getReader 메서드를 실행하면 스트림 데이터에서 청크를 읽기 위한 reader를 반환받을 수 있다. 정확하게 말하면 ReadableStreamDefaultReader를 반환한다.
const decoder = new TextDecoder();
fetch로 받아온 body는 스트림 데이터이고, 이 스트림 데이터의 각 청크는 Uint8Array 형식이다. TextDecoder는 이 배열들을 UTF-8(default 옵션)으로 디코딩 후 스트링 형식으로 변경해준다.
const response = await ky.get(`API_주소_${jobId}`); // ReadableStream<Uint8Array>
const reader = response.body?.getReader();
const decoder = new TextDecoder();
let buffer = ''
while (true) {
// 새로 읽은 청크인 value와 스트림 종료 flag인 done 을 반환
const { value, done } = await reader.read(); // 스트림 읽기
if (done) break;
// Uint8Array 타입의 value를 decoder로 string형식으로 변환
buffer += decoder.decode(value, { stream: true }); // string 형식으로 변환
const lines = buffer.split('\n') //
buffer = buffer.pop()
// line 처리 로직 ...
}
read()는 Promise를 반환하며, { done, value } 객체로 resolve 된다. 반복문을 돌면서 반환된 value 값으로 청크를 읽어주고, done을 통해서 스트림이 종료되었는지 판단하여 반복문을 계속하거나 종료한다.
읽은 청크를 decode() 메서드를 통해 UTF-8 디코딩 후 String 형식으로 변환한다.
그리고 받아온 String 데이터는 자기가 알맞게 조절하여 사용하면 된다!
브라우저의 Encording api에는 Transform Stream이라는 인터페이스가 존재한다. 이를 사용하면 byte 스트림을 특정 형식의 스트림으로 변환해줄 수 있다.
예를 들어서 Uint8bitArray 형식의 스트림을 String 형식의 스트림으로 변환해줄 수 있는것이다. 그러면 우리는 decoder를 사용할 필요 없이 String 형식의 스트림에서 청크를 바로 읽어 사용할 수 있다.
그럼 이 TransformStream을 어떻게 사용해야할까?
바로 ReadableStream의 메서드 중 하나인 pipeThrough를 사용하면 된다. 이를 통해 현재 스트림을 Transfrom stream 으로 연결해줄 수 있다. 이름 그대로 파이프의 역할을 한다고 생각하면 된다.
그럼 이를 적용해보자
// pipeThrough
const response = await ky.get(`API_주소_${jobId}`);
const textStream = response.body!.pipeThrough(new TextDecoderStream());
let buffer = ''
for await (const value of textStream) {
buffer += value
const lines = buffer.split('\n')
buffer = buffer.pop()
// line 처리 로직 ...
}
TextDecoderStream은 byte 스트림을 string 형식의 스트림으로 반환해주는 Transfrom Stream 이다. 기존 스트림을 pipeThrough 메서드를 통해 이 Transform stream과 연결해주었다. 이로써 decode 하는 작업을 한줄에 끝내버렸다. 코드의 가독성도 더 직관적이다.
근데 위 코드에서 스트리밍 청크를 읽기 위한 reader가 없다. 왜일까?
바로 for await of 구문을 사용했기 때문이다. 이는 async iterable에 대해서 반복문을 실행할 수 있게 하는 구문이다.
async iterable 익숙하지 않을 수 있는데, iterable과 비슷하게 생각하면 된다.
아래의 경우를 이터레이터라고 하는데,
well-known Symbol인 Symbol.iterator를 프로퍼티 키로 사용한 메서드를 직접 구현하거나 프로토타입 체인을 통해 상속받은 Symbol.iterator 메서드를 호출하면 이터레이터 프로토콜을 준수한 이터레이터를 반환한다.
Symbol.asyncIterator를 프로퍼티 키로 사용한 메서드를 직접구현하거나 상속 받으면 이를 async iterable 이라고 한다. 쉽게 말해 비동기 작업들의 순회를 위해 만들어진 프로토콜을 준수하는 객체를 async iterable 이라고 할수있다.
다시 본론으로 돌아와서, 그럼 저 for await of 의 textStream은 async iterator 일까?
맞다! ReadableStream 객체는 사파리를 제외한 브라우저에서 Symbol.asyncIterator메서드가 구현되어 있다. 내부적으로 스트림의 청크를 read()하고 각 청크의 값을 반환한다. 그래서 for await of 부분의 value 은 청크의 값과 같다. 이를 통해서 reader로 청크를 read()하는 동작이 생략된것이다.
+++ done 값을 기준으로 반복문을 종료하는 기능도 내부적으로 수행한다
ReadableStream객체의 Symbol.asyncIterator메서드 는 사파리에서 구현되어있지 않다. 즉 사파리 브라우저에서는 ReadableStream에 대해서 for await of 구문을 사용할수 없다.

그래서 모든 브라우저의 호환을 위해 다음과 같이 폴리필 파일을 만들어 전역으로 주입한 뒤에 사용해야한다.
아래는 async 제너레이터 함수를 활용한 폴리필 코드의 예시이다.
// src/polyfills.ts
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) return;
yield value;
}
} finally {
reader.releaseLock();
}
};
}
import './polyfills'; // 최상위에서 import