csv to parquet by rust

손호준·2023년 7월 5일

csv를 parquet로 변환하는 csv2Parquet 크레이트가 존재하지만, 이건 단일 파일로 변환해준다.
(https://crates.io/crates/csv2parquet)

parquet와 Arrow

Parquet는 데이터 직렬화를 위한 컬럼 파일 형식입니다. Parquet 파일을 읽으려면 내용을 일종의 메모리 내 데이터 구조로 압축 해제하고 디코딩해야 합니다. 디코딩을 위한 CPU 사용률을 희생시키면서 공간/IO 효율적이도록 설계되었습니다. 메모리 내 컴퓨팅을 위한 데이터 구조를 제공하지 않습니다. Parquet은 처음부터 끝까지 디코딩해야 하는 스트리밍 형식이며 최근 일부 "인덱스 페이지" 기능이 저장 형식에 추가되었으며 일반적으로 임의 액세스 작업은 비용이 많이 듭니다.

반면에 Arrow는 인메모리 컴퓨팅을 위한 컬럼 데이터 구조를 제공하는 가장 중요한 라이브러리입니다 . Parquet 파일을 읽을 때 데이터를 Arrow 열 형식 데이터 구조 로 압축 해제하고 디코딩할 수 있으므로 디코딩된 데이터에 대해 메모리 내에서 분석을 수행할 수 있습니다. 임의 액세스는 O(1)이고 각 값 셀은 메모리에서 이전 및 다음 값 셀 옆에 있으므로 반복하는 것이 효율적입니다.

그렇다면 "Arrow file"은? Apache Arrow는 메시징 및 프로세스 간 통신에 사용할 수 있는 Arrow 열 형식 배열("레코드 배치"라고 함) 컬렉션을 배열하기 위한 이진 "직렬화" 프로토콜을 정의합니다. 나중에 메모리 매핑하거나 메모리로 읽고 다른 곳으로 보낼 수 있는 디스크를 포함하여 어디에나 프로토콜을 둘 수 있습니다.

이 Arrow 프로토콜은 역직렬화를 수행하지 않고 Arrow 데이터의 블롭을 "매핑"할 수 있도록 설계되었으므로 디스크에서 Arrow 프로토콜 데이터에 대한 분석을 수행하면 메모리 매핑을 사용하고 비용을 효과적으로 지불할 수 있습니다. 이 프로토콜은 "pandas udfs"라고 하는 Spark SQL 데이터 청크에 대해 pandas 함수를 실행하기 위해 Spark SQL과 Python 간의 데이터 스트리밍과 같은 많은 작업에 사용됩니다.

일부 애플리케이션에서 Parquet 및 Arrow는 온디스크 데이터 직렬화를 위해 서로 교환하여 사용할 수 있습니다.

명심해야 할 몇 가지 사항:

  • Parquet은 "아카이브" 목적을 위한 것입니다. 즉, 지금 파일을 작성하는 경우 "Parquet 읽기"가 가능하다고 말하는 모든 시스템이 5년 또는 7년 후에 파일을 읽을 수 있을 것으로 기대합니다. 우리는 아직 Arrow 형식의 장기적인 안정성에 대해 이 주장을 하지 않습니다(미래에는 그럴 수도 있지만).
  • Parquet는 다른 데이터 구조로 디코딩해야 하기 때문에 일반적으로 읽는 데 훨씬 더 비쌉니다. Arrow 프로토콜 데이터는 단순히 메모리 매핑될 수 있습니다.
  • Parquet 파일은 Parquet에서 사용하는 데이터 인코딩 체계로 인해 디스크 상의 Arrow-protocol-protocol-on-disk보다 훨씬 작은 경우가 많습니다. 디스크 스토리지 또는 네트워크가 느린 경우 Parquet가 더 나은 선택이 될 것입니다.


(출처:https://stackoverflow.com/questions/56472727/difference-between-apache-parquet-and-arrow)


결론: Parquet 파일은 디스크 스토리지용으로 설계되었고 Arrow는 인메모리용으로 설계되었다(그러나 디스크에 저장한 다음 나중에 메모리 맵에 넣을 수 있음). 이들은 서로 호환되도록 의도되었으며 응용 프로그램에서 함께 사용될 수 있다.

Arrorw2

Arrow2 크레이트는 arrow 형식과 상호 운용을 가능하게 하는 데이터 구조 및 기능을 구현한 Rust 라이브러리이다.

아파치 arrow는 CPU나 GPU 같은 최신 하드웨어에서 효율적인 분석 작업을 위한 열 지향 메모리 형식을 말한다. Arrow 메모리 형식은 직렬화 오버헤드 없이 초고속 데이터 액세스를 위해 무복사 읽기를 지원한다.

Arrow2의 출발점은 데이터가 arrow의 생태계와 상호 운용될 수 있도록 특정 배열로 메모리에 저장된다는 아이디어 이다.
Arrow2의 가장 중요한 디자인 측면은 연속 영역이 Arc와 공유된다는 점이다. 이렇게 했을 때 장점은 메모리 영역의 슬라이싱 비용이 O(1) 이라는 것이고(because it corresponds to changing an offset and length), 단점은 Arc 아래 있게 되면 메모리 영역이 불변이라는 점이다. (이걸 어떻게 극복하는지 아래서 살펴보자)
두번째로 중요한 점은 Arrows가 두가지 타입의 데이터 버퍼를 갖는다는 점인데 (하나는 bitmaps(오프셋이 bits)이고 다른 하나는 bytes(오프셋이 bytes) 이다.) 이를 염두에 두고 Arrow2는 연속 메모리 영역의 두 가지 주요 컨테이너를 갖는다.
...

변환 과정

csv read한 후에 parquet write 하고 메타 데이터(.json) 생성하기.

1. csv read

csv 파일을 읽을때 CPU intensive한 작업 두가지

  • csv 파일을 row 단위로 쪼갠다. 여기에는 parsing quotes와 delimiters가 포함되는데 주어진 row를 seek하기 위해 필요하다.
  • bytes 단위의 csv rows 묶음을 Array들로 파싱한다.

바이트를 값으로 파싱하는 것은 라인을 해석하는것 보다 비싸다. 따라서 파일의 서로 다른 부분을 스캔하는 단일 파일의 readers를 갖는 것이 일반적으로 유리하다.(IO 제약 조건 내에서) -> csv 크레이트에 의존함

Arrow2크레이트의 read csv하는 예제 코드를 테스트 해보려 했으나..


arrow2 깃헙 레포 확인해보면 csv모듈이 있는데 csv모듈을 찾을 수 없다는 에러가 발생

When compiled with feature io_csv, you can use this crate to read CSV files. This crate makes minimal assumptions on how you want to read a CSV, and offers a large degree of customization to it, along with a useful default.

는 cfg 문제였다. cargo add arrow2 --features io_csv 커맨드로 io_csv를 추가해주니 정상적으로 작동했다. 아래는 예제 코드

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::csv::read;

pub fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Box<dyn Array>>> {
    // Create a CSV reader. This is typically created on the thread that reads the file and
    // thus owns the read head.
    let mut reader = read::ReaderBuilder::new().from_path(path)?;

    // Infers the fields using the default inferer. The inferer is just a function that maps bytes
    // to a `DataType`.
    let (fields, _) = read::infer_schema(&mut reader, None, true, &read::infer)?; // 타입 추론 infer 함수는 각 DataType 분기

    // allocate space to read from CSV to. The size of this vec denotes how many rows are read.
    let mut rows = vec![read::ByteRecord::default(); 100]; // vec![]의 initial_capacity 설정

    // skip 0 (excluding the header) and read up to 100 rows.
    // this is IO-intensive and performs minimal CPU work. In particular,
    // no deserialization is performed.
    let rows_read = read::read_rows(&mut reader, 0, &mut rows)?;
    let rows = &rows[..rows_read];

    // parse the rows into a `Chunk`. This is CPU-intensive, has no IO,
    // and can be performed on a different thread by passing `rows` through a channel.
    // `deserialize_column` is a function that maps rows and a column index to an Array
    read::deserialize_batch(rows, &fields, projection, 0, read::deserialize_column)
}

read_path의 반환값을 보면 Result<Chunk<Box<dyn Array>>>인 것을 알 수 있는데, 위에서 설명한 것처럼 arrow의 생태계와 상호 운용될 수 있도록 특정 배열로 메모리에 저장한다.

profile
Rustacean🦀

0개의 댓글