실시간 스트리밍 채팅 기능을 구현하게되면서 SSE에 대해 공부할 일이 있었다. 아래는 실제 코드를 작성해보며 SSE에 대해서 공부한 글이다.
현대 웹 애플리케이션에서 실시간 데이터 통신은 필수적인 요소이다. 사용자가 페이지를 새로고침하지 않고도 실시간으로 업데이트되는 정보를 받을 수 있어야하는데
이번 글에서는 Server-Sent Events (SSE) 를 사용하여 실시간 채팅 애플리케이션을 구현하는 방법을 자세히 알아보자. SSE는 단방향 실시간 통신에 특화된 기술로, 특히 서버에서 클라이언트로의 데이터 스트리밍에 최적화되어 있다.
기존의 HTTP 특징은 비연결성이라 한번 요청을 하면 그 연결이 끊긴다는 단점이 있다. 이를 해결하기 위해 폴링, 롱폴링, 웹소켓 같은 기술들이 있다. 하지만
SSE는 이러한 한계를 극복하고 단방향 실시간 통신을 제공한다.
클라이언트와 서버사이의 pub/sub 패턴이라고 이해하면 쉽다. 서버에서 새로운 이벤트가 생기면 구독자인 클라이언트에게 알림을 보내는 시스템이다.
특징 | Server-Sent Events (SSE) | WebSocket |
---|---|---|
통신 방향 | 단방향 (서버 → 클라이언트) | 양방향 |
프로토콜 | HTTP 기반 | 독립적인 프로토콜 |
연결 유지 | 자동 재연결 지원 | 수동 관리 필요 |
브라우저 지원 | 모든 모던 브라우저 | 모든 모던 브라우저 |
구현 복잡도 | 간단 | 복잡 |
사용 사례 | 실시간 알림, 채팅, 대시보드 | 게임, 협업 도구, 실시간 편집 |
SSE가 적합한 경우:
WebSocket이 적합한 경우:
Server-Sent Events (SSE) 는 HTML5 표준의 일부로, 서버에서 클라이언트로 실시간 데이터를 푸시할 수 있게 해주는 웹 표준 기술이다.
data: {"type": "text", "content": "안녕하세요"}
data: {"type": "text", "content": "SSE는"}
data: {"type": "text", "content": "정말"}
data: {"type": "text", "content": "유용합니다!"}
data: {"type": "done"}
이제 실제 코드를 통해 SSE를 구현해보자. NestJS 백엔드와 React 프론트엔드를 사용한 실시간 채팅 애플리케이션을 예시로 들어보겠다.
@Get("chat/stream")
@UseGuards(OpenAIAuthGuard)
async establishSSEConnection(@Query("sessionId") sessionId: string, @Res() res: Response) {
try {
// sessionId 유효성 검사
if (!sessionId || sessionId.trim() === "") {
res.status(HttpStatus.BAD_REQUEST).json({
success: false,
message: "sessionId is required",
});
return;
}
this.logger.log(`SSE connection established for session: ${sessionId}`);
// SSE 표준 헤더 설정
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("Access-Control-Allow-Origin", process.env.CLIENT_ORIGIN || "http://localhost:5173");
res.setHeader("Access-Control-Allow-Headers", "Content-Type");
res.setHeader("X-Accel-Buffering", "no"); // Nginx 버퍼링 비활성화
// 연결 유지를 위한 heartbeat 전송
const heartbeat = setInterval(() => {
res.write(": heartbeat\n\n");
}, 30000); // 30초마다 heartbeat
// SSE 연결을 Map에 저장
this.sseConnections.set(sessionId, res);
// 연결이 끊어질 때 정리
res.on("close", () => {
this.logger.log(`SSE connection closed for session: ${sessionId}`);
clearInterval(heartbeat);
this.sseConnections.delete(sessionId);
});
// 연결 유지 (클라이언트가 연결을 끊을 때까지)
res.on("error", (error) => {
this.logger.error(`SSE connection error for session ${sessionId}:`, error);
clearInterval(heartbeat);
this.sseConnections.delete(sessionId);
});
} catch (error) {
this.logger.error("SSE connection error:", error);
res.end();
}
}
@Post("chat/stream")
@UseGuards(OpenAIAuthGuard)
async streamChat(@Body() request: ChatRequest) {
try {
this.logger.log(`Stream chat request received for session: ${request.sessionId}`);
// 해당 세션의 SSE 연결 확인
const sseConnection = this.sseConnections.get(request.sessionId);
if (!sseConnection) {
throw new HttpException(`SSE connection not found for session: ${request.sessionId}`, HttpStatus.BAD_REQUEST);
}
// 스트리밍 응답 생성 및 SSE 연결로 푸시
const stream = this.openaiService.generateStreamingResponse(request);
for await (const chunk of stream) {
const data = `data: ${JSON.stringify(chunk)}\n\n`;
sseConnection.write(data);
// 에러가 발생하면 스트림 종료
if (chunk.type === "error") {
break;
}
// 완료되면 스트림 종료
if (chunk.type === "done") {
break;
}
}
return { success: true, message: "스트리밍 응답이 전송되었습니다." };
} catch (error) {
this.logger.error("Stream chat error:", error);
if (error instanceof HttpException) {
throw error;
}
throw new HttpException(
`스트리밍 처리 중 오류가 발생했습니다: ${error.message}`,
HttpStatus.INTERNAL_SERVER_ERROR
);
}
}
async *generateStreamingResponse(request: ChatRequest): AsyncGenerator<StreamingChunk> {
try {
this.logger.log(`Generating streaming response for ${request.messages.length} messages`);
// OpenAI 스트리밍 API 호출
const stream = await this.openai.chat.completions.create({
model: request.modelConfig?.model || "gpt-3.5-turbo",
messages: request.messages.map((msg) => ({
role: msg.role,
content: msg.content,
})),
temperature: request.modelConfig?.temperature || 0.7,
max_tokens: request.modelConfig?.maxTokens || 1000,
top_p: request.modelConfig?.topP || 1,
frequency_penalty: request.modelConfig?.frequencyPenalty || 0,
presence_penalty: request.modelConfig?.presencePenalty || 0,
stream: true, // 스트리밍 모드 활성화
});
// 스트리밍 응답 처리
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
yield {
type: "text",
content,
metadata: {
model: chunk.model,
finishReason: chunk.choices[0]?.finish_reason,
},
};
}
}
// 스트리밍 완료 신호
yield { type: "done" };
} catch (error) {
this.logger.error("OpenAI streaming error:", error);
yield {
type: "error",
error: `OpenAI 스트리밍 API 호출 실패: ${error.message}`,
};
}
}
export const useChat = () => {
const [messages, setMessages] = useState<ChatMessage[]>([
{
id: "1",
role: "assistant",
content: "안녕하세요! OpenAI 챗봇입니다. 무엇을 도와드릴까요?",
timestamp: new Date(),
},
]);
const [inputText, setInputText] = useState("");
const [isLoading, setIsLoading] = useState(false);
const [isStreaming, setIsStreaming] = useState(false);
const [sessionId, setSessionId] = useState<string>("");
const cleanupRef = useRef<(() => void) | null>(null);
const tempBotMessageIdRef = useRef<string>("");
// SSE 메시지 수신 처리를 위한 콜백 함수 (메모이제이션)
const handleSSEMessage = useCallback(
(chunk: StreamingChunk) => {
console.log("SSE 메시지 수신:", chunk);
if (chunk.type === "text") {
setMessages((prev) =>
prev.map((message) =>
message.id === tempBotMessageIdRef.current
? { ...message, content: message.content + chunk.content }
: message
)
);
} else if (chunk.type === "done") {
// 스트리밍 완료
setIsStreaming(false);
setIsLoading(false);
} else if (chunk.type === "error") {
handleError(chunk.error || "스트리밍 오류가 발생했습니다.");
}
},
[handleError]
);
// SSE 에러 처리를 위한 콜백 함수 (메모이제이션)
const handleSSEError = useCallback(
(error: string) => {
console.error("SSE 연결 오류:", error);
handleError(error);
},
[handleError]
);
// 컴포넌트 마운트 시 SSE 연결 수립 (한 번만)
useEffect(() => {
const newSessionId = `session_${Date.now()}`;
setSessionId(newSessionId);
// SSE 연결 수립
const cleanup = establishSSEConnection(newSessionId, handleSSEMessage, handleSSEError);
cleanupRef.current = cleanup;
// 컴포넌트 언마운트 시 cleanup 실행
return () => {
if (cleanupRef.current) {
cleanupRef.current();
cleanupRef.current = null;
}
};
}, [handleSSEMessage, handleSSEError]);
const handleSendMessage = useCallback(async () => {
if (!inputText.trim() || isLoading || isStreaming) return;
const userMessage: ChatMessage = {
id: Date.now().toString(),
role: "user",
content: inputText,
timestamp: new Date(),
};
setMessages((prev) => [...prev, userMessage]);
setInputText("");
setIsLoading(true);
setIsStreaming(true);
// 스트리밍 응답을 위한 임시 메시지 생성
const tempBotMessageId = (Date.now() + 1).toString();
tempBotMessageIdRef.current = tempBotMessageId;
const tempBotMessage: ChatMessage = {
id: tempBotMessageId,
role: "assistant",
content: "",
timestamp: new Date(),
};
setMessages((prev) => [...prev, tempBotMessage]);
try {
const chatRequest: ChatRequest = {
messages: [...messages, userMessage],
sessionId: sessionId,
modelConfig: {
model: "gpt-3.5-turbo",
temperature: 0.7,
maxTokens: 1000,
topP: 1,
frequencyPenalty: 0,
presencePenalty: 0,
},
};
// POST 요청으로 메시지 전송 (SSE 연결은 이미 수립되어 있음)
await sendStreamChatMessage(chatRequest);
} catch (error) {
console.error("메시지 전송 실패:", error);
const errorMessage = error instanceof Error ? error.message : "메시지 전송에 실패했습니다. 다시 시도해주세요.";
handleError(errorMessage);
}
}, [inputText, isLoading, isStreaming, messages, sessionId, handleError]);
return {
messages,
inputText,
isLoading,
isStreaming,
setInputText,
handleSendMessage,
handleKeyPress,
formatTime,
};
};
export const establishSSEConnection = (
sessionId: string,
onChunk: (chunk: StreamingChunk) => void,
onError: (error: string) => void
): (() => void) => {
try {
const eventSource = new EventSource(
`${axiosInstance.defaults.baseURL}/api/v1/openai/chat/stream?sessionId=${sessionId}`,
{ withCredentials: false }
);
// 메시지 수신 처리
eventSource.onmessage = (event) => {
try {
if (event.data.trim()) {
const chunk: StreamingChunk = JSON.parse(event.data);
onChunk(chunk);
if (chunk.type === "error") {
onError(chunk.error || "스트리밍 오류가 발생했습니다.");
}
}
} catch (parseError) {
console.error("JSON 파싱 오류:", parseError);
onError("JSON 파싱 오류가 발생했습니다.");
}
};
// 에러 처리
eventSource.onerror = (event) => {
console.error("EventSource 에러:", event);
onError("SSE 연결 중 오류가 발생했습니다.");
};
// 연결 열림 처리
eventSource.onopen = () => {
console.log("SSE 연결이 열렸습니다.");
};
// 연결 종료 시 정리 함수 반환
return () => {
eventSource.close();
};
} catch (error) {
console.error("SSE 연결 오류:", error);
onError(error instanceof Error ? error.message : "알 수 없는 오류가 발생했습니다.");
return () => {}; // 빈 cleanup 함수 반환
}
};
클라이언트 → GET /chat/stream → 서버
↓
SSE 연결 수립
↓
연결을 세션별로 저장
클라이언트 → POST /chat/stream → 서버
↓
OpenAI API 호출
↓
스트리밍 응답 생성
↓
기존 SSE 연결로 데이터 전송
↓
클라이언트에서 EventSource로 수신
Map<string, Response>
를 사용하여 세션별로 SSE 연결 관리onerror
이벤트로 연결 오류 처리HTTP/2의 Server Push 기능을 활용하여 더 효율적인 실시간 통신 구현
QUIC 프로토콜 기반의 새로운 실시간 통신 표준
CDN 엣지에서 SSE 연결을 처리하여 지연 시간 최소화
Server-Sent Events는 단방향 실시간 통신을 위한 가장 간단하고 효율적인 기술스택이다. HTTP 기반으로 구현되어 기존 인프라를 그대로 활용할 수 있고, 구현이 간단하면서도 다양한 기능을 제공한다.
이번 글에서 살펴본 것처럼, SSE를 사용하면 실시간 채팅, 알림, 대시보드 등 다양한 실시간 기능을 쉽게 구현할 수 있다. 특히 서버에서 클라이언트로의 데이터 스트리밍이 필요한 경우에는 WebSocket보다 더 적합한 선택일 수있다.
참고 자료:
아뉘 이사람 뭐야... nestjs도 할 줄 알았어요? 무서워 ㄷㄷㄷ 약코하지마욧!