특정 URL에서 대용량의 데이터를 다운로드 받을 경우가 있다. 그리고 이 데이터를 또 다른 데이터랑 결합하여 분석하고자 하는 등의 니즈가 있다.
이런 니즈를 위해 이 데이터를 내부 S3에 업로드하고, 여러 사람이 가져다 쓰거나 그때그때 쿼리할 수 있도록 해야한다.
여기서 고려해야 할 점은 대용량의 데이터를 다운로드 받고 업로드하는 과정에서 Out Of Memory 문제를 겪을 수 있다는 것이다. 또 처리량, 처리 속도가 향상되길 원할 수 있다.
따라서 대용량의 데이터를 좀 더 효율적으로 처리하는 방법에 대해 고찰해보고자 한다.
이 프로세스를 두 가지 부분으로 나누어서 살펴보자.
Python에서 요청에 대한 응답 데이터를 받을 때 requests
모듈로 쉽게 받을 수 있다.
response = requests.get(url)
하지만 응답 데이터가 최소 수 백 MB
는 되는 용량이 큰 데이터일 경우 문제가 발생할 수 있다.
requests
로 받은 응답 데이터는 Memory에 저장된다.
큰 데이터를 받는다면 결국 Memory를 많이 잡아먹고, 남아있는 Memory가 부족하다면 Out Of Memory 에러를 겪을 수 있다.
따라서 이를 해결하기 위해 응답 데이터를 Chunk 단위로 쪼개서 읽을 필요가 있다.
그 과정을 살펴보자.
response = requests.get(url)
이 경우 get()
안에 stream=False
가 기본값으로 설정되어 있고, 응답 데이터를 단일 Chunk로 다운로드 받도록 한다.
즉, 이 지점에서 데이터를 다운로드받는다.
response = requests.get(url, stream=True)
하지만 stream=True
옵션을 주면, Response.content
에 접근하기 전까지 응답 데이터 다운로드를 연기한다.
이 지점에서는 응답 헤더만 다운로드 되고, 연결이 Open 된 채로 남아있다.
그래서 스트림이 생성된 이 때 컨텐츠 검색을 조건부로 만들 수 있다.
예를 들어, 응답 데이터가 100MB
보다 크면 다운로드 안 받는다 같은?
추가로 연결 해제를 보장하기 위해 with
문을 쓰는 것이 좋다.
with requests.get(url, stream=True) as r:
# To Do
이제 조건부로 iter_content()
를 이용해 컨텐츠를 Chunk 단위로 쪼개서 받으면 된다.
with requests.get(url, stream=True) as r:
for chunk in r.iter_content(chunk_size=chunk_size):
# 여기에 chunk를 write 하면 됨
chunk_size
는 Memory로 읽어야 하는byte
단위이다.
대충 개념은 저렇고, 실제로 시나리오 테스트를 해보자.
다운로드 받을 데이터는 공개된 데이터인데 536MB
Parquet 파일이다.
이 파일을 100MB
Chunk 단위로 쪼개서 다운로드 받도록 구현해보았다.
import requests
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2019-10.parquet'
filename = url.split('/')[-1]
with requests.get(url, stream=True) as r:
with open(filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=100*1024*1024):
f.write(chunk)
파일 용량을 모니터링해보면, 반복마다 100MB
씩 증가하는 것을 볼 수 있다.
이 외에도 대용량 파일을 다운로드하는 또 다른 방법은 아래 포스팅을 참조하면 된다.
https://fedingo.com/how-to-download-large-files-in-python-requests/
S3 Multipart upload는 Object를 S3에 업로드할 때, Parts로 나누어 업로드하는 방법이다.
AWS에서는 Object 크기가 100MB
이상이면, 단일 업로드보다는 Multipart Upload를 권장한다.
동작 방식은 모든 Parts가 업로드 된 뒤, S3가 이 Parts들을 조합해서 Object를 만든다.
따라서,
공식 문서는 아래 링크를 참조하면 되지만, 역시나 불친절하다...
"그래서 간단하게 어떻게 하냐고!"가 해소가 안된다...
https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
더도 말고 직관적인 코드로 S3 Multipart Upload를 해보자.
import boto3
filename = <file_name>
s3 = boto3.client("s3", aws_access_key_id=<aws_access_key_id>,
aws_secret_access_key=<aws_secret_access_key>)
bucket = <bucket_name>
key = <key_name>
mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
mpu_id = mpu["UploadId"]
parts = []
MB = 1024 * 1024
part_size = 100 * MB
with open(filename, 'rb') as f:
i = 1
while True:
data = f.read(part_size)
print(len(data))
if not len(data):
break
part = s3.upload_part(Body=data, Bucket=bucket,
Key=key, UploadId=mpu_id, PartNumber=i)
part_dict = {'PartNumber': i, 'ETag': part['ETag']}
parts.append(part_dict)
i += 1
result = s3.complete_multipart_upload(
Bucket=bucket,
Key=key,
UploadId=mpu_id,
MultipartUpload={'Parts': parts}
)
코드는 아래 포스팅을 참조하였다.
https://gist.github.com/teasherm/bb73f21ed2f3b46bc1c2ca48ec2c1cf5
Multipart Upload의 기초적인 코드이고, 공식 문서보고 더 고도화하여 병렬 처리도 가능할 것이다.
지금까지 OOM 에러의 영향을 피하고 대용량 데이터를 다운로드 받는 방법과 대용량 데이터를 S3에 나누어서 업로드하는 방법에 대해 알아보았다.
그렇다면 이 과정을 합치면 어떻게 될까?
말 그대로 인터넷에서 대용량 데이터를 다운로드 받아서 S3에 업로드하는 과정이 된다.
다만 여태 살펴본 과정을 보면 다음과 같은 과정을 거친다.
이런 생각이 든다.
Disk를 꼭 거쳐야 할까?
Disk read/write도 발생하고, 처리 속도도 느린데 굳이?
또 파일이 남겨지면 시간이 흐르면서 Disk 용량도 잡아먹게 되고, 지저분하기도 하기 때문에 (필자같이 찌거기 남는 거 싫어하는 사람들이 있다ㅋㅋ)
어쨋든 이 파일을 처리해야 하는데, 파일 처리하는 부분도 결국 하나의 프로세스이다. (귀찮고 고려해야 할 부분들이 있어 은근 까다롭다...)
이 부분을 더 최적화해보자.
Disk 단까지 내려가기 전 Memory 단에서 받은 데이터를 바로 S3에 업로드하면 된다.
즉, 인터넷 → Memory → S3 과정이다.
import requests
import boto3
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2019-10.parquet'
filename = url.split('/')[-1]
s3 = boto3.client("s3", aws_access_key_id=<aws_access_key_id>,
aws_secret_access_key=<aws_secret_access_key>)
bucket = <bucket_name>
key = <key_name>
MB = 1024 * 1024
chunk = 100 * MB
with requests.get(url, stream=True) as r:
if r.ok:
mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
mpu_id = mpu["UploadId"]
parts = []
for i, chunk in enumerate(r.iter_content(chunk_size=chunk), start=1):
part = s3.upload_part(
Body=chunk,
Bucket=bucket,
Key=key,
UploadId=mpu_id,
PartNumber=i
)
part_dict = {'PartNumber': i, 'ETag': part['ETag']}
parts.append(part_dict)
result = s3.complete_multipart_upload(
Bucket=bucket,
Key=key,
UploadId=mpu_id,
MultipartUpload={'Parts': parts}
)
깔끔한 글 잘 읽었습니다. 감사합니다!