실시간 채팅 어플리케이션으로 알아보는 Server-Sent Events

Changjun·2025년 8월 24일
7

이번에야말로

목록 보기
10/12
post-thumbnail

실시간 스트리밍 채팅 기능을 구현하게되면서 SSE에 대해 공부할 일이 있었다. 아래는 실제 코드를 작성해보며 SSE에 대해서 공부한 글이다.

서론

현대 웹 애플리케이션에서 실시간 데이터 통신은 필수적인 요소이다. 사용자가 페이지를 새로고침하지 않고도 실시간으로 업데이트되는 정보를 받을 수 있어야하는데

이번 글에서는 Server-Sent Events (SSE) 를 사용하여 실시간 채팅 애플리케이션을 구현하는 방법을 자세히 알아보자. SSE는 단방향 실시간 통신에 특화된 기술로, 특히 서버에서 클라이언트로의 데이터 스트리밍에 최적화되어 있다.

서버와 클라이언트의 연결 방식

전통적인 HTTP 통신의 한계

기존의 HTTP 특징은 비연결성이라 한번 요청을 하면 그 연결이 끊긴다는 단점이 있다. 이를 해결하기 위해 폴링, 롱폴링, 웹소켓 같은 기술들이 있다. 하지만

  1. 폴링(Polling): 클라이언트가 주기적으로 서버에 요청을 보내야 함, 서버 과부화 발생 가능, 그리고 실시간이 아님.
  2. 긴 폴링(Long Polling): 서버가 응답을 지연시켜 실시간성을 확보하지만 리소스 낭비
  3. 웹 소켓: 구현 복잡성, 불필요한 양방향 연결 유지, 방화벽 및 프록시 문제

SSE의 접근 방식

SSE는 이러한 한계를 극복하고 단방향 실시간 통신을 제공한다.

클라이언트와 서버사이의 pub/sub 패턴이라고 이해하면 쉽다. 서버에서 새로운 이벤트가 생기면 구독자인 클라이언트에게 알림을 보내는 시스템이다.

WebSocket과의 차이

특징Server-Sent Events (SSE)WebSocket
통신 방향단방향 (서버 → 클라이언트)양방향
프로토콜HTTP 기반독립적인 프로토콜
연결 유지자동 재연결 지원수동 관리 필요
브라우저 지원모든 모던 브라우저모든 모던 브라우저
구현 복잡도간단복잡
사용 사례실시간 알림, 채팅, 대시보드게임, 협업 도구, 실시간 편집

언제 SSE를 사용해야 할까?

SSE가 적합한 경우:

  • 서버에서 클라이언트로의 단방향 데이터 전송
  • 실시간 알림 및 업데이트
  • 스트리밍 데이터 (채팅, 로그 등)
  • 간단한 구현이 필요한 경우

WebSocket이 적합한 경우:

  • 양방향 실시간 통신이 필요한 경우
  • 게임이나 협업 도구
  • 복잡한 상태 동기화

SSE란?

Server-Sent Events의 정의

Server-Sent Events (SSE) 는 HTML5 표준의 일부로, 서버에서 클라이언트로 실시간 데이터를 푸시할 수 있게 해주는 웹 표준 기술이다.

SSE의 핵심 특징

  1. HTTP 기반: 기존 HTTP 인프라를 그대로 활용
  2. 자동 재연결: 연결이 끊어지면 자동으로 재연결 시도
  3. 이벤트 기반: 서버에서 이벤트를 보내면 클라이언트가 수신
  4. 단방향: 서버에서 클라이언트로만 데이터 전송
  5. 표준 지원: 모든 모던 브라우저에서 네이티브 지원

SSE 프로토콜 형식

data: {"type": "text", "content": "안녕하세요"}

data: {"type": "text", "content": "SSE는"}

data: {"type": "text", "content": "정말"}

data: {"type": "text", "content": "유용합니다!"}

data: {"type": "done"}

SSE 구현해보기

이제 실제 코드를 통해 SSE를 구현해보자. NestJS 백엔드와 React 프론트엔드를 사용한 실시간 채팅 애플리케이션을 예시로 들어보겠다.

1. 서버 측 구현 (NestJS)

SSE 연결 수립 (GET 엔드포인트)

