[BE] SSE 적용과 난관 (feat.로드밸런싱)

방패맨·2024년 1월 22일
post-thumbnail

회사 핵심 솔루션의 추가 기능을 구현하던 중 대용량 파일을 업로드 하는 기능을 추가하게 되었다.
해당 기능의 프로세스는 크게

1.대용량 zip파일 업로드 ( 최대 300개의 PDF파일로 이루어진 ZIP파일 ,최대 3개)
2.Unzip & PDF Split ( java.util.zip 사용하여 Unzip후 멀티 스레드를 활용하여 2개의 PDF 파일로 스플릿)
3.Split한 파일 저장 

따라서 Unzip & split 로직과 최대 1800개의 PDF 파일을 저장하는 로직이 한번에 이루어진다.

그러기에 파일업로드시 몆분 가량 로딩바를 바라만 보고있고 정상적으로 진행이 되는지 사용자 입장에서 알수 없을 것이다.

이에 대하여 업로드 로딩바 구현을 하기로 하였다.

SSE 구현

실시간 업로드 현황을 보여주기 위해 당연하게 웹소켓 기술을 가장먼저 떠올렸다. 하지만 client는 일방적으로 정보를 받기만 하면 되기에 양방향 통신은 비효율적이라 생각되었다. 그러던 와중 SSE기술을 알게되었는데

Server-Sent-Event (SSE) 전체 흐름은

1. SSE Connect API 호출
2. Connect Complete 응답
3. 이벤트 API 호출
4. 이벤트 data 응답

심플하다. Connect API와 FileUpload API를 분리하여 아래 예제를 보자면

가장먼저 SSE 연결, 해제 , 에러처리 관련하여 Service를 구현하고

SseServiceImpl

@Service
@RequiredArgsConstructor
public class SseServiceImpl implements SseService {

  private final EmitterRepository emitterRepository;
  private final CodeDetailRepository codeDetailRepository;
  private final CodeDetailLanRepository codeDetailLanRepository;

  private static final Long DEFAULT_TIMEOUT = 5L * 1000 * 60;
  private static final Logger LOGGER = LoggerFactory.getLogger(SseServiceImpl.class);

  @Override
  public SseEmitter sseConnection(String eventId, String lastEventId) {
    SseEmitter sseEmitter = emitterRepository.save(eventId, new SseEmitter(DEFAULT_TIMEOUT));

    // SseEmitter의 완료, 시간 초과, 에러로 인한 전송 불가 시 sseEmitter 삭제
    handleException(sseEmitter, eventId);

    // 연결 직후, 데이터 전송이 없을 시 503 에러 발생. 에러 방지 위한 더미데이터 전송
    sendToClient(eventId, "Successfully Connected ::: " + eventId);

    // 클라이언트가 미수신한 Event 유실 예방, 연결이 끊켰거나 미수신된 데이터를 다 찾아서 보내준다.
    if (StringUtil.isNotEmpty(lastEventId)) {
      Map<String, SseEmitter> events = emitterRepository.findAllStartById(eventId);
      events.entrySet().stream()
          .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
          .forEach(entry -> sendToClient(entry.getKey(), entry.getValue()));
    }
    return sseEmitter;
  }

  @Override
  public SseEmitter sseConnection(String eventId,
      String lastEventId, long timeOutInMilliSeconds) {
    SseEmitter sseEmitter = emitterRepository.save(eventId, new SseEmitter(timeOutInMilliSeconds));

    // SseEmitter의 완료, 시간 초과, 에러로 인한 전송 불가 시 sseEmitter 삭제
    handleException(sseEmitter, eventId);

    // 연결 직후, 데이터 전송이 없을 시 503 에러 발생. 에러 방지 위한 더미데이터 전송
    sendToClient(eventId, "Successfully Connected ::: " + eventId);

    return sseEmitter;
  }
  
  ...//이밖에 에러처리, failResult 등 로직을 구현 

본격적으로 SSE Connect MVC패턴을 구축한다.

Connect controller

@CrossOrigin(origins = "*", allowedHeaders = "*")
@RestController
@RequestMapping("/sse")
@RequiredArgsConstructor
public class SseController {

  private final SseService sseService;
  private final CurrentUserData currentUserData;

  @GetMapping(value = "/connect", produces = "text/event-stream")
  @Operation(summary = "SSE Emitter를 발급받기위한 API")
  public SseEmitter sseConnection(@RequestHeader(value = "Last-Event-Id", required = false, defaultValue = "") String lastEventId, SseConnectionRequestParam requestParam, HttpServletResponse response){
    return sseService.sseConnection(currentUserData.getUnqUserId(), lastEventId);
  }
}

Connect Service

  @Override
  public SseEmitter sseConnection(String eventId, String lastEventId) {
    SseEmitter sseEmitter = emitterRepository.save(eventId, new SseEmitter(DEFAULT_TIMEOUT));

    // SseEmitter의 완료, 시간 초과, 에러로 인한 전송 불가 시 sseEmitter 삭제
    handleException(sseEmitter, eventId);

    // 연결 직후, 데이터 전송이 없을 시 503 에러 발생. 에러 방지 위한 더미데이터 전송
    sendToClient(eventId, "Successfully Connected ::: " + eventId);

    // 클라이언트가 미수신한 Event 유실 예방, 연결이 끊켰거나 미수신된 데이터를 다 찾아서 보내준다.
    if (StringUtil.isNotEmpty(lastEventId)) {
      Map<String, SseEmitter> events = emitterRepository.findAllStartById(eventId);
      events.entrySet().stream()
          .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
          .forEach(entry -> sendToClient(entry.getKey(), entry.getValue()));
    }
    return sseEmitter;
  }

와 같이 구현하여 Connect api호출시 Connect Complete 메세지와 Emitter를 응답받는다.
위에서 사용된 sendToClient는 기본 sseEmitter.send를 커스텀한 메소드이며

sendToClient

  @Override
 public void sendToClient(String eventId, Object data) {
   SseEmitter sseEmitter = emitterRepository.findByEventId(eventId);
   try {
     sseEmitter.send(SseEmitter.event()
         .id(eventId)
         .name("Connection")
         .data(data, MediaType.APPLICATION_JSON));
   } catch (IOException e) {
     sseEmitter.completeWithError(e);
     emitterRepository.deleteAllByKey(eventId);
     LOGGER.error(StringUtil.extractStackTrace(e));
   }
 }

와 같이 메소드를 구성한다.

이후 FileUpload API 호출 후 service에서 사용하는 로직에서 응답이 필요한 부분에
sendToClient를 호출하면 된다 .

Uploadfile Service

 public List<FileMst> uploadFilesAndSplit(UploadFileInfo uploadFileInfo, 
 String lastEventId, String eventId)
    throws ExecutionException, InterruptedException {
    ```//파일 unzip 로직
    sseService.sendToClient(eventId, fileName + " 파일 Unzip 완료!");
    ``` //파일 split 및 저장 로직 
    sseService.sendToClient(eventId, fileName + "파일 저장 및 Split 완료!");
    }

로드밸런싱 환경에서의 난관

하지만 로컬, 개발 환경에서는 하나의 서버만 존재하기에 문제가 되지않지만, 현재 운영서버에 올라간 솔루션은 AWS의 ALB로 2개의 web server로 로드밸런싱된 환경이며 2개의 API (Connect API, FileUpload API) 가 같은 web server로 갈 것이라는 보장은 없다. 즉,

간단한게 위와 같이 connect 는 web 1에 되었지만 FileUpload API는 web 2에 도달 할 수 있다는 것이다. 그 결과 web 1은 아무 응답도 주지않고 time-out이 되버린다.

해결

이에 대하여 /conect와 /file-write 를 동시에 즉, Connect API와 FileUpload API를 하나로 합치는 방법을 선택하였다.

ThreadPool을 사용는 java라이브러리인 ExecutorService 을 사용하여
Connect 먼저 응답 후 FileUpload 로직을 수행하도록 적용하였다.

기존 로직

1. 
[Conect API 호출]->[Connect controller]->[Connect Service]->[SendToClient(연결 완료)]

2. 
[FileUpload API 호출]->[FileUpload controller]->[FileUpload Service]->
[uploadFilesAndSplit]->[SendToClient(업로드 진행현황)]

변경 로직

[FileUpload API 호출]->[FileUpload controller]->[uploadFiles]->
[SendToClient(연결 완료)]->[FileUpload Service]->
[uploadFilesAndSplit]->[SendToClient(업로드 진행현황)]

uploadFiles 메소드

  @Override
  public SseEmitter uploadFiles(FileUploadRequestParam requestParam) {
    SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(() -> {
      uploadFilesAndSplit(requestParam, sseEmitter);
    });
    return sseEmitter;
  }

1개 사이즈의 ThreadPool 생성후 task를 할당하면 mainThread와 별개로 병렬처리 하게된다.
따라서 한번의 API호출로 두 로직이 수행되도록 하였다.

profile
개발자 방패맨의 기술블로그

5개의 댓글

comment-user-thumbnail
2024년 5월 12일

안녕하세요. 저 궁금한게 있어서 댓글 남깁니다!
혹시 sse 이벤트를 읽을때는 어떤 환경에서 작업하셨나요?

1개의 답글
comment-user-thumbnail
2024년 8월 21일

혹시 SSE 연결할때, 연결이 수립된채로, 서버에서 응답이 없어 로드밸런싱에서 타임아웃나는 경우 어떻게 해결하셨나요?

예를들면

1) SSE 연결을 수립 ( 백엔드에서 SSE 타임아웃은 10분으로 설정)
2) 딱히 이벤트가 발생하지 않아 1분동안 연결만 유지된상태고, 백엔드에서 아무런 데이터를 Response하지 않음
3) 로드밸런서에서 유휴 제한시간이 1분으로 설정되어있어 타임아웃
4) Front에서 SSE연결이 끊겼음을 감지하고 재연결시도
5) 4번에서 끊김 --- 재연결 시도 사이에 발생한 Event를 백엔드에서는 보냈지만 프론트에서 수신하지 못해서 데이터 유실

일단 5번의 해결방법은 last-event-id로 해결하려고하는데
근본적으로 1분마다 재연결요청하는게 맞는지?
이걸 늘리는 방법은 로드밸런서에서 유휴제한시간을 늘리는방법밖에 없는지 궁금하네요

현재 로드밸런서는 ALB를 사용하고있는데
HTTP 요청자체의 유휴제한시간 설정만 있고
HTTP Content-Type에 따른 유휴시간설정을 제공하지 않더라고요

1개의 답글