tensorflow2 multi threading을 사용해서 효율적인 data pipeline을 구축하자!
이 게시물은 tensorflow로 모델을 학습하기 전까지 사용되는 data pipeline 최적화에 대해서만 얘기하며, multi gpu 학습에 대해서는 논하지 않는다.
이미지를 처리하는 학습 파이프라인의 경우, 모델에 이미지를 넣기 전에 시간이 오래 걸리는 전처리를 해야하는 경우가 있다.
예시 파이프라인 )
- 이미지 불러오기(image decoding)
- 이미지 전처리(image preprocessing)
- 이미지 벡터라이제이션(image vectorization)
- 모델 학습/예측 (model train/inference)
- 예측 결과 후처리 (vector postprocessing)
- 예측 결과 데이터베이스에 업로드 (upload to database)
이 경우 이미지를 하나씩 불러와서 모델에 넣기까지의 과정을 순차적으로 처리한다면 시간이 굉장히 오래 걸릴 것이다.
만약 전처리 과정이 미니배치를 통한 vectorization을 하지 못하고 꼭 한 장씩만 초리해야 한다면,,,
이미지 한 장당 전처리 과정이 0.3초가 걸리고, 이미지 갯수가 100만개라면 이미지 디코딩과 후처리, 모델 예측 시간을 제외하더라도 5000분, 83시간이 걸린다. (...)
이미지 전처리만 3일이 걸리는 상황을 피하려면 multi threading과 vectorization을 적절히 사용하면 시간을 단축할 수 있다.
아래에서는 내가 어떻게 기존의 알고리즘을 .map, .batch function를 사용하여 변경했는지 예시를 적을 것이다.
(글을 읽기 전 tensorflow 입력 파이프라인 빌드에 대한 설명 을 참고하면 좋다.)
tf.data.experimental.CsvDataset 을 사용했다.
.map(func, num_parallel_calls=tf.data.AUTOTUNE)
map 함수의 num_parallel_calls인자 설정해주기.
num_parallel_calls는 map으로 전달해준 함수를 처리하는데에 몇개의 thread를 사용할 것인지 정해준다. 사용자가 임의로 32,64 와 같이 숫자를 정해줄 수도 있고, tf.data.AUTOTUNE을 사용할 수도 있다.
tf.data.AUTOTUNE으로 설정하면 tf.data 런타임이 실행 시에 동적으로 값을 조정하도록 만든다. (시스템이 알아서 최적의 thread 갯수를 찾아준다는 말)
.batch
batch 함수는 원하는 수대로 배치로 묶어준다. 배치로 묶는다는 것(vectorization)의 의미는 N 차원의 벡터로 나타낼 수 있는 데이터라면 어떤 함수를 N개의 데이터에 한꺼번에 처리할 수 있다는 것이다.
예를 들어 jpeg 이미지 데이터를 64배치로 묶는다면 해당 벡터의 차원은 (tf 기준) (64, height, width, 3) 으로 나타낼 수 있다.
사람이 해석하기에는 이 벡터가 "3차원으로 이루어진 height, width 크기의 64개의 이미지"로 해석되지만, 벡터 계산의 입장으로 보면 그냥 (64, height, width, 3) 의 shape을 가진 행렬을 계산하는 것과 같다.
따라서 한번에 처리해도 관계없는 normalization(((image / 255.0) - mean) / std)과 같은 처리는 vectorization을 거친 후에 적용하는 것이 시간이 절약된다.
.prefetch
프로듀서(데이터를 생성하는 작업)와 컨슈머(학습, 인퍼런스와 같이 데이터를 소비하는 작업)을 병렬화한다. (컨슈머가 작업할 때 프로듀서가 가만히 기다리는게 아니라 동시에 데이터를 가져온다는 말)
prefetch 함수도 map과 마찬가지로 tf.data.AUTOTUNE을 인자로 전달하여 multi-thread로 돌아가도록 할 수 있다.
.interleave
이 함수는 tf.data.experimental.CsvDataset에서는 제공하지 않아 사용하지 못했다. (글 작성 시기 : 2022.12)
tf.data.experimental.make_csv_dataset 을 사용하면 num_parallel_reads 인자를 통해 interleaving 할 수 있다.
여러개의 데이터를 읽어와야 하는 경우 tf.data.AUTOTUNE을 사용하여 multi threading을 통해 읽어올 수 있다.
정리하면 다음과 같다.
- multi threading : num_parallel_calls=tf.data.AUTOTUNE
- batch : 매핑 벡터화(mapping vectorization)
- prefetch : 프로듀서, 컨슈머 작업 오버랩
- interleave : 데이터 추출 병렬화
이미지 한장씩 처리할 수 밖에 없는 이미지 전처리 과정
이미지 디코딩, 전처리, 정규화(normalize)를 하나의 함수 안에서 진행
전처리에서 python for문으로 처리하는 과정 (매우 느림)
tensor를 np.array(tensor) 로 바꾸어 numpy 제공 함수 사용 (tensor <-> numpy 전환, numpy array 매번 생성)
동일 함수에서 이미지 normalize 진행 (normalize도 한장씩 해야 됨)
def tf_numpy_preprocess(path):
def numpy_fn(_path):
1. JPEG_manager.jpeg_decode_from_path로 이미지 디코딩
2. 이미지 전처리 함수 호출
tf_tensor_list = tf.numpy_function(numpy_fn,
[path],
[tf.float32, tf.string])
return tf_tensor_list
tfdataset = tf.data.experimental.CsvDataset('path', ['None'])
tfdataset = tfdataset.map(tf_numpy_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
tfdataset = tfdataset.repeat(1)
tfdataset = tfdataset.prefetch(batch_size)
tfdataset = tfdataset.batch(batch_size, drop_remainder=False)
def map_decorator(func):
def wrapper(image, path):
# 자동 그래프가 메서드를 컴파일하지 못하도록 tf.py_function을 사용
return tf.py_function(
func,
inp=(image, path),
Tout=(tf.float32, tf.string)
)
return wrapper
def tf_image_decoding(path):
1. tf.io.decode_jpeg로 이미지 디코딩
return [image, path]
@map_decorator
def tf_preprocess(image, _path):
1. 이미지 전처리 함수 호출
return image, _path
@map_decorator
def image_normalization(image,path):
image = tf.cast(image, tf.float32)
return ((image / 255.0) - mean) / std, path
tfdataset = tf.data.experimental.CsvDataset('path', ['none'])\
.map(tf_image_decoding, num_parallel_calls=tf.data.AUTOTUNE)\
.map(tf_preprocess, num_parallel_calls=tf.data.AUTOTUNE)\
.batch(batch_size, drop_remainder=False)\
.map(image_normalization, num_parallel_calls=tf.data.AUTOTUNE)\
.repeat(1)\
.prefetch(batch_size)