대용량 학습 데이터 불러오기 & 전처리 시간 1/3로 줄이기

jongmin-oh·2023년 5월 23일
0

문제발생

학습데이터를 구성할때 긍정 샘플(postive sample)과 부정 샘플(negative sample)의 비율을
1:1로 구성했었지만, 여러 부정 샘플을 추가시키면 모델의 정확도가 향상되기 때문에
여러 부정샘플들을 추가하여 1:6비율로 학습 데이터를 구성했다. 그러다 보니
최종 학습데이터의 크기가 몇 배이상으로 늘었는데( 총 5000만 건),
이 과정에서 불러오고(메모리에 올리기에만) 간단한 전처리(포맷변경)만 38분이 걸렸다.

요약

여러 삽질과정을 통해 30분에 시간을 13분으로 줄였고 그 과정을 공유해보고자 한다.
물론 코어 수가 많거나 메모리가 충분할 경우에만 활용 가능한 방법이다.
*나의 경우 지원 받은 연구용 컴퓨터가 48core 800메모리이다..

1. 데이터 불러오기

데이터 형식

일반적으로는 .csv 형식이 익숙할 것이다.
여기서 속도를 조금 생각한다면 .txt 혹은 .tsv의 데이터형식을 생각해 볼 수 있다 대부분 예제들이나 오픈데이터들을 보면 tsv로 저장되어있는 경우가 많다.
하지만 tsv나 txt의 경우 읽어올때 가끔 "/t"을 잘 인식못하거나 불필요 문자열이 포함되어있어서
잘 구분 못하여 에러가 발생하는 경우가 종종 발생했고, 난 한 톨에 데이터도 버리기 아까워서 tsv방식은 예전에 버렸다.

tsv 말고는 parquet 방식이 있는데 parquet 형식은 데이터를 압축하여 저장하는 방식이다.
parquet의 유일한 단점은 tsv 나 csv 처럼 메모장이나 엑셀로 열어서 볼 수 없다는 점인데
이 부분만 빼면 장점이 훨씬 많다, 용량 대폭 감소, 불러오기 시간 감소

대용량 데이터의 경우(1000만건이 넘는) 데이터 형식은 무조건 parquet 형식을 추천한다.

패키지

python에서 데이터를 불러오는 패키지는 여러가지가 있지만 대표적인 pandas와 dask를 비교해보았다.
이론상으로 dask는 병렬처리를 지원하기 때문에 전처리 작업이 많다면 dask를 추천하지만, 데이터 불러오기만 보았을때는 데이터가 아주 크지 않다면 오히려 오버헤드가 발생하여 더 느리게 불러올 수 있다.
*나의경우 dask와 pandas의 read_parquet 속도가 똑같거나, dask가 미세하게 더 느렸다.

그래서 그냥 pandas를 사용했다.


2. 데이터 전처리

앞서 말했듯 데이터 프레임에 대한 전처리를 한다면 dask를 사용하면 좋을 것 같다,
하지만 난 데이터 프레임이 아니라 데이터 input 형식을 바꿔주는 작업을 해줘야했다.

기존 코드

import pandas as pd

class TrainDataLoader:
    def _load_data(self):
        train_input = []
        dev_input = []

        train_data = pd.read_parquet(DATASETS_DIR + "/train.gzip", use_threads=True)
        dev_data = pd.read_parquet(DATASETS_DIR + "/dev.gzip", use_threads=True)

        for row in train_data.iterrows():
            s1 = row[1]["col1"]
            s2 = row[1]["col2"]
            score = int(row[1]["label"])
            train_input.append(InputExample(texts=[s1, s2], label=score))

        for row in dev_data.iterrows():
            s1 = row[1]["col1"]
            s2 = row[1]["col2"]
            score = int(row[1]["label"])
            dev_input.append(InputExample(texts=[s1, s2], label=score))

        print(f"number of Train data : {len(train_input)}")
        print(f"number of Dev&Test data : {len(dev_input)}")

        return train_input, dev_input

    def _load_test(self):
        test_input = []
        test_data = pd.read_parquet(DATASETS_DIR + "/test.gzip", use_threads=True)

        for row in test_data.iterrows():
            s1 = row[1]["col1"]
            s2 = row[1]["col2"]
            score = int(row[1]["label"])
            test_input.append(InputExample(texts=[s1, s2], label=score))

        return test_input

간단하게 설명하면 데이터 프레임을 순회하면서 row를 InputExample 형식으로 변경해주는 작업이 필요했다. 이 과정에서 많은 시간이 소요가 되었는데. 멀티 프로세싱으로 코드를 변경했다.

실행 시간 :
CPU times: user 37min 41s, sys: 1min, total: 38min 42s
Wall time: 38min 19s


변경 코드

class TrainDataLoader:
    def row_to_inputs(self, data):
        inputs = []
        for row in data.iterrows():
            s1 = row[1]["col1"]
            s2 = row[1]["col2"]
            score = int(row[1]["label"])
            inputs.append(InputExample(texts=[s1, s2], label=score))
        return inputs

    def read_data(self, path):
        return pd.read_parquet(path)

    def parallel_process_dataframe(self, df):
        num_chunks = int(cpu_count() * 0.7)
        chunks = [df[i:i+len(df)//num_chunks] for i in range(0, len(df), len(df)//num_chunks)]
        with ProcessPoolExecutor(max_workers=num_chunks) as executor:
            # Apply the process_dataframe function to each chunk of data
            results = executor.map(self.row_to_inputs, chunks)
        
            # Collect the results from each process
            processed_data = []
            for result in results:
                processed_data.extend(result)
    
        return processed_data
    
    def _load_data(self):
        train_df = self.read_data(DATASETS_DIR + "/train.gzip")
        dev_df = self.read_data(DATASETS_DIR + "/dev.gzip")

        train_inputs = self.parallel_process_dataframe(train_df)
        dev_inputs = self.parallel_process_dataframe(dev_df)
        return train_inputs, dev_inputs

    def _load_test(self):
        test_df = self.read_data(DATASETS_DIR + "/test.gzip")
        test_inputs = self.parallel_process_dataframe(test_df)
        return test_inputs

parallel_process_dataframe 함수가 새롭게 추가되었는데.
현재 컴퓨터에 CPU 코어수를 가져와서 그 코어 수 만큼 데이터 프레임을 num_chucks로 분할한다.
그 분할한 데이터 하나하나를 각각의 코어에 할당하고, 그 결과를 마지막에 합쳐서 반환하는 코드이다.

실행 시간 :
CPU times: user 9min 5s, sys: 4min 10s, total: 13min 16s
Wall time: 13min 23s

38분에서 13분으로 단축


결론

똑같은 기능을 하는 코드를 조금 수정하여 시간을 1/3로 단축시켰다.
이 코드로 바꾸는 데 소요된 시간은 2시간이 넘었다.(삽질 시간 포함)

하지만 앞으로 학습데이터를 불러올때마다 매번 1/3 단축할 수 있게 되었다.
솔직히 귀찮아서 그냥 바꾸지말까도 고민했지만,
막상 바꾸고 그 과정을 공유해보니 뿌듯하다. 더 좋은 빠른 방법을 연구해보고
업데이트 하면서 더 줄여보도록하자. 시도해 볼 만한 방법은 아주 많더라~

profile
스타트업에서 자연어처리 챗봇을 연구하는 머신러닝 개발자입니다.

0개의 댓글