데이터 스트리밍을 통해 chatGpt api를 이용하는 모습이다.

저번 글에서 connection 종료와 관련된 문제가 있었다.
도착하는 데이터를 확인해보면 {id:~~}로 시작하는 json형식으로 응답이 오다가 마지막 메세지가 도착한 후에 String으로 "[DONE]"을 반환하는 것을 확인할 수 있었다. 그래서 그걸 확인하기 전에 바로 Dto에 맵핑을 해서 오류가나고 전송이 계속 오고 있음에도 complete()를 호출하게 되어 생기는 문제였다.
응답 데이터를 바로 Dto로 받는게 아니라 String으로 받도록 고쳤다.
그다음 필요한 곳에서 Dto로 바꿔주었다.
exchangeToFlux(response -> response.bodyToFlux(String.class)) //이렇게 String으로 받고
jackson의 ObjectMapper를 이용해 ChatStreamResponseDto로 타입을 바꿨다.
이때 ChatStreamResponseDto의 모든 필드가 채워져서 오는게 아니다. 그래서 UNKNOWN 에러가 발생하기 때문에 FAIL_ON_UNKNOWN_PROPERTIES 옵션을 false로 둬야했다. (이거 때문에 시간쫌 날렸다..ㅠ)
ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);
ChatStreamResponseDto streamDto = mapper.readValue(data,ChatStreamResponseDto.class);
ChatStreamResponseDto.Choice.Delta delta = streamDto.getChoices().get(0).getDelta();
또, 추가된 코드는 chatGpt의 응답을 데이터베이스에 저장할 필요가 있었기 때문에 한두글자씩 오는 요청을 StringBuffer에 모아서 전체 문자열을 저장하는 코드를 추가했다.
원래 pooling 방식처럼 응답을 받으면 connection을 끊는 것이 아니다. (Connection 헤더를 Keep-Alive로 한다면 가능하긴 하다.) RestTemplete을 사용한다면 connection pool을 사용하지 않고 매 요청마다 새로운 커넥션을 만들어 통신한다.
이와 달리 sse의 경우 (text/event-stream을 통애 알수 있다) 응답을 보내고 연결을 바로 끊는게 아니라, 타임아웃이라던가 개발자가 정한 조건에 따라 요청을 끊게 된다. 이 사이 event가 발생한다면 커넥션이 유지되고 있기 때문에 응답을 보낼 수 있다.
리액티브 애플리케이션을 위한 것으로 non-blocking 방식이다.
webflux는 Server-sent events를 이용해 데이터 스트리밍을 할 수 있다.
Flux 객체를 통해 스트리밍 데이터를 다룰수 있다.
그리고 이러한 기능들이 restTemplete으로 지원되지 않기때문에 webClient를 사용한 것이다.
중요한 점은 text/event-stream의 MIME type으로 보낸다는 것이다.
@RestController
@RequiredArgsConstructor
public class ChatgptController {
private final ChatgptService chatgptService;
@PostMapping(value = "/gpt/{chat_room_id}" ,produces = "text/event-stream")
public ResponseEntity<SseEmitter> getGpt(Authentication authentication,
@PathVariable("chat_room_id") Long chat_room_id,
@RequestBody ChatgptRequest request){
String memberId = authentication.getName();
SseEmitter response = chatgptService.getGpt(
memberId,
chat_room_id,
request.getPrompt(),
request.getGptConfigInfo());
return ResponseEntity.ok().body(response);
}
}
지피티로부터의 응답을 받기 위한 것으로 openai의 명세에 따라 작성했다.
@Data
public class ChatStreamResponseDto implements Serializable {
private String id;
private String object;
private Long created;
private String model;
@JsonProperty("system_fingerprint")
private String system_fingerprint;
private List<Choice> choices;
@Data
public static class Choice{
private Delta delta;
private Integer index;
@JsonProperty("finish_reason")
private String finishReason;
private String logprobs;
@Data
public static class Delta{
private String content;
}
}
}
webClient쪽 코드를 설명하면,
uri와 header, body로 요청메세지를 구성해 요청을 날린다. 응답은 response에 담겨서 오며 bodyToFlux(String.class) 응답데이터의 형식을 String 으로 지정해준 것이다.
이 응답을 subscribe하고있다가 데이터가 emit(방출)될 때 마다 doOnNext()를 실행하게 된다.
sseEmitter.send(delta.getContent()); 이 부분이 실제로 클라이언트에 데이터를 보내는 코드가 되는데 SseEmitter 객체를 이용해 데이터를 보내면 event가 발생한 것으로 클라이언트가 응답을 받을 수 있다.
.doOnComplete(sseEmitter::complete)
.doOnError(sseEmitter::completeWithError)
이 부분은 각각 완료되었을때, 오류가 발생했을때 콜백함수를 지정해주는 부분이다. doOnNext()에서 처럼 따로 함수를 구현할 수도 있지만 그냥 둘다 종료되도록 뒀다.
public SseEmitter getGpt(String memberId,Long chatroomId, String request, GptConfigInfo gptConfigInfo){
String modelType = gptConfigInfo.getModel_name();
StringBuffer sb = new StringBuffer();
SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);
// 채팅방 찾기(없으면 생성)
Long chatRoomId = chatRoomService.find_chatroom(memberId,modelType,chatroomId);
//요청 메세지
ChatgptRequestDto requestDto = ChatgptRequestDto.builder()
.messages(compositeMessage(request,chatRoomId))
.model(gptConfigInfo.getModel_name())
.temperature(gptConfigInfo.getTemperature())
.maxTokens(gptConfigInfo.getMaximum_length())
.stop_sequences(gptConfigInfo.getStop_sequence())
.topP(gptConfigInfo.getTop_p())
.frequency_penalty(gptConfigInfo.getFrequency_penalty())
.presence_penalty(gptConfigInfo.getPresence_penalty())
.stream(true)
.build();
WebClient.create()
.post().uri(url)
.header("Content-Type", "application/json")
.header("Authorization", "Bearer " + key)
.body(BodyInserters.fromValue(requestDto))
.exchangeToFlux(response -> response.bodyToFlux(String.class))
.doOnNext(data -> {
try {
if (data.equals("[DONE]")) {
sseEmitter.send("chat_room_id: "+chatRoomId );
// 메세지 저장과 전송 종료
chatService.save_chat(chatRoomId,false,sb.toString());
chatService.save_chat(chatRoomId,true,request);
sseEmitter.complete();
}
else{
ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);
ChatStreamResponseDto streamDto = mapper.readValue(data,ChatStreamResponseDto.class);
ChatStreamResponseDto.Choice.Delta delta = streamDto.getChoices().get(0).getDelta();
if (delta!=null && delta.getContent()!=null){
sb.append(delta.getContent());
sseEmitter.send(delta.getContent());
}
}
} catch (IOException e) {
throw new AhiException(ErrorCode.FAIL_TO_SEND);
}
})
.doOnComplete(sseEmitter::complete)
.doOnError(sseEmitter::completeWithError)
.subscribe();
return sseEmitter;
}
예외처리하는 부분이 여전히 맘에 안들긴 하지만... 이 기능은 여기까지 하는걸로 해야겠다...
SseEmitter는 Thread-safe한 환경을 지원하지 않는다고 한다. 다행히도 이번 프로젝트는 로드밸런싱을 하지 않고 서버를 한개만 올릴예정이라 상관없겠지만, 여기에 대해서도 추가로 공부해보고 싶다 !