대용량 데이터 다운로드 & S3에 업로드 최적화하기 with Python

bradley·2023년 4월 6일
2

Python

목록 보기
1/1
post-thumbnail

개요


특정 URL에서 대용량의 데이터를 다운로드 받을 경우가 있다. 그리고 이 데이터를 또 다른 데이터랑 결합하여 분석하고자 하는 등의 니즈가 있다.
이런 니즈를 위해 이 데이터를 내부 S3에 업로드하고, 여러 사람이 가져다 쓰거나 그때그때 쿼리할 수 있도록 해야한다.
여기서 고려해야 할 점은 대용량의 데이터를 다운로드 받고 업로드하는 과정에서 Out Of Memory 문제를 겪을 수 있다는 것이다. 또 처리량, 처리 속도가 향상되길 원할 수 있다.
따라서 대용량의 데이터를 좀 더 효율적으로 처리하는 방법에 대해 고찰해보고자 한다.

이 프로세스를 두 가지 부분으로 나누어서 살펴보자.

  • 대용량의 데이터를 다운로드 받는 과정
  • S3에 업로드 하는 과정

대용량의 데이터 다운로드


Out Of Memory

Python에서 요청에 대한 응답 데이터를 받을 때 requests 모듈로 쉽게 받을 수 있다.

response = requests.get(url)

하지만 응답 데이터가 최소 수 백 MB는 되는 용량이 큰 데이터일 경우 문제가 발생할 수 있다.
requests로 받은 응답 데이터는 Memory에 저장된다.
큰 데이터를 받는다면 결국 Memory를 많이 잡아먹고, 남아있는 Memory가 부족하다면 Out Of Memory 에러를 겪을 수 있다.

해결 방법

따라서 이를 해결하기 위해 응답 데이터를 Chunk 단위로 쪼개서 읽을 필요가 있다.
그 과정을 살펴보자.

response = requests.get(url)

이 경우 get() 안에 stream=False가 기본값으로 설정되어 있고, 응답 데이터를 단일 Chunk로 다운로드 받도록 한다.
즉, 이 지점에서 데이터를 다운로드받는다.

response = requests.get(url, stream=True)

하지만 stream=True 옵션을 주면, Response.content에 접근하기 전까지 응답 데이터 다운로드를 연기한다.
이 지점에서는 응답 헤더만 다운로드 되고, 연결이 Open 된 채로 남아있다.
그래서 스트림이 생성된 이 때 컨텐츠 검색을 조건부로 만들 수 있다.
예를 들어, 응답 데이터가 100MB보다 크면 다운로드 안 받는다 같은?

추가로 연결 해제를 보장하기 위해 with문을 쓰는 것이 좋다.

with requests.get(url, stream=True) as r:
	# To Do

이제 조건부로 iter_content()를 이용해 컨텐츠를 Chunk 단위로 쪼개서 받으면 된다.

with requests.get(url, stream=True) as r:
	for chunk in r.iter_content(chunk_size=chunk_size):
    	# 여기에 chunk를 write 하면 됨

chunk_size는 Memory로 읽어야 하는 byte 단위이다.


시나리오 테스트

대충 개념은 저렇고, 실제로 시나리오 테스트를 해보자.

다운로드 받을 데이터는 공개된 데이터인데 536MB Parquet 파일이다.
이 파일을 100MB Chunk 단위로 쪼개서 다운로드 받도록 구현해보았다.

import requests

url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2019-10.parquet'
filename = url.split('/')[-1]

with requests.get(url, stream=True) as r:
    with open(filename, 'wb') as f:
        for chunk in r.iter_content(chunk_size=100*1024*1024):
            f.write(chunk)

파일 용량을 모니터링해보면, 반복마다 100MB 씩 증가하는 것을 볼 수 있다.

이 외에도 대용량 파일을 다운로드하는 또 다른 방법은 아래 포스팅을 참조하면 된다.
https://fedingo.com/how-to-download-large-files-in-python-requests/

S3 Multipart Upload


S3 Multipart upload는 Object를 S3에 업로드할 때, Parts로 나누어 업로드하는 방법이다.
AWS에서는 Object 크기가 100MB 이상이면, 단일 업로드보다는 Multipart Upload를 권장한다.

동작 방식은 모든 Parts가 업로드 된 뒤, S3가 이 Parts들을 조합해서 Object를 만든다.
따라서,

  • 각 Part를 독립적으로 순서에 상관없이 업로드할 수 있고,
  • 병렬 처리하여 처리량도 향상시킬 수 있고,
  • 특정 Part가 업로드 중 실패하면 다른 Part에 영향을 미치지 않고 해당 Part만 재업로드할 수 있다.

