Delta Lake Operation 02. Convert from Parquet to Delta Lake

Q·2024년 8월 21일

Delta Lake

목록 보기
4/8

Convert from Parquet to Delta Lake

  • Parquet to Delta Lake API

    df를 생성하고 parquet으로 저장

    columns = ["language", "num_speakers"]
    data = [("English", "1.5"), ("Mandarin", "1.1"), ("Hindi", "0.6")]
    rdd = spark.sparkContext.parallelize(data)
    df = rdd.toDF(columns)
    df.write.format("parquet").save("tmp/lake1")
    tmp/lake1
    ├── _SUCCESS
    ├── part-00000-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    ├── part-00003-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    ├── part-00006-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    └── part-00009-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet

    그 후 Delta lake로 변환

    deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake1`")
    tmp/lake1
    ├── _SUCCESS
    ├── _delta_log
    │   ├── 00000000000000000000.checkpoint.parquet
    │   ├── 00000000000000000000.json
    │   └── _last_checkpoint
    ├── part-00000-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    ├── part-00003-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    ├── part-00006-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
    └── part-00009-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet

    _delta_log가 Parquet 파일을 스캔하여 Delta Lake 데이터 쿼리에 필요한 메타데이터가 포함된 디렉토리를 빌드한다.

  • Partitioned Parquet table to Delta Lake API

    Partitioning 된 Parquet 테이블 생성

    df.write.partitionBy("language").format("parquet").save("tmp/lake2")
    tmp/lake2
    ├── _SUCCESS
    ├── language=English
    │   └── part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
    ├── language=Hindi
    │   └── part-00009-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
    └── language=Mandarin
        └── part-00006-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet

    이 후 Delta Lake로 Convert

    deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake2`")

    하지만 이때 다음 에러 발생

    AnalysisException: Expecting 0 partition column(s): [], but found 1 partition column(s): [`language`] from parsing the file name: file:/.../delta-examples/notebooks/pyspark/tmp/lake2/language=English/part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet

    에러가 발생한 이유는 Partition Column의 데이터 타입을 지정해주기 않아서 발생한 것으로
    Parquet 테이블에서 Partition Column의 데이터 타입은 디렉토리 이름에 따라 결정되며, 이는 모호할 수 있다.
    예를 들어, 디렉토리 이름 date=2022-09-21을 읽을 때 Delta Lake는 TIMESTAMP라고 지정해줘야 한다.

    deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake2`", "language STRING")
    tmp/lake2
    ├── _SUCCESS
    ├── _delta_log
    │   ├── 00000000000000000000.checkpoint.parquet
    │   ├── 00000000000000000000.json
    │   └── _last_checkpoint
    ├── language=English
    │   └── part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
    ├── language=Hindi
    │   └── part-00009-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
    └── language=Mandarin
        └── part-00006-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
        
        

참고 자료

profile
Data Engineer

0개의 댓글