[ WebFlux / AWS SDK for Java ] AWS File 비동기 Upload

jordy·2023년 3월 25일
0

numble-mybox

목록 보기
2/2

사용하는 Storage Object 는 Naver Cloud Platform 서비스 중 하나인 Object Storage

AWS SDK 를 지원한다.

Naver Cloud Platform 가이드

텍스트그러나 내가 구현하고 싶은 서비스는 비동기 서비스이기 때문에
AWS SDK for Java V2 가이드 문서를 참고 하였다

근데 한글페이지가 번역이 이상한 것 같아 가능하면 영어로 된 가이드 문서를 추천한다


MultiPart Upload

이점

구현하려는 서비스는 네이버 MyBox ( 구 nCloud ) 이기 때문에 비교적 큰 파일들을 업로드/다운르도 하고 싶기 때문에 단일 객체 업로드 모델보단 멀티 파트 업로드를 사용하려 한다

AWS MultiPart Docs 에서는 100MB 이상의 파일 업로드일 경우 MultiPart 업로드 사용을 권장하고 있다.

MultiPart Upload 방식이 주는 이점

1. 개선된 처리량 개선 - 부분을 병렬적으로 업로드하여 처리량을 개선할 수 있습니다.

2. 네트워크 문제로부터 빠른 복구 - 더 작아진 부분 크기는 네트워크 오류로 인해 실패한 업로드 재시작의 영향을 최소화합니다.

3. 객체 업로드 일시 중지 및 재개 – 객체 부분을 장시간에 걸쳐 업로드할 수 있습니다. 일단 멀티파트 업로드가 시작되면 제한 시간이 없습니다. 멀티파트 업로드를 명시적으로 완료하거나 중단해야 합니다.

4. 최종 객체 크기를 알기 전에 업로드를 시작 – 객체를 생성하는 동안 업로드할 수 있습니다.

구현

문서에 나와있는대로 MultiPart 를 활용한 Upload 방식의 프로세스는 크게 3가지로 진행된다.

1. MultiPart Upload 시작

    public Mono<FileResponse> uploadObject(FilePart filePart) {

        Map<String, String> metadata = new java.util.HashMap<>(Map.of("filename", filename));
        metadata.put("path", path);
        MediaType mediaType = ObjectUtils.defaultIfNull(filePart.headers().getContentType(), MediaType.APPLICATION_OCTET_STREAM);

        // MultiPart Upload 시작 ( Upload Id 요청 )
        CompletableFuture<CreateMultipartUploadResponse> s3AsyncClientMultipartUpload = s3AsyncClient
            .createMultipartUpload(CreateMultipartUploadRequest.builder()
                .contentType(mediaType.toString())
                .key(filename)
                .metadata(metadata)
                .bucket(s3ConfigProperties.getS3BucketName())
                .build());
                

        // Upload 프로세스 진행을 위한 상태 저장 객체 - 구현 필요
        UploadStatus uploadStatus = new UploadStatus(Objects.requireNonNull(filePart.headers().getContentType()).toString(), filename);

        return Mono.fromFuture(s3AsyncClientMultipartUpload)
            .flatMapMany(createMultipartUploadResponse -> {
            	// Upload Id 저장
                uploadStatus.setUploadId(createMultipartUploadResponse.uploadId());
                return filePart.content();
            })
                ...

2. MultiPart 부분 업로드

			...
        
           .bufferUntil(dataBuffer -> {
                // 바이트 수 축적
                uploadStatus.addBuffered(dataBuffer.readableByteCount());

                // 일정 크기가 되면 업로드 상태 객체의 누적 바이트 수를 0으로 초기화 후 return true
                if (uploadStatus.getBuffered() >= 5242880) {
                    log.info("BufferUntil - returning true, bufferedBytes={}, partCounter={}, uploadId={}",
                        uploadStatus.getBuffered(), uploadStatus.getPartCounter(), uploadStatus.getUploadId());
                    uploadStatus.setBuffered(0);
                    return true;
                }
                return false;
            })
            // 업로드를 위한 버퍼 형변환
            .map(FileUtils::dataBufferToByteBuffer)
            // 2. 부분 업로드 진행
            .flatMap(byteBuffer -> uploadPartObject(uploadStatus, byteBuffer))
            // 배압 컨트롤 하여 버퍼 모으기
            .onBackpressureBuffer()
            // Flux to Mono
            .reduce(uploadStatus, (status, completedPart) -> {
                log.info("Completed: PartNumber={}, etag={}", completedPart.partNumber(), completedPart.eTag());
                (status).getCompletedParts().put(completedPart.partNumber(), completedPart);
                return status;
            })
            
            ...

3. MultiPart 업로드 완료처리

            // 3. 업로드 완료 처리
            .flatMap(status -> completeMultipartUpload(uploadStatus))
            .map(response -> {
                FileUtils.checkSdkResponse(response);
                log.info("upload result: {}", response.toString());
                return new FileResponse(filename, uploadStatus.getUploadId(), response.location(), uploadStatus.getContentType(), response.eTag());

전체 소스 - https://github.com/pilgrim13/mybox
참고

profile
Hello Worlds!

0개의 댓글