@Get("chat/stream")
@UseGuards(OpenAIAuthGuard)
async establishSSEConnection(@Query("sessionId") sessionId: string, @Res() res: Response) {
  try {
    // sessionId 유효성 검사
    if (!sessionId || sessionId.trim() === "") {
      res.status(HttpStatus.BAD_REQUEST).json({
        success: false,
        message: "sessionId is required",
      });
      return;
    }

    this.logger.log(`SSE connection established for session: ${sessionId}`);

    // SSE 표준 헤더 설정
    res.setHeader("Content-Type", "text/event-stream");
    res.setHeader("Cache-Control", "no-cache");
    res.setHeader("Connection", "keep-alive");
    res.setHeader("Access-Control-Allow-Origin", process.env.CLIENT_ORIGIN || "http://localhost:5173");
    res.setHeader("Access-Control-Allow-Headers", "Content-Type");
    res.setHeader("X-Accel-Buffering", "no"); // Nginx 버퍼링 비활성화

    // 연결 유지를 위한 heartbeat 전송
    const heartbeat = setInterval(() => {
      res.write(": heartbeat\n\n");
    }, 30000); // 30초마다 heartbeat

    // SSE 연결을 Map에 저장
    this.sseConnections.set(sessionId, res);

    // 연결이 끊어질 때 정리
    res.on("close", () => {
      this.logger.log(`SSE connection closed for session: ${sessionId}`);
      clearInterval(heartbeat);
      this.sseConnections.delete(sessionId);
    });

    // 연결 유지 (클라이언트가 연결을 끊을 때까지)
    res.on("error", (error) => {
      this.logger.error(`SSE connection error for session ${sessionId}:`, error);
      clearInterval(heartbeat);
      this.sseConnections.delete(sessionId);
    });
  } catch (error) {
    this.logger.error("SSE connection error:", error);
    res.end();
  }
}

메시지 전송 및 스트리밍 응답 (POST 엔드포인트)

@Post("chat/stream")
@UseGuards(OpenAIAuthGuard)
async streamChat(@Body() request: ChatRequest) {
  try {
    this.logger.log(`Stream chat request received for session: ${request.sessionId}`);

    // 해당 세션의 SSE 연결 확인
    const sseConnection = this.sseConnections.get(request.sessionId);
    if (!sseConnection) {
      throw new HttpException(`SSE connection not found for session: ${request.sessionId}`, HttpStatus.BAD_REQUEST);
    }

    // 스트리밍 응답 생성 및 SSE 연결로 푸시
    const stream = this.openaiService.generateStreamingResponse(request);

    for await (const chunk of stream) {
      const data = `data: ${JSON.stringify(chunk)}\n\n`;
      sseConnection.write(data);

      // 에러가 발생하면 스트림 종료
      if (chunk.type === "error") {
        break;
      }

      // 완료되면 스트림 종료
      if (chunk.type === "done") {
        break;
      }
    }

    return { success: true, message: "스트리밍 응답이 전송되었습니다." };
  } catch (error) {
    this.logger.error("Stream chat error:", error);

    if (error instanceof HttpException) {
      throw error;
    }

    throw new HttpException(
      `스트리밍 처리 중 오류가 발생했습니다: ${error.message}`,
      HttpStatus.INTERNAL_SERVER_ERROR
    );
  }
}

OpenAI 스트리밍 서비스

async *generateStreamingResponse(request: ChatRequest): AsyncGenerator<StreamingChunk> {
  try {
    this.logger.log(`Generating streaming response for ${request.messages.length} messages`);

    // OpenAI 스트리밍 API 호출
    const stream = await this.openai.chat.completions.create({
      model: request.modelConfig?.model || "gpt-3.5-turbo",
      messages: request.messages.map((msg) => ({
        role: msg.role,
        content: msg.content,
      })),
      temperature: request.modelConfig?.temperature || 0.7,
      max_tokens: request.modelConfig?.maxTokens || 1000,
      top_p: request.modelConfig?.topP || 1,
      frequency_penalty: request.modelConfig?.frequencyPenalty || 0,
      presence_penalty: request.modelConfig?.presencePenalty || 0,
      stream: true, // 스트리밍 모드 활성화
    });

    // 스트리밍 응답 처리
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;

      if (content) {
        yield {
          type: "text",
          content,
          metadata: {
            model: chunk.model,
            finishReason: chunk.choices[0]?.finish_reason,
          },
        };
      }
    }

    // 스트리밍 완료 신호
    yield { type: "done" };
  } catch (error) {
    this.logger.error("OpenAI streaming error:", error);
    yield {
      type: "error",
      error: `OpenAI 스트리밍 API 호출 실패: ${error.message}`,
    };
  }
}

