서버에서 클라이언트로 단방향 실시간 데이터를 푸시하는 HTTP 기반 기술
SSE는 연결을 한 번만 맺고 단일화된 HTTP 연결을 유지하면서 서버에서 클라이언트로 데이터를 계속해서 보낼 수 있다
별도의 라이브러리 설치 없이 브라우저의 EventSource API를 통해 사용 가능하며, 웹소켓과 달리 양방향 통신은 지원하지 않는 대신 실시간 알림, 주가 정보 표시 등에 사용
- 실시간 알림시스템
- 주식 코인 가격
- 실시간 피드
- 전자상거래
- 실시간 재고
- 배송 추적
SSE 기술을 보니까 websocket이랑은 무슨 차이가 있는지 대략적으로만 정리함.
WebSocket
양방향 실시간 통신 (클라이언트 ↔ 서버)
별도 프로토콜 (ws://, wss://)
연결 설정 오버헤드 있음
SSE
단방향 통신 (서버 → 클라이언트)
표준 HTTP/HTTPS 사용
간단한 연결 설정
React + TypeScript로 SSE 스트리밍 챗봇 구현하기
AI 챗봇 서비스에서 ChatGPT처럼 텍스트가 실시간으로 타이핑되는 효과를 구현해야 했습니다. 단순한 HTTP 요청-응답 방식으로는 사용자 경험이 떨어져서, SSE(Server-Sent Events) 스트리밍을 도입
// 관심사 분리와 재사용성을 위한 커스텀 훅
export function useStreamingChat({ name }: Props) {
// 상태 관리
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
// 무한 스크롤 상태
const [hasMoreHistory, setHasMoreHistory] = useState(true);
const [isLoadingHistory, setIsLoadingHistory] = useState(false);
// 메모리 최적화를 위한 ref 활용
const currentMessageRef = useRef<string>('');
const currentBotMessageId = useRef<string>('');
}
설계 방법: 스트리밍 로직을 커스텀 훅으로 캡슐화, 컴포넌트는 UI 렌더링만 하도록 분리
export interface ChatMessage {
id: string;
content: string;
sender: 'user' | 'bot'; // 유니온 타입으로 제한
timestamp: Date;
isStreaming?: boolean; // 스트리밍 상태 추적
type: string; // 챗봇 타입별 구분
}
export interface StreamEvent {
event: string;
data: any; // SSE 이벤트 구조
}
// EventSource는 GET만 지원, POST 불가
const eventSource = new EventSource('/api/stream');
// fetch는 POST로 복잡한 데이터 전송 가능
const response = await fetch(url, {
method: 'POST',
headers: { 'Accept': 'text/event-stream' },
body: JSON.stringify({ userId, chatbotId, message })
});
판단: 챗봇은 사용자 ID, 챗봇 ID, 메시지 내용 등 request 데이터가 있어서 fetch 방식을 선택
일반 fetch는 응답이 끝날 때까지 기다리지만,
스트리밍 fetch는 응답 바디를 chunk 단위로 읽어서
챗봇처럼 점진적으로 UI를 업데이트할 수 있다.
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 청크 단위로 데이터 수신
const chunk = decoder.decode(value, { stream: true });
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() || '';
// SSE 형식 파싱 (event: / data:)
for (const line of lines) {
if (line.startsWith('event: ')) {
currentEventType = line.substring(7).trim();
}
if (line.startsWith('data: ')) {
const data = JSON.parse(line.substring(6));
handleStreamEvent({ event: currentEventType, data });
}
}
}
핵심 포인트: 네트워크는 청크 단위로 도착하므로, 띄어쓰기 별로 분리하여 lines에 대한 포문 돌린다. 불완전한 JSON을 버퍼링하여 완전한 이벤트만 파싱.
// 의존성을 정확히 관리하여 불필요한 리렌더링 방지
const handleStreamEvent = useCallback((event: StreamEvent) => {
switch (event.event) {
case 'content_block_delta':
const deltaText = event.data.delta?.text || event.data.text;
currentMessageRef.current += deltaText; // ref로 실시간 누적
setCurrentStreamingMessage(currentMessageRef.current);
break;
}
}, [name]); // name만 의존성으로 설정
const sendMessage = useCallback(async (userId, chatbotId, message) => {
}, [isStreaming, handleStreamEvent, addUserMessage, initBotMessage]);
메모리 최적화 포인트:
useRef 로 처리// 실시간 텍스트 누적 (ref 활용)
currentMessageRef.current += deltaText;
setCurrentStreamingMessage(currentMessageRef.current);
// 스트리밍 완료 시 최종 상태로 변환
setMessages(prev => prev.map(msg =>
msg.id === currentBotMessageId.current
? { ...msg, content: currentMessageRef.current, isStreaming: false }
: msg
));
const loadMoreHistory = useCallback(async (userId, chatbotId) => {
if (isLoadingHistory || !hasMoreHistory) return;
setIsLoadingHistory(true);
const response = await fetch('/api/history', {
method: 'POST',
body: JSON.stringify({
userId, chatbotId,
page: currentPage, // 페이지 기반 로드
pageSize: 20
})
});
// 시간순 정렬 보장
const formattedMessages = historyData.messages
.sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());
setCurrentPage(prev => prev + 1);
setHasMoreHistory(historyData.messages.length === 20);
}, [currentPage, isLoadingHistory, hasMoreHistory]);
// ChatView 컴포넌트에서 스크롤 이벤트 감지
const handleScroll = useCallback(() => {
if (!scrollContainerRef.current || isLoadingHistory) return;
const { scrollTop } = scrollContainerRef.current;
// 상단 근처 도달 시 히스토리 로드
if (scrollTop < 100) {
previousScrollHeight.current = scrollContainerRef.current.scrollHeight;
onLoadMoreHistory();
}
}, [onLoadMoreHistory, isLoadingHistory]);
// 히스토리 로드 후 스크롤 위치 복원
useEffect(() => {
if (!isLoadingHistory && scrollContainerRef.current) {
const newScrollHeight = scrollContainerRef.current.scrollHeight;
const scrollDiff = newScrollHeight - previousScrollHeight.current;
scrollContainerRef.current.scrollTop = scrollDiff;
}
}, [isLoadingHistory]);
const sendMessage = useCallback(async (userId, chatbotId, message) => {
try {
// 네트워크 레벨 에러
const response = await fetch(url, { /* options */ });
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
// 스트림 처리 중 에러
while (true) {
const { done, value } = await reader.read();
// JSON 파싱 에러 처리
try {
const data = JSON.parse(dataStr);
handleStreamEvent({ event: currentEventType, data });
} catch (e) {
console.warn('JSON 파싱 실패:', line, e);
// 개별 청크 실패는 전체를 중단하지 않음
}
}
} catch (error) {
// 전역 에러 상태 관리
setHasError(true);
setErrorMessage(error.message);
setIsStreaming(false);
}
}, []);
// 히스토리 로딩 인디케이터
{isLoadingHistory && (
<div css={loadingStyles}>
<Spinner />
<span>이전 대화를 불러오는 중...</span>
</div>
)}
// 답변 생성 중 애니메이션
{isStreaming && (
<div css={typingIndicatorStyles}>
답변 생성중...
</div>
)}
// 전역 메시지 풀에서 현재 챗봇 타입만 필터링
const EachTypeMessages = messages.filter(msg => msg.type === name);
// 챗봇 변경 시 완전 초기화
useEffect(() => {
resetStreamingState(); // 이전 챗봇 상태 클리어
}, [name, resetStreamingState]);
// 히스토리와 실시간 메시지 혼재 시 정렬
setMessages(prev => {
const currentTypeMessages = prev.filter(msg => msg.type === name);
const otherTypeMessages = prev.filter(msg => msg.type !== name);
const allCurrentMessages = [...formattedMessages, ...currentTypeMessages];
// 시간순 재정렬
const sorted = allCurrentMessages.sort((a, b) =>
a.timestamp.getTime() - b.timestamp.getTime()
);
return [...otherTypeMessages, ...sorted];
});