SSE 유사 구현(fetch 스트리밍)

정태수·2025년 9월 15일
post-thumbnail

SSE(Server Sent Events)

서버에서 클라이언트로 단방향 실시간 데이터를 푸시하는 HTTP 기반 기술
SSE는 연결을 한 번만 맺고 단일화된 HTTP 연결을 유지하면서 서버에서 클라이언트로 데이터를 계속해서 보낼 수 있다
별도의 라이브러리 설치 없이 브라우저의 EventSource API를 통해 사용 가능하며, 웹소켓과 달리 양방향 통신은 지원하지 않는 대신 실시간 알림, 주가 정보 표시 등에 사용

쓰이는 곳

  1. 실시간 알림시스템
    • 주식 코인 가격
    • 실시간 피드
  2. 전자상거래
    • 실시간 재고
    • 배송 추적

웹소켓과의 차이?

SSE 기술을 보니까 websocket이랑은 무슨 차이가 있는지 대략적으로만 정리함.

WebSocket

양방향 실시간 통신 (클라이언트 ↔ 서버)
별도 프로토콜 (ws://, wss://)
연결 설정 오버헤드 있음

SSE

단방향 통신 (서버 → 클라이언트)
표준 HTTP/HTTPS 사용
간단한 연결 설정

구현 과정

React + TypeScript로 SSE 스트리밍 챗봇 구현하기

• 프로젝트 개요

AI 챗봇 서비스에서 ChatGPT처럼 텍스트가 실시간으로 타이핑되는 효과를 구현해야 했습니다. 단순한 HTTP 요청-응답 방식으로는 사용자 경험이 떨어져서, SSE(Server-Sent Events) 스트리밍을 도입

핵심 요구사항

  • 실시간 텍스트 스트리밍 (타이핑 효과)
  • 챗봇 타입별 대화 히스토리 관리
  • 무한 스크롤 (feat. 이전 대화 로드)
  • 복잡한 상태 관리와 메모리 최적화
  • 에러 핸들링

• 아키텍처 설계

1. 커스텀 훅 기반 설계 패턴

// 관심사 분리와 재사용성을 위한 커스텀 훅
export function useStreamingChat({ name }: Props) {
    // 상태 관리
    const [messages, setMessages] = useState<ChatMessage[]>([]);
    const [isStreaming, setIsStreaming] = useState(false);
    
    // 무한 스크롤 상태
    const [hasMoreHistory, setHasMoreHistory] = useState(true);
    const [isLoadingHistory, setIsLoadingHistory] = useState(false);
    
    // 메모리 최적화를 위한 ref 활용
    const currentMessageRef = useRef<string>('');
    const currentBotMessageId = useRef<string>('');
}

설계 방법: 스트리밍 로직을 커스텀 훅으로 캡슐화, 컴포넌트는 UI 렌더링만 하도록 분리

2. TypeScript 타입 안전성 보장

export interface ChatMessage {
    id: string;
    content: string;
    sender: 'user' | 'bot';          // 유니온 타입으로 제한
    timestamp: Date;
    isStreaming?: boolean;           // 스트리밍 상태 추적
    type: string;                   // 챗봇 타입별 구분
}

export interface StreamEvent {
    event: string;
    data: any;                      // SSE 이벤트 구조
}

• SSE 스트리밍 구현 핵심

1. EventSource API 대신 fetch + ReadableStream을 선택한 이유

// EventSource는 GET만 지원, POST 불가
const eventSource = new EventSource('/api/stream'); 

// fetch는 POST로 복잡한 데이터 전송 가능
const response = await fetch(url, {
    method: 'POST',
    headers: { 'Accept': 'text/event-stream' },
    body: JSON.stringify({ userId, chatbotId, message })
});

판단: 챗봇은 사용자 ID, 챗봇 ID, 메시지 내용 등 request 데이터가 있어서 fetch 방식을 선택

일반 fetch는 응답이 끝날 때까지 기다리지만,
스트리밍 fetch는 응답 바디를 chunk 단위로 읽어서
챗봇처럼 점진적으로 UI를 업데이트할 수 있다.

2. 스트림 데이터 파싱 로직

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // 청크 단위로 데이터 수신
    const chunk = decoder.decode(value, { stream: true });
    buffer += chunk;
    const lines = buffer.split('\n');
    buffer = lines.pop() || '';

    // SSE 형식 파싱 (event: / data:)
    for (const line of lines) {
        if (line.startsWith('event: ')) {
            currentEventType = line.substring(7).trim();
        }
        if (line.startsWith('data: ')) {
            const data = JSON.parse(line.substring(6));
            handleStreamEvent({ event: currentEventType, data });
        }
    }
}

핵심 포인트: 네트워크는 청크 단위로 도착하므로, 띄어쓰기 별로 분리하여 lines에 대한 포문 돌린다. 불완전한 JSON을 버퍼링하여 완전한 이벤트만 파싱.


• 성능 최적화 전략

1. useCallback을 활용한 메모리 최적화

// 의존성을 정확히 관리하여 불필요한 리렌더링 방지
const handleStreamEvent = useCallback((event: StreamEvent) => {
    switch (event.event) {
        case 'content_block_delta':
            const deltaText = event.data.delta?.text || event.data.text;
            currentMessageRef.current += deltaText;  // ref로 실시간 누적
            setCurrentStreamingMessage(currentMessageRef.current);
            break;
    }
}, [name]); // name만 의존성으로 설정