2. 클라이언트 측 구현 (React)

SSE 연결 및 메시지 처리 훅

export const useChat = () => {
  const [messages, setMessages] = useState<ChatMessage[]>([
    {
      id: "1",
      role: "assistant",
      content: "안녕하세요! OpenAI 챗봇입니다. 무엇을 도와드릴까요?",
      timestamp: new Date(),
    },
  ]);
  const [inputText, setInputText] = useState("");
  const [isLoading, setIsLoading] = useState(false);
  const [isStreaming, setIsStreaming] = useState(false);
  const [sessionId, setSessionId] = useState<string>("");
  const cleanupRef = useRef<(() => void) | null>(null);
  const tempBotMessageIdRef = useRef<string>("");

  // SSE 메시지 수신 처리를 위한 콜백 함수 (메모이제이션)
  const handleSSEMessage = useCallback(
    (chunk: StreamingChunk) => {
      console.log("SSE 메시지 수신:", chunk);

      if (chunk.type === "text") {
        setMessages((prev) =>
          prev.map((message) =>
            message.id === tempBotMessageIdRef.current
              ? { ...message, content: message.content + chunk.content }
              : message
          )
        );
      } else if (chunk.type === "done") {
        // 스트리밍 완료
        setIsStreaming(false);
        setIsLoading(false);
      } else if (chunk.type === "error") {
        handleError(chunk.error || "스트리밍 오류가 발생했습니다.");
      }
    },
    [handleError]
  );

  // SSE 에러 처리를 위한 콜백 함수 (메모이제이션)
  const handleSSEError = useCallback(
    (error: string) => {
      console.error("SSE 연결 오류:", error);
      handleError(error);
    },
    [handleError]
  );

  // 컴포넌트 마운트 시 SSE 연결 수립 (한 번만)
  useEffect(() => {
    const newSessionId = `session_${Date.now()}`;
    setSessionId(newSessionId);

    // SSE 연결 수립
    const cleanup = establishSSEConnection(newSessionId, handleSSEMessage, handleSSEError);

    cleanupRef.current = cleanup;

    // 컴포넌트 언마운트 시 cleanup 실행
    return () => {
      if (cleanupRef.current) {
        cleanupRef.current();
        cleanupRef.current = null;
      }
    };
  }, [handleSSEMessage, handleSSEError]);

  const handleSendMessage = useCallback(async () => {
    if (!inputText.trim() || isLoading || isStreaming) return;

    const userMessage: ChatMessage = {
      id: Date.now().toString(),
      role: "user",
      content: inputText,
      timestamp: new Date(),
    };

    setMessages((prev) => [...prev, userMessage]);
    setInputText("");
    setIsLoading(true);
    setIsStreaming(true);

    // 스트리밍 응답을 위한 임시 메시지 생성
    const tempBotMessageId = (Date.now() + 1).toString();
    tempBotMessageIdRef.current = tempBotMessageId;

    const tempBotMessage: ChatMessage = {
      id: tempBotMessageId,
      role: "assistant",
      content: "",
      timestamp: new Date(),
    };

    setMessages((prev) => [...prev, tempBotMessage]);

    try {
      const chatRequest: ChatRequest = {
        messages: [...messages, userMessage],
        sessionId: sessionId,
        modelConfig: {
          model: "gpt-3.5-turbo",
          temperature: 0.7,
          maxTokens: 1000,
          topP: 1,
          frequencyPenalty: 0,
          presencePenalty: 0,
        },
      };

      // POST 요청으로 메시지 전송 (SSE 연결은 이미 수립되어 있음)
      await sendStreamChatMessage(chatRequest);
    } catch (error) {
      console.error("메시지 전송 실패:", error);
      const errorMessage = error instanceof Error ? error.message : "메시지 전송에 실패했습니다. 다시 시도해주세요.";
      handleError(errorMessage);
    }
  }, [inputText, isLoading, isStreaming, messages, sessionId, handleError]);

  return {
    messages,
    inputText,
    isLoading,
    isStreaming,
    setInputText,
    handleSendMessage,
    handleKeyPress,
    formatTime,
  };
};

SSE 연결 API 함수

