안녕하세요.
미디엄에서 Go를 활용해 16GB 데이터를 수십초 만에 읽는 방법을 알게 되어 이에 대해 한글로 작성해보려고 합니다.
원본 사이트는 여기에 들어가시면 있습니다.
데이터를 다운 받고 싶으신 분께서는 여기를 참고해주세요~
f, err := os.Open(fileName)
if err != nil {
fmt.Println("cannot able to read the file", err)
return
}
// UPDATE: close after checking error
defer file.Close() //Do not forget to close the file
위 코드를 통해 파일을 열고난 뒤 행해지는 기본적으로 사용하는 방법이 두 가지가 있습니다.
2번 처럼 메모리에 불러오기도 어렵습니다. 또한, 1번 처럼 할 수 있지만 몇 초만에 처리하기도 어렵습니다.
그러면 어떻게 해야할까요?
chunk size로 파일을 읽어 데이터를 처리하면 됩니다.
Go에서는 bufio.NewReader()라는 함수를 제공해주는 데여 이 함수를 통해 파일을 chunk 단위로 읽을 수 있습니다.
이 코드를 사용하는 방법은 다음과 같습니다.
r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into buffer
buf = buf[:n]
if n == 0 {
if err != nil {
fmt.Println(err)
break
}
if err == io.EOF {
break
}
return err
}
}
대용량에 데이터를 읽을 때는 어떻게 해야할까여?
sync.pool과 goroutine를 사용해 대량의 데이터를 동시다발적으로 읽어들여 처리하면 됩니다.
코드는 다음과 같습니다.
func Process(f *os.File) error {
linesPool := sync.Pool{New: func() interface{} {
lines := make([]byte, 500*1024)
return lines
}}
stringPool := sync.Pool{New: func() interface{} {
lines := ""
return lines
}}
slicePool := sync.Pool{New: func() interface{} {
lines := make([]string, 100)
return lines
}}
var wg sync.WaitGroup
r := bufio.NewReader(f)
for {
buf := linesPool.Get().([]byte)
n, err := r.Read(buf)
buf = buf[:n]
//fmt.Println(n)
if n == 0 {
if err != nil {
fmt.Println(err)
break
}
if err == io.EOF {
fmt.Println(err)
break
}
return err
}
//linesPool을 통해 받은 데이터에 끝부분이 \n아닐 수도 있기에 추가적으로 읽기 진행
nextUntillNewline, err := r.ReadBytes('\n')
if err != io.EOF {
buf = append(buf, nextUntillNewline...)
}
wg.Add(1)
go func() {
ProcessChunk(buf, &linesPool, &stringPool, &slicePool)
wg.Done()
}()
}
wg.Wait()
return nil
}
이 코드에서는 바이트 단위로 데이터를 읽어서 goroutine을 돌려줍니다. 이때 바이트 단위로 쪼개지다보면 문장이 끊어져서 들어올 수 있습니다. 이를 해결하기 위해 r.ReadBytes('\n')을 통해 추가적으로 데이터를 읽어줍니다. 그런 다음 ProcessChunk에 데이터를 보내 전처리를 진행해주면 됩니다!!
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool) {
//another wait group to process every chunk further
var wg2 sync.WaitGroup
//stringPool 불러오기
logs := stringPool.Get().(string)
//청크 데이터를 문자열로 변환
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//slicePool 불러오기
logsSlice := slicePool.Get().([]string)
//\n을 기준으로 string 배열 생성하기
logsSlice = strings.Split(logs, "\n")
//stringPool 반환
stringPool.Put(logs)
//100개의 로그만 읽기위한 chunksize 지정
chunkSize := 100
length := len(logsSlice)
//traverse the chunk
for i := 0; i < length; i += chunkSize {
wg2.Add(1)
//청크 계산하기
go func(s int, e int) {
for i := s; i < e; i++ {
text := logsSlice[i]
if len(text) == 0 {
continue
}
/*
여기에 전처리 코드를 입력하시면 됩니다:)
*/
}
wg2.Done()
}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
}
//청크가 다 끝날때까지 기달리기
wg2.Wait()
// slicePool 반환
slicePool.Put(logsSlice)
}
stringPool을 통해 로그들을 받을 준비를 해주고 linesPool은 사용을 다했기에 Put을 통해 반환해줍니다.slicePool또한 불러와서 Logs들을 \n 단위로 쪼개서 저장해줍니다.
청크단위로 100개씩 해서 goroutine을 돌려줘서 데이터를 처리해주기만 하면 됩니다.
코드는 여기를 참고해주세요:)
오늘 포스팅은 이걸로 마치겠습니다~