Go를 활용해 16gb 파일 데이터 수십초 만에 처리하기

임태빈·2022년 1월 13일
2

go

목록 보기
11/13

안녕하세요.

미디엄에서 Go를 활용해 16GB 데이터를 수십초 만에 읽는 방법을 알게 되어 이에 대해 한글로 작성해보려고 합니다.

원본 사이트는 여기에 들어가시면 있습니다.

데이터를 다운 받고 싶으신 분께서는 여기를 참고해주세요~

파일 읽기

파일 읽기 코드

f, err := os.Open(fileName)
 if err != nil {
   fmt.Println("cannot able to read the file", err)
   return
 }
// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

위 코드를 통해 파일을 열고난 뒤 행해지는 기본적으로 사용하는 방법이 두 가지가 있습니다.

  1. 파일을 한 줄 한 줄 읽습니다. 메모리 부담을 줄이는데 도움이 되지만 IO에 더 많은 시간이 걸립니다.
  2. 파일 전체를 한 번에 메모리로 읽고 파일을 처리합니다. 메모리를 더 많이 사용하기 때문에 시간이 크게 증가합니다.

만약 16GB에 파일을 읽게 된다면 어떻게 될까요?

2번 처럼 메모리에 불러오기도 어렵습니다. 또한, 1번 처럼 할 수 있지만 몇 초만에 처리하기도 어렵습니다.

그러면 어떻게 해야할까요?

chunk size로 파일을 읽어 데이터를 처리하면 됩니다.

Go에서는 bufio.NewReader()라는 함수를 제공해주는 데여 이 함수를 통해 파일을 chunk 단위로 읽을 수 있습니다.

이 코드를 사용하는 방법은 다음과 같습니다.

r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into buffer
   buf = buf[:n]
if n == 0 {
     if err != nil {
       fmt.Println(err)
       break
     }
     if err == io.EOF {
       break
     }
     return err
  }
}

대용량에 데이터를 읽을 때는 어떻게 해야할까여?

sync.pool과 goroutine를 사용해 대량의 데이터를 동시다발적으로 읽어들여 처리하면 됩니다.

코드는 다음과 같습니다.

func Process(f *os.File) error {
	linesPool := sync.Pool{New: func() interface{} {
		lines := make([]byte, 500*1024)
		return lines
	}}
	stringPool := sync.Pool{New: func() interface{} {
		lines := ""
		return lines
	}}
	slicePool := sync.Pool{New: func() interface{} {
		lines := make([]string, 100)
		return lines
	}}
	var wg sync.WaitGroup
	r := bufio.NewReader(f)
	for {
		buf := linesPool.Get().([]byte)
		n, err := r.Read(buf)
		buf = buf[:n]
		//fmt.Println(n)
		if n == 0 {
			if err != nil {
				fmt.Println(err)
				break
			}

			if err == io.EOF {
				fmt.Println(err)
				break
			}
			return err
		}
		//linesPool을 통해 받은 데이터에 끝부분이 \n아닐 수도 있기에 추가적으로 읽기 진행
		nextUntillNewline, err := r.ReadBytes('\n')
		if err != io.EOF {
			buf = append(buf, nextUntillNewline...)
		}
		wg.Add(1)
		go func() {
			ProcessChunk(buf, &linesPool, &stringPool, &slicePool)
			wg.Done()
		}()
	}
	wg.Wait()
	return nil
}

이 코드에서는 바이트 단위로 데이터를 읽어서 goroutine을 돌려줍니다. 이때 바이트 단위로 쪼개지다보면 문장이 끊어져서 들어올 수 있습니다. 이를 해결하기 위해 r.ReadBytes('\n')을 통해 추가적으로 데이터를 읽어줍니다. 그런 다음 ProcessChunk에 데이터를 보내 전처리를 진행해주면 됩니다!!

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool) {
	//another wait group to process every chunk further
	var wg2 sync.WaitGroup
	//stringPool 불러오기
	logs := stringPool.Get().(string)
	//청크 데이터를 문자열로 변환
	logs = string(chunk)
	linesPool.Put(chunk) //put back the chunk in pool
	//slicePool 불러오기
	logsSlice := slicePool.Get().([]string)
	//\n을 기준으로 string 배열 생성하기
	logsSlice = strings.Split(logs, "\n")
	//stringPool 반환
	stringPool.Put(logs)
	//100개의 로그만 읽기위한 chunksize 지정
	chunkSize := 100
	length := len(logsSlice)
	//traverse the chunk
	for i := 0; i < length; i += chunkSize {
		wg2.Add(1)
		//청크 계산하기
		go func(s int, e int) {
			for i := s; i < e; i++ {
				text := logsSlice[i]
				if len(text) == 0 {
					continue
				}

				/*
					여기에 전처리 코드를 입력하시면 됩니다:)
				*/
			}
			wg2.Done()
		}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))

	}
	//청크가 다 끝날때까지 기달리기
	wg2.Wait()
	// slicePool 반환
	slicePool.Put(logsSlice)
}

stringPool을 통해 로그들을 받을 준비를 해주고 linesPool은 사용을 다했기에 Put을 통해 반환해줍니다.slicePool또한 불러와서 Logs들을 \n 단위로 쪼개서 저장해줍니다.
청크단위로 100개씩 해서 goroutine을 돌려줘서 데이터를 처리해주기만 하면 됩니다.

코드는 여기를 참고해주세요:)

오늘 포스팅은 이걸로 마치겠습니다~

profile
golang과 서버 개발을 하고 있는 개발자입니다.

0개의 댓글