export const establishSSEConnection = (
  sessionId: string,
  onChunk: (chunk: StreamingChunk) => void,
  onError: (error: string) => void
): (() => void) => {
  try {
    const eventSource = new EventSource(
      `${axiosInstance.defaults.baseURL}/api/v1/openai/chat/stream?sessionId=${sessionId}`,
      { withCredentials: false }
    );

    // 메시지 수신 처리
    eventSource.onmessage = (event) => {
      try {
        if (event.data.trim()) {
          const chunk: StreamingChunk = JSON.parse(event.data);
          onChunk(chunk);

          if (chunk.type === "error") {
            onError(chunk.error || "스트리밍 오류가 발생했습니다.");
          }
        }
      } catch (parseError) {
        console.error("JSON 파싱 오류:", parseError);
        onError("JSON 파싱 오류가 발생했습니다.");
      }
    };

    // 에러 처리
    eventSource.onerror = (event) => {
      console.error("EventSource 에러:", event);
      onError("SSE 연결 중 오류가 발생했습니다.");
    };

    // 연결 열림 처리
    eventSource.onopen = () => {
      console.log("SSE 연결이 열렸습니다.");
    };

    // 연결 종료 시 정리 함수 반환
    return () => {
      eventSource.close();
    };
  } catch (error) {
    console.error("SSE 연결 오류:", error);
    onError(error instanceof Error ? error.message : "알 수 없는 오류가 발생했습니다.");
    return () => {}; // 빈 cleanup 함수 반환
  }
};

전체 데이터 흐름

1. 초기 연결 단계

클라이언트 → GET /chat/stream → 서버
                ↓
            SSE 연결 수립
                ↓
            연결을 세션별로 저장

2. 메시지 전송 및 응답 단계

클라이언트 → POST /chat/stream → 서버
                ↓
            OpenAI API 호출
                ↓
            스트리밍 응답 생성
                ↓
            기존 SSE 연결로 데이터 전송
                ↓
            클라이언트에서 EventSource로 수신

SSE 구현의 핵심 포인트

1. 연결 관리

  • 세션별 연결 저장: Map<string, Response>를 사용하여 세션별로 SSE 연결 관리
  • 자동 정리: 연결이 끊어지면 자동으로 정리하여 메모리 누수 방지
  • Heartbeat: 30초마다 heartbeat를 전송하여 연결 상태 유지

2. 에러 처리

  • 연결 오류: EventSource의 onerror 이벤트로 연결 오류 처리
  • 데이터 파싱 오류: JSON 파싱 실패 시 적절한 에러 메시지 전송
  • 서버 오류: OpenAI API 호출 실패 시 에러 타입으로 응답

주의사항 및 제한사항

1. 브라우저 제한

  • 동시 연결 수: 브라우저당 최대 6개의 동시 HTTP 연결 (HTTP/1.1)
  • 연결 시간: 일부 프록시나 로드 밸런서에서 연결 시간 제한 가능

2. 서버 리소스

  • 메모리 사용량: 각 SSE 연결마다 Response 객체를 메모리에 유지
  • 연결 수 제한: 동시 연결 수가 많아지면 서버 리소스 부족 가능

3. 네트워크 환경

  • 프록시 설정: 일부 프록시에서 SSE 연결 차단 가능
  • 방화벽: 기업 네트워크에서 SSE 연결 차단 가능

향후 공부하면 좋을 내용

1. HTTP/2 Server Push

HTTP/2의 Server Push 기능을 활용하여 더 효율적인 실시간 통신 구현

2. WebTransport

QUIC 프로토콜 기반의 새로운 실시간 통신 표준

3. Edge Computing

CDN 엣지에서 SSE 연결을 처리하여 지연 시간 최소화

결론

Server-Sent Events는 단방향 실시간 통신을 위한 가장 간단하고 효율적인 기술스택이다. HTTP 기반으로 구현되어 기존 인프라를 그대로 활용할 수 있고, 구현이 간단하면서도 다양한 기능을 제공한다.

이번 글에서 살펴본 것처럼, SSE를 사용하면 실시간 채팅, 알림, 대시보드 등 다양한 실시간 기능을 쉽게 구현할 수 있다. 특히 서버에서 클라이언트로의 데이터 스트리밍이 필요한 경우에는 WebSocket보다 더 적합한 선택일 수있다.


참고 자료:

3개의 댓글

comment-user-thumbnail
2025년 8월 24일

아뉘 이사람 뭐야... nestjs도 할 줄 알았어요? 무서워 ㄷㄷㄷ 약코하지마욧!

1개의 답글
comment-user-thumbnail
2025년 8월 24일

와.. 와중에 블로그 글도 써? 김한자씌...!!!!!

답글 달기