DataFusion은 Rust로 작성된 인메모리 형식의 쿼리엔진이다. SQL 쿼리를 사용하여 데이터프레임과 유사한 방식으로 고성능의 데이터를 처리 및 분석이 가능하다. Apache Arrow 프로젝트의 일부로 개발되어, Arrow의 컬럼 기반 데이터 포맷을 사용하여 메모리 효율성과 성능을 최적화한다.
DataFusion은 복잡한 SQL 및 DataFrame 쿼리를 빠르게 실행한다. 이를 위해 고급 쿼리 플래너, 열 지향 다중 스레드 벡터화 실행 엔진, 그리고 Parquet, CSV, JSON 및 Avro와 같은 파티션 데이터 소스를 사용한다. 여기에 사용자가 유즈케이스에 따라 직접 커스텀하여 확장 할 수 있는데, 예를 들어 커스텀 ExecutionPlan 실행기를 추가하거나 내장된 SQL 플래너(SqlToRel)를 쓰지 않고, 직접 LogicalPlan을 생성하는 쿼리 언어를 생성할 수 있다.
let ctx = SessionContext::new(); // SessionContext눈 세션을 관리하고 SQL 쿼리를 실행하는 데 사용되는 주요 구조체
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
// create a plan
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;
// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
.to_string();
let expected = vec![
"+---+----------------+",
"| a | MIN(example.b) |",
"+---+----------------+",
"| 1 | 2 |",
"+---+----------------+"
];
assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
let ctx = SessionContext::new();
// create the dataframe
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// create a plan
let df = df.filter(col("a").lt_eq(col("b")))?
.aggregate(vec![col("a")], vec![min(col("b"))])?
.limit(0, Some(100))?;
// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;
// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
.to_string();
let expected = vec![
"+---+----------------+",
"| a | MIN(?table?.b) |",
"+---+----------------+",
"| 1 | 2 |",
"+---+----------------+"
];
assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
RecordBatch는 Apache Arrow 프로젝트에서 제공하는 핵심 데이터 구조 중 하나로, 메모리에 저장된 테이블 형식의 데이터를 효율적으로 처리할 수 있도록 설계된 형태다.
열 지향 데이터 구조: RecordBatch는 여러 열(Column)로 구성된 테이블 형태의 데이터를 메모리에 저장. 각 열은 Arrow의 배열(Array) 객체로 표현되며, 같은 인덱스 위치의 값들이 하나의 레코드(Record)를 형성함.
메모리 최적화: Arrow의 RecordBatch는 메모리 접근 패턴을 최적화하여 CPU 캐시 활용을 극대화하고, 데이터 읽기/쓰기 성능을 최적화함.
불변성과 효율성: RecordBatch는 불변성(Immutable)을 유지함. 한 번 생성된 데이터는 변경할 수 없으며, 이러한 특성은 병렬 처리와 메모리 관리에서 안정성을 제공함.
스키마: 각 RecordBatch는 해당 데이터의 스키마(Schema) 정보를 포함. 스키마는 각 열의 데이터 유형을 정의하고, 데이터 타입 변환과 유효성 검사를 수행하는 데 사용됨
직렬화 및 역직렬화: Arrow는 RecordBatch를 직렬화하여 네트워크 통신이나 파일 저장소에 저장 가능. 데이터 공유와 저장이 용이해짐
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
schema: SchemaRef,
columns: Vec<Arc<dyn Array>>,
/// The number of rows in this RecordBatch
///
/// This is stored separately from the columns to handle the case of no columns
row_count: usize,
}
SQL 또는 DataFrame API: 사용자는 SQL 쿼리나 DataFrame API를 통해 쿼리를 작성
SQL 파싱 및 논리적 계획 생성: SQL 문자열은 파싱되어 논리적 계획으로 변환. DataFrame API를 사용할 경우 논리적 계획이 직접 생성.
논리적 계획 최적화: 논리적 계획은 다양한 최적화 규칙(AnalyzerRules, OptimizerRules)에 의해 최적화.
물리적 계획 생성 및 최적화: 논리적 계획은 물리적 계획으로 변환되고, 물리적 최적화 규칙(PhysicalOptimizerRules)에 따라 최적화.
실행: 최종 물리적 계획이 실행되어 결과 데이터를 생성. 이 과정에서 DataFusion은 Apache Arrow 포맷을 사용하여 데이터를 효율적으로 처리함.
3-1. LogicalPlan은 AnalyzerRules에 의해 검사 및 재작성됨
3-2. LogicalPlan은 ptimizerRules에 의해 재작성 및 최적화됨 (eg. projection and filter pushdown).
4-1. LogicalPlan은 PhysicalPlanner에 의해 ExecutionPlan으로 변환됨.
4-2. ExecutionPlan은 PhysicalOptimizerRules에 의해 재작성되고 최적화됨 (eg. sort and join selection).
DataFusion은 여러 내장 데이터 소스를 포함하고 있으며, TableProvider 트레이트를 구현하여 확장할 수 있다. TableProvider는 플래닝과 ExecutionPlan 실행을 위한 정보를 제공한다. (커스텀 데이터소스 쓸 때 ListingTable로 TableProvider를 만든다.)
ListingTable: Parquet, JSON, CSV 또는 AVRO 파일에서 데이터를 읽음. HIVE 스타일 파티셔닝, 선택적 압축, 원격 객체 저장소에서 직접 읽기 등을 통해 단일 파일 또는 여러 파일을 지원
MemTable: 인메모리 RecordBatch로부터 데이터를 읽음
StreamingTable: unbounded inputs으로 부터 데이터를 읽음
// 커스텀 데이터 소스 만들기 예시: json 다시 만들기
let table_path = ListingTableUrl::parse("test.json")?;
let mut ctx = SessionContext::new();
let json_format = JsonFormat::default();
let json_format = Arc::new(json_format);
let table_path = ListingTableUrl::parse("test.json")?;
let session_state = ctx.state();
let mut listing_options = ListingOptions::new(json_format);
// 스키마 추론
let schema: Arc<arrow::datatypes::Schema> = listing_options.infer_schema(&session_state, &table_path).await?; // session_state 와 table_path로 스키마 추
// 추론된 스키마로 listing table 생성
let config: ListingTableConfig = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(schema);
let listing_table = ListingTable::try_new(config)?; // Arc::new(listing_table)를 한게 TableProvider
ctx.register_table("test", Arc::new(listing_table))?; // -> Result<Option<Arc<dyn TableProvider>>>
ExecutionPlan은 Apache arrow의 메모리 포맷을 사용하여 데이터를 처리하며, arrow 크레이트의 함수를 많이 사용함. ExecutionPlan에서 RecordBatch를 생성할 때, 값들은 ColumnarValue로 나타내는데, 단일 상수 값인 경우 ScalarValue, arrow 배열인 경우 ArrayRef가 된다.
execute를 호출하면 풀 기반 실행 API를 구현한 SendableRecordBatchStream로 하나 이상의 데이터 파티션을 생성한다. .next().await를 호출하면 다음 RecordBatch를 점진적으로 실행하고 반환한다. RepartitionExec에 의해 구현된 Volcano (쿼리 처리 시스템) 스타일의 “Exchange” 작업으로 균형 잡힌 병렬 처리가 가능해진다.
최근 Morsel-Driven Parallelism과 같은 연구는 NUMBA 아키텍쳐의 풀 스타일 Volcano 실행 모델과 관련된 문제에 대해 설명하지만, 실제로 Datafusion은 DuckDB와 같은 Morsel driven 접근 방식을 사용하는 시스템과 유사한 확장성을 달성한다.
DataFusion은 target_partitions 스레드를 사용해 SendableRecordBatchStream로 부터 나온 결과를 점진적으로 계산한다. 병렬화는 여러 tokio task를 사용해 구현된다.
tokio의 장점은..
tokio is most commonly used for asynchronous network I/O, its combination of an efficient, work-stealing scheduler, first class compiler support for automatic continuation generation, and exceptional performance makes it a compelling choice for CPU intensive applications as well.
ConfigOptions은 Datafusion의 실행을 제어하는 옵션을 포함한다.
상태는 아래 구조체들에 의해 관리되는 실행 쿼리를 필요로한다.
SessionContext: 상태는 테이블 정의와 함수 등록 같은 LogicalPlans을 생성할 필요가 있다. -> 세션을 관리하고 SQL 쿼리 실행
TaskContext: 상태는 MemoryPool, DiskManager, and ObjectStoreRegistry와 같은 실행이 필요하다. -> 실행 계획 처리
ExecutionProps: 실행 별 속성 및 데이터(eg. starting timestamps, etc).
Datafusion이 plan을 실행하면서 사용하는 메모리의 양과 임시 로컬 디스크 공간은 MemoryPool과 DiskManager에 의해 제어될 수 있다. 다른 런타임 옵션은 RuntimeEnv에 있다.
Rust 기반
컬럼 지향 처리
Apache Arrow의 컬럼 기반 메모리 레이아웃을 사용하여 데이터 처리를 최적화함.
멀티스레드 기반 병렬 처리
SQL 표준의 대부분을 지원하며, 사용자가 친숙한 SQL 언어를 사용하여 데이터를 쿼리 가능.다양한 데이터 소스에 연결하여 SQL 쿼리를 실행 가능.
CSV, Parquet, JSON 등 다양한 형식의 파일에서 데이터를 로드하고 처리 가능.
데이터 소스를 플러그인 방식으로 확장할 수 있어, 사용자 정의 데이터 소스와의 통합 가능.
아파치 생태계에서의 통합 테스트
students.csv
id,name,age,grade,subject
1,John Doe,20,A,Math
2,Jane Smith,22,B,Science
3,Emily Davis,21,A-,English
4,Michael Brown,23,C+,History
5,Jessica Wilson,20,B+,Math
6,David Clark,22,A,Science
7,Sarah Lewis,21,B,English
8,Daniel Martinez,23,A-,History
9,Laura Allen,20,B+,Math
10,Robert White,22,C,Science
main.rs
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::arrow::array::*;
use datafusion::prelude::*;
use std::sync::Arc;
use ndarray::Array2;
use datafusion::arrow::util::pretty::print_batches;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// DataFusion context 생성. 이 context로 sql쿼리 실행
let mut ctx = SessionContext::new();
// students.csv파일을 학생 이라는 이름의 테이블로 등록.
ctx.register_csv("학생", "students.csv", CsvReadOptions::new()).await?;
// 데이터를 select 하기위해 sql쿼리 플랜 생성
let df = ctx.sql("SELECT * FROM 학생 WHERE age = 20").await?; // age가 20인 값만 select
println!("df: {:?}", df);
// 플랜 실행(collect()). RecordBatch 벡터로 결과를 모음
let results = df.collect().await?;
println!("results: {:?}", results);
// print_batches로 출력
print_batches(&results)?;
Ok(())
}
df: DataFrame { session_state: SessionState { session_id: "96bd000e-8064-4c00-b68d-e4aa04b0b134" }, plan: Projection: 학생.id, 학생.name, 학생.age, 학생.grade, 학생.subject
Filter: 학생.age = Int64(20)
TableScan: 학생 }
results: [RecordBatch { schema: Schema { fields: [Field { name: "id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "name", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "age", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "grade", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "subject", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64>
[
1,
5,
9,
], StringArray
[
"John Doe",
"Jessica Wilson",
"Laura Allen",
], PrimitiveArray<Int64>
[
20,
20,
20,
], StringArray
[
"A",
"B+",
"B+",
], StringArray
[
"Math",
"Math",
"Math",
]], row_count: 3 }]
+----+----------------+-----+-------+---------+
| id | name | age | grade | subject |
+----+----------------+-----+-------+---------+
| 1 | John Doe | 20 | A | Math |
| 5 | Jessica Wilson | 20 | B+ | Math |
| 9 | Laura Allen | 20 | B+ | Math |
+----+----------------+-----+-------+---------+
// TODO: 코드 수정 필요
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::prelude::*;
use std::sync::Arc;
use std::io::{self, Write};
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig,ListingTableUrl};
// 데이터셋을 저장할 파일 경로
const OUTPUT_FILE: &str = "dataset.txt";
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// 파일 경로 입력
println!("Enter file path:");
io::stdout().flush().unwrap();
let mut file_path = String::new();
std::io::stdin().read_line(&mut file_path)?;
let table_path = file_path.trim().to_string();
// DataFusion context 생성
let mut ctx = SessionContext::new();
// 파일 확장자에 따른 테이블 등록
match table_path.split('.').last() {
Some("json") => {
ctx.register_json("test", &table_path, NdJsonReadOptions::default()).await?;
}
Some("parquet") => {
ctx.register_parquet("test", &table_path, ParquetReadOptions::default()).await?;
}
Some("csv") => {
ctx.register_csv("test", &table_path, CsvReadOptions::default()).await?;
println!("This is a csv file");
}
_ => {
println!("custom data file");
// 커스텀 데이터 소스 만들기 예시: json 다시 만들기
let json_format = JsonFormat::default();
let json_format = Arc::new(json_format);
let table_path = ListingTableUrl::parse("test.json")?;
let session_state = ctx.state();
let mut listing_options = ListingOptions::new(json_format);
// 스키마 추론
let session_state: datafusion::execution::context::SessionState = ctx.state();
let schema: Arc<arrow::datatypes::Schema> = listing_options.infer_schema(&session_state, &table_path).await?;
// 추론된 스키마로 listing table 생성
let config: ListingTableConfig = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.with_schema(schema);
let listing_table: ListingTable = ListingTable::try_new(config)?;
ctx.register_table("test", Arc::new(listing_table))?;
}
};
// SQL 쿼리 실행
let df: DataFrame = ctx.sql("SELECT * FROM test WHERE age = 20").await?;
println!("df: {:?}", df);
// 결과 수집
let results: Vec<RecordBatch> = df.collect().await?;
println!("results: {:?}", results);
print_batches(&results)?;
// 수집한 결과로 데이터셋 생성
match create_dataset(&results).await {
Ok(_) => println!("Dataset created successfully."),
Err(e) => eprintln!("Failed to create dataset: {}", e),
}
Ok(())
}
// 언어 모델 학습용 데이터셋을 생성하는 함수
async fn create_dataset(results: &Vec<RecordBatch>) -> std::io::Result<()> {
// 파일을 생성하고 쓰기 준비
let file = File::create(OUTPUT_FILE).await?;
let mut writer = BufWriter::new(file);
for batch in results {
for row in 0..batch.num_rows() {
let mut line = String::new();
for col in 0..batch.num_columns() {
let column: &Arc<dyn Array> = batch.column(col);
println!("column.data_type(): {:?}", column.data_type());
let value = match column.data_type() {
datafusion::arrow::datatypes::DataType::Utf8 => {
let array: &arrow::array::GenericByteArray<arrow::datatypes::GenericStringType<i32>> = column.as_any().downcast_ref::<arrow::array::StringArray>().unwrap();
array.value(row).to_string()
}
datafusion::arrow::datatypes::DataType::Int32 => {
let array = column.as_any().downcast_ref::<arrow::array::Int32Array>().unwrap();
array.value(row).to_string()
}
datafusion::arrow::datatypes::DataType::Int64 => {
let array = column.as_any().downcast_ref::<arrow::array::Int64Array>().unwrap();
array.value(row).to_string()
}
_ => unimplemented!("Unsupported data type"),
};
line.push_str(&value);
if col < batch.num_columns() - 1 {
line.push_str(", ");
}
}
writer.write_all(line.as_bytes()).await?;
writer.write_all(b"\n").await?;
}
}
writer.flush().await?;
Ok(())
}