const sendMessage = useCallback(async (userId, chatbotId, message) => {

}, [isStreaming, handleStreamEvent, addUserMessage, initBotMessage]);

메모리 최적화 포인트:

  • 스트리밍 중 빈번한 상태 업데이트는 useRef 로 처리
  • 함수 재생성을 방지하는 적절한 의존성 관리

2. 실시간 상태 관리 패턴

// 실시간 텍스트 누적 (ref 활용)
currentMessageRef.current += deltaText;
setCurrentStreamingMessage(currentMessageRef.current);

// 스트리밍 완료 시 최종 상태로 변환
setMessages(prev => prev.map(msg =>
    msg.id === currentBotMessageId.current
        ? { ...msg, content: currentMessageRef.current, isStreaming: false }
        : msg
));

• 무한 스크롤 + 히스토리 관리

1. 페이지네이션 기반 히스토리 로드

const loadMoreHistory = useCallback(async (userId, chatbotId) => {
    if (isLoadingHistory || !hasMoreHistory) return;
    
    setIsLoadingHistory(true);
    
    const response = await fetch('/api/history', {
        method: 'POST',
        body: JSON.stringify({ 
            userId, chatbotId, 
            page: currentPage,      // 페이지 기반 로드
            pageSize: 20 
        })
    });
    
    // 시간순 정렬 보장
    const formattedMessages = historyData.messages
        .sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());
    
    setCurrentPage(prev => prev + 1);
    setHasMoreHistory(historyData.messages.length === 20);
}, [currentPage, isLoadingHistory, hasMoreHistory]);

2. 스크롤 위치 복원 로직

// ChatView 컴포넌트에서 스크롤 이벤트 감지
const handleScroll = useCallback(() => {
    if (!scrollContainerRef.current || isLoadingHistory) return;
    
    const { scrollTop } = scrollContainerRef.current;
    
    // 상단 근처 도달 시 히스토리 로드
    if (scrollTop < 100) {
        previousScrollHeight.current = scrollContainerRef.current.scrollHeight;
        onLoadMoreHistory();
    }
}, [onLoadMoreHistory, isLoadingHistory]);

// 히스토리 로드 후 스크롤 위치 복원
useEffect(() => {
    if (!isLoadingHistory && scrollContainerRef.current) {
        const newScrollHeight = scrollContainerRef.current.scrollHeight;
        const scrollDiff = newScrollHeight - previousScrollHeight.current;
        scrollContainerRef.current.scrollTop = scrollDiff;
    }
}, [isLoadingHistory]);

• 견고한 에러 핸들링

1. 다층 에러 처리 전략

const sendMessage = useCallback(async (userId, chatbotId, message) => {
    try {
        // 네트워크 레벨 에러
        const response = await fetch(url, { /* options */ });
        if (!response.ok) {
            throw new Error(`HTTP error! status: ${response.status}`);
        }
        
        // 스트림 처리 중 에러
        while (true) {
            const { done, value } = await reader.read();
            // JSON 파싱 에러 처리
            try {
                const data = JSON.parse(dataStr);
                handleStreamEvent({ event: currentEventType, data });
            } catch (e) {
                console.warn('JSON 파싱 실패:', line, e);
                // 개별 청크 실패는 전체를 중단하지 않음
            }
        }
    } catch (error) {
        // 전역 에러 상태 관리
        setHasError(true);
        setErrorMessage(error.message);
        setIsStreaming(false);
    }
}, []);

2. 사용자 친화적 로딩 상태

// 히스토리 로딩 인디케이터
{isLoadingHistory && (
    <div css={loadingStyles}>
        <Spinner />
        <span>이전 대화를 불러오는 중...</span>
    </div>
)}

// 답변 생성 중 애니메이션
{isStreaming && (
    <div css={typingIndicatorStyles}>
        답변 생성중...
    </div>
)}

• 챗봇 타입별 상태 분리

1. 타입별 메시지 필터링

// 전역 메시지 풀에서 현재 챗봇 타입만 필터링
const EachTypeMessages = messages.filter(msg => msg.type === name);

// 챗봇 변경 시 완전 초기화
useEffect(() => {
    resetStreamingState(); // 이전 챗봇 상태 클리어
}, [name, resetStreamingState]);

2. 시간순 정렬 보장

// 히스토리와 실시간 메시지 혼재 시 정렬
setMessages(prev => {
    const currentTypeMessages = prev.filter(msg => msg.type === name);
    const otherTypeMessages = prev.filter(msg => msg.type !== name);
    const allCurrentMessages = [...formattedMessages, ...currentTypeMessages];
    
    // 시간순 재정렬
    const sorted = allCurrentMessages.sort((a, b) => 
        a.timestamp.getTime() - b.timestamp.getTime()
    );
    
    return [...otherTypeMessages, ...sorted];
});

얻은 교훈

  1. 기술 선택의 근거: EventSource vs fetch 선택에서 요구사항 분석이 핵심
  2. 메모리 최적화: 실시간 업데이트에서 useRef의 전략적 활용
  3. 사용자 경험: 로딩 상태와 에러 처리가 기술적 구현만큼 중요
  4. 확장성: 단일 챗봇에서 멀티 타입으로 확장 가능한 설계

profile
프론트엔드 개발자

0개의 댓글