회사 핵심 솔루션의 추가 기능을 구현하던 중 대용량 파일을 업로드 하는 기능을 추가하게 되었다.
해당 기능의 프로세스는 크게
1.대용량 zip파일 업로드 ( 최대 300개의 PDF파일로 이루어진 ZIP파일 ,최대 3개)
2.Unzip & PDF Split ( java.util.zip 사용하여 Unzip후 멀티 스레드를 활용하여 2개의 PDF 파일로 스플릿)
3.Split한 파일 저장
따라서 Unzip & split 로직과 최대 1800개의 PDF 파일을 저장하는 로직이 한번에 이루어진다.
그러기에 파일업로드시 몆분 가량 로딩바를 바라만 보고있고 정상적으로 진행이 되는지 사용자 입장에서 알수 없을 것이다.
이에 대하여 업로드 로딩바 구현을 하기로 하였다.
Server-Sent-Event (SSE) 전체 흐름은
1. SSE Connect API 호출
2. Connect Complete 응답
3. 이벤트 API 호출
4. 이벤트 data 응답
심플하다. Connect API와 FileUpload API를 분리하여 아래 예제를 보자면
가장먼저 SSE 연결, 해제 , 에러처리 관련하여 Service를 구현하고
SseServiceImpl
@Service
@RequiredArgsConstructor
public class SseServiceImpl implements SseService {
private final EmitterRepository emitterRepository;
private final CodeDetailRepository codeDetailRepository;
private final CodeDetailLanRepository codeDetailLanRepository;
private static final Long DEFAULT_TIMEOUT = 5L * 1000 * 60;
private static final Logger LOGGER = LoggerFactory.getLogger(SseServiceImpl.class);
@Override
public SseEmitter sseConnection(String eventId, String lastEventId) {
SseEmitter sseEmitter = emitterRepository.save(eventId, new SseEmitter(DEFAULT_TIMEOUT));
// SseEmitter의 완료, 시간 초과, 에러로 인한 전송 불가 시 sseEmitter 삭제
handleException(sseEmitter, eventId);
// 연결 직후, 데이터 전송이 없을 시 503 에러 발생. 에러 방지 위한 더미데이터 전송
sendToClient(eventId, "Successfully Connected ::: " + eventId);
// 클라이언트가 미수신한 Event 유실 예방, 연결이 끊켰거나 미수신된 데이터를 다 찾아서 보내준다.
if (StringUtil.isNotEmpty(lastEventId)) {
Map<String, SseEmitter> events = emitterRepository.findAllStartById(eventId);
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(entry.getKey(), entry.getValue()));
}
return sseEmitter;
}
@Override
public SseEmitter sseConnection(String eventId,
String lastEventId, long timeOutInMilliSeconds) {
SseEmitter sseEmitter = emitterRepository.save(eventId, new SseEmitter(timeOutInMilliSeconds));
// SseEmitter의 완료, 시간 초과, 에러로 인한 전송 불가 시 sseEmitter 삭제
handleException(sseEmitter, eventId);
// 연결 직후, 데이터 전송이 없을 시 503 에러 발생. 에러 방지 위한 더미데이터 전송
sendToClient(eventId, "Successfully Connected ::: " + eventId);
return sseEmitter;
}
...//이밖에 에러처리, failResult 등 로직을 구현
본격적으로 SSE Connect MVC패턴을 구축한다.
Connect controller
@CrossOrigin(origins = "*", allowedHeaders = "*")
@RestController
@RequestMapping("/sse")
@RequiredArgsConstructor
public class SseController {
private final SseService sseService;
private final CurrentUserData currentUserData;
@GetMapping(value = "/connect", produces = "text/event-stream")
@Operation(summary = "SSE Emitter를 발급받기위한 API")
public SseEmitter sseConnection(@RequestHeader(value = "Last-Event-Id", required = false, defaultValue = "") String lastEventId, SseConnectionRequestParam requestParam, HttpServletResponse response){
return sseService.sseConnection(currentUserData.getUnqUserId(), lastEventId);
}
}
Connect Service
@Override
public SseEmitter sseConnection(String eventId, String lastEventId) {
SseEmitter sseEmitter = emitterRepository.save(eventId, new SseEmitter(DEFAULT_TIMEOUT));
// SseEmitter의 완료, 시간 초과, 에러로 인한 전송 불가 시 sseEmitter 삭제
handleException(sseEmitter, eventId);
// 연결 직후, 데이터 전송이 없을 시 503 에러 발생. 에러 방지 위한 더미데이터 전송
sendToClient(eventId, "Successfully Connected ::: " + eventId);
// 클라이언트가 미수신한 Event 유실 예방, 연결이 끊켰거나 미수신된 데이터를 다 찾아서 보내준다.
if (StringUtil.isNotEmpty(lastEventId)) {
Map<String, SseEmitter> events = emitterRepository.findAllStartById(eventId);
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(entry.getKey(), entry.getValue()));
}
return sseEmitter;
}
와 같이 구현하여 Connect api호출시 Connect Complete 메세지와 Emitter를 응답받는다.
위에서 사용된 sendToClient는 기본 sseEmitter.send를 커스텀한 메소드이며
sendToClient
@Override
public void sendToClient(String eventId, Object data) {
SseEmitter sseEmitter = emitterRepository.findByEventId(eventId);
try {
sseEmitter.send(SseEmitter.event()
.id(eventId)
.name("Connection")
.data(data, MediaType.APPLICATION_JSON));
} catch (IOException e) {
sseEmitter.completeWithError(e);
emitterRepository.deleteAllByKey(eventId);
LOGGER.error(StringUtil.extractStackTrace(e));
}
}
와 같이 메소드를 구성한다.
이후 FileUpload API 호출 후 service에서 사용하는 로직에서 응답이 필요한 부분에
이 sendToClient를 호출하면 된다 .
Uploadfile Service
public List<FileMst> uploadFilesAndSplit(UploadFileInfo uploadFileInfo,
String lastEventId, String eventId)
throws ExecutionException, InterruptedException {
```//파일 unzip 로직
sseService.sendToClient(eventId, fileName + " 파일 Unzip 완료!");
``` //파일 split 및 저장 로직
sseService.sendToClient(eventId, fileName + "파일 저장 및 Split 완료!");
}
간단한게 위와 같이 connect 는 web 1에 되었지만 FileUpload API는 web 2에 도달 할 수 있다는 것이다. 그 결과 web 1은 아무 응답도 주지않고 time-out이 되버린다.
이에 대하여 /conect와 /file-write 를 동시에 즉, Connect API와 FileUpload API를 하나로 합치는 방법을 선택하였다.
ThreadPool을 사용는 java라이브러리인 ExecutorService 을 사용하여
Connect 먼저 응답 후 FileUpload 로직을 수행하도록 적용하였다.
기존 로직
1.
[Conect API 호출]->[Connect controller]->[Connect Service]->[SendToClient(연결 완료)]
2.
[FileUpload API 호출]->[FileUpload controller]->[FileUpload Service]->
[uploadFilesAndSplit]->[SendToClient(업로드 진행현황)]
변경 로직
[FileUpload API 호출]->[FileUpload controller]->[uploadFiles]->
[SendToClient(연결 완료)]->[FileUpload Service]->
[uploadFilesAndSplit]->[SendToClient(업로드 진행현황)]
uploadFiles 메소드
@Override
public SseEmitter uploadFiles(FileUploadRequestParam requestParam) {
SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
uploadFilesAndSplit(requestParam, sseEmitter);
});
return sseEmitter;
}
1개 사이즈의 ThreadPool 생성후 task를 할당하면 mainThread와 별개로 병렬처리 하게된다.
따라서 한번의 API호출로 두 로직이 수행되도록 하였다.
혹시 SSE 연결할때, 연결이 수립된채로, 서버에서 응답이 없어 로드밸런싱에서 타임아웃나는 경우 어떻게 해결하셨나요?
예를들면
1) SSE 연결을 수립 ( 백엔드에서 SSE 타임아웃은 10분으로 설정)
2) 딱히 이벤트가 발생하지 않아 1분동안 연결만 유지된상태고, 백엔드에서 아무런 데이터를 Response하지 않음
3) 로드밸런서에서 유휴 제한시간이 1분으로 설정되어있어 타임아웃
4) Front에서 SSE연결이 끊겼음을 감지하고 재연결시도
5) 4번에서 끊김 --- 재연결 시도 사이에 발생한 Event를 백엔드에서는 보냈지만 프론트에서 수신하지 못해서 데이터 유실
일단 5번의 해결방법은 last-event-id로 해결하려고하는데
근본적으로 1분마다 재연결요청하는게 맞는지?
이걸 늘리는 방법은 로드밸런서에서 유휴제한시간을 늘리는방법밖에 없는지 궁금하네요
현재 로드밸런서는 ALB를 사용하고있는데
HTTP 요청자체의 유휴제한시간 설정만 있고
HTTP Content-Type에 따른 유휴시간설정을 제공하지 않더라고요
안녕하세요. 저 궁금한게 있어서 댓글 남깁니다!
혹시 sse 이벤트를 읽을때는 어떤 환경에서 작업하셨나요?