공식 문서는 아래 링크를 참조하면 되지만, 역시나 불친절하다...
"그래서 간단하게 어떻게 하냐고!"가 해소가 안된다...
https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html

더도 말고 직관적인 코드로 S3 Multipart Upload를 해보자.

import boto3

filename = <file_name>

s3 = boto3.client("s3", aws_access_key_id=<aws_access_key_id>,
                  aws_secret_access_key=<aws_secret_access_key>)
bucket = <bucket_name>
key = <key_name>

mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
mpu_id = mpu["UploadId"]

parts = []

MB = 1024 * 1024
part_size = 100 * MB
with open(filename, 'rb') as f:
    i = 1
    while True:
        data = f.read(part_size)
        print(len(data))
        if not len(data):
            break

        part = s3.upload_part(Body=data, Bucket=bucket,
                              Key=key, UploadId=mpu_id, PartNumber=i)
        part_dict = {'PartNumber': i, 'ETag': part['ETag']}
        parts.append(part_dict)
        i += 1

result = s3.complete_multipart_upload(
    Bucket=bucket,
    Key=key,
    UploadId=mpu_id,
    MultipartUpload={'Parts': parts}
)

코드는 아래 포스팅을 참조하였다.
https://gist.github.com/teasherm/bb73f21ed2f3b46bc1c2ca48ec2c1cf5

Multipart Upload의 기초적인 코드이고, 공식 문서보고 더 고도화하여 병렬 처리도 가능할 것이다.

더 최적화할 수는 없을까?


지금까지 OOM 에러의 영향을 피하고 대용량 데이터를 다운로드 받는 방법과 대용량 데이터를 S3에 나누어서 업로드하는 방법에 대해 알아보았다.

그렇다면 이 과정을 합치면 어떻게 될까?
말 그대로 인터넷에서 대용량 데이터를 다운로드 받아서 S3에 업로드하는 과정이 된다.

다만 여태 살펴본 과정을 보면 다음과 같은 과정을 거친다.

  • 다운로드 과정 : 인터넷 → Memory → Disk(파일)
  • 업로드 과정 : Disk(파일) → Memory → S3

고찰

이런 생각이 든다.

Disk를 꼭 거쳐야 할까?

Disk read/write도 발생하고, 처리 속도도 느린데 굳이?
또 파일이 남겨지면 시간이 흐르면서 Disk 용량도 잡아먹게 되고, 지저분하기도 하기 때문에 (필자같이 찌거기 남는 거 싫어하는 사람들이 있다ㅋㅋ)
어쨋든 이 파일을 처리해야 하는데, 파일 처리하는 부분도 결국 하나의 프로세스이다. (귀찮고 고려해야 할 부분들이 있어 은근 까다롭다...)

최적화

이 부분을 더 최적화해보자.
Disk 단까지 내려가기 전 Memory 단에서 받은 데이터를 바로 S3에 업로드하면 된다.
즉, 인터넷 → Memory → S3 과정이다.

import requests
import boto3

url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2019-10.parquet'
filename = url.split('/')[-1]

s3 = boto3.client("s3", aws_access_key_id=<aws_access_key_id>,
                  aws_secret_access_key=<aws_secret_access_key>)
bucket = <bucket_name>
key = <key_name>


MB = 1024 * 1024
chunk = 100 * MB

with requests.get(url, stream=True) as r:
	if r.ok:
    	mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
		mpu_id = mpu["UploadId"]

		parts = []

		for i, chunk in enumerate(r.iter_content(chunk_size=chunk), start=1):
			part = s3.upload_part(
                    Body=chunk, 
                    Bucket=bucket, 
                    Key=key, 
                    UploadId=mpu_id, 
                    PartNumber=i
				)
			part_dict = {'PartNumber': i, 'ETag': part['ETag']}
			parts.append(part_dict)

		result = s3.complete_multipart_upload(
					Bucket=bucket, 
					Key=key, 
					UploadId=mpu_id, 
					MultipartUpload={'Parts': parts}
				)
profile
데이터 엔지니어링에 관심이 많은 홀로 삽질하는 느림보

1개의 댓글

comment-user-thumbnail
2023년 9월 8일

깔끔한 글 잘 읽었습니다. 감사합니다!

답글 달기