Parquet to Delta Lake API
df를 생성하고 parquet으로 저장
columns = ["language", "num_speakers"]
data = [("English", "1.5"), ("Mandarin", "1.1"), ("Hindi", "0.6")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.write.format("parquet").save("tmp/lake1")
tmp/lake1
├── _SUCCESS
├── part-00000-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
├── part-00003-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
├── part-00006-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
└── part-00009-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
그 후 Delta lake로 변환
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake1`")
tmp/lake1
├── _SUCCESS
├── _delta_log
│ ├── 00000000000000000000.checkpoint.parquet
│ ├── 00000000000000000000.json
│ └── _last_checkpoint
├── part-00000-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
├── part-00003-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
├── part-00006-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
└── part-00009-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
_delta_log가 Parquet 파일을 스캔하여 Delta Lake 데이터 쿼리에 필요한 메타데이터가 포함된 디렉토리를 빌드한다.
Partitioned Parquet table to Delta Lake API
Partitioning 된 Parquet 테이블 생성
df.write.partitionBy("language").format("parquet").save("tmp/lake2")
tmp/lake2
├── _SUCCESS
├── language=English
│ └── part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
├── language=Hindi
│ └── part-00009-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
└── language=Mandarin
└── part-00006-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
이 후 Delta Lake로 Convert
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake2`")
하지만 이때 다음 에러 발생
AnalysisException: Expecting 0 partition column(s): [], but found 1 partition column(s): [`language`] from parsing the file name: file:/.../delta-examples/notebooks/pyspark/tmp/lake2/language=English/part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
에러가 발생한 이유는 Partition Column의 데이터 타입을 지정해주기 않아서 발생한 것으로
Parquet 테이블에서 Partition Column의 데이터 타입은 디렉토리 이름에 따라 결정되며, 이는 모호할 수 있다.
예를 들어, 디렉토리 이름 date=2022-09-21을 읽을 때 Delta Lake는 TIMESTAMP라고 지정해줘야 한다.
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake2`", "language STRING")
tmp/lake2
├── _SUCCESS
├── _delta_log
│ ├── 00000000000000000000.checkpoint.parquet
│ ├── 00000000000000000000.json
│ └── _last_checkpoint
├── language=English
│ └── part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
├── language=Hindi
│ └── part-00009-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
└── language=Mandarin
└── part-00006-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet