Delta Lake Quickstart

유상기·2023년 2월 21일
0

Delta Lake

목록 보기
3/16
post-thumbnail

본 글은 Delta Lake 2.2.0 Quickstart 을 번역 및 정리하였습니다.

이 가이드는 Delta Lake의 주요 기능을 빠르게 탐색하도록 도와줍니다. 상호 작용적인, 배치, 스트리밍 쿼리에서 Delta 테이블에서 읽고 쓰는 방법을 보여주는 코드 스니펫을 제공합니다.

Set up Apache Spark with Delta Lake

Delta Lake와 Spark를 설정하려면 아래의 지침을 따르십시오. 다음 두 가지 방법으로 로컬 머신에서 다음의 단계를 실행할 수 있습니다.

  1. 인터랙티브 실행: Delta Lake를 사용하여 Spark 셸 (Scala 또는 Python)을 시작하고 셸에서 코드 스니펫을 대화식으로 실행합니다.
  2. 프로젝트로 실행: Delta Lake와 함께 Maven 또는 SBT 프로젝트 (Scala 또는 Java)를 설정하고 코드 스니펫을 소스 파일에 복사한 다음 프로젝트를 실행합니다. 또는 Github 저장소에서 제공하는 예제를 사용할 수도 있습니다.
💡 중요

다음 모든 지침에 대해, Delta Lake 2.2.0과 호환되는 올바른 버전의 Spark 또는 PySpark를 설치하는 것을 확인하십시오. 자세한 내용은 릴리스 호환성 행렬을 참조하십시오.

Prerequisite: set up Java

공식 Apache Spark 설치 지침에서 언급한대로, 유효한 Java 버전(8, 11 또는 17)을 설치하고, 시스템 PATH 또는 JAVA_HOME 환경 변수 중 하나를 사용하여 Java를 올바르게 구성했는지 확인하십시오.

Windows 사용자는 이 블로그의 지침을 따르되, Delta Lake 2.2.0과 호환되는 올바른 버전의 Apache Spark를 사용하는 것을 확인하십시오.

Set up interactive shell

Spark SQL, Scala 또는 Python 셸 내에서 Delta Lake을 대화식으로 사용하려면 로컬에 Apache Spark를 설치해야합니다. SQL, Python 또는 Scala를 사용하려는지에 따라 각각 SQL, PySpark 또는 Spark 셸을 설정할 수 있습니다.

Spark SQL Shell

Apache Spark 호환 버전을 다운로드하려면, pip를 사용하거나 아카이브를 다운로드하고 추출한 후 추출한 디렉터리에서 spark-sql을 실행하는 방법을 따르는 Downloading Spark의 지침을 따르십시오.

bin/spark-sql --packages io.delta:delta-core_2.12:2.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

PySpark Shell

  1. Delta Lake 버전과 호환되는 PySpark 버전을 설치하려면 다음을 실행하십시오.
pip install pyspark==<compatible-spark-version>
  1. Delta Lake 패키지와 추가 구성을 사용하여 PySpark를 실행하십시오.
pyspark --packages io.delta:delta-core_2.12:2.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Spark Scala Shell

Apache Spark 호환 버전을 다운로드하려면, pip를 사용하거나 아카이브를 다운로드하고 추출한 후 추출한 디렉터리에서 spark-shell을 실행하는 방법을 따르는 Downloading Spark의 지침을 따르십시오.

bin/spark-shell --packages io.delta:delta-core_2.12:2.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Set up project

Maven 중앙 저장소에서 Delta Lake 이진 파일을 사용하여 프로젝트를 빌드하려면 다음 Maven 좌표를 사용할 수 있습니다.

Maven

Maven 프로젝트에 Delta Lake을 포함하려면 POM 파일에서 종속성으로 추가하면 됩니다. Scala 2.12로 컴파일된 Delta Lake입니다.

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_2.12</artifactId>
  <version>2.2.0</version>
</dependency>

SBT

Delta Lake을 SBT 프로젝트에 포함하려면 build.sbt 파일에 다음 줄을 추가하면 됩니다.

libraryDependencies += "io.delta" %% "delta-core" % "2.2.0"

Python

Python 프로젝트(예: 단위 테스트용)를 설정하려면 pip install delta-spark==2.2.0를 사용하여 Delta Lake을 설치한 다음 Delta Lake의 configure_spark_with_delta_pip() 유틸리티 함수를 사용하여 SparkSession을 구성할 수 있습니다.

import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Create a table

Delta 테이블을 생성하려면 DataFrame을 delta 형식으로 출력합니다. 기존의 Spark SQL 코드를 사용하고 parquet, csv, json 등의 형식에서 delta 형식으로 변경할 수 있습니다.

CREATE TABLE delta.`/tmp/delta-table` USING DELTA AS SELECT col1 as id FROM VALUES 0,1,2,3,4;
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

이러한 작업은 DataFrame에서 추론된 스키마를 사용하여 새 Delta 테이블을 만듭니다. 새 Delta 테이블을 만들 때 사용할 수 있는 전체 옵션 세트에 대해서는 '테이블 생성' 및 '테이블 쓰기'를 참조하십시오.

💡 이 퀵스타트는 Delta 테이블 위치에 대해 로컬 경로를 사용합니다. HDFS 또는 클라우드 스토리지를 Delta 테이블에 대한 **[스토리지로 구성](https://velog.io/@azuresky/Storage-configuration)**하려면 '스토리지 구성'을 참조하십시오.

Read data

Delta 테이블에서 데이터를 읽으려면 파일 경로를 지정합니다. 예를 들어, "/tmp/delta-table"과 같이 지정합니다.

SELECT * FROM delta.`/tmp/delta-table`;
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

Update table data

Delta Lake은 표준 DataFrame API를 사용하여 테이블을 수정하는 데 사용할 수 있는 여러 가지 작업을 지원합니다. 다음은 테이블의 데이터를 덮어쓰기하는 배치 작업을 실행하는 예시입니다:

Overwrite

INSERT OVERWRITE delta.`/tmp/delta-table` SELECT col1 as id FROM VALUES 5,6,7,8,9;
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

이 테이블을 다시 읽으면, 이전 데이터가 덮어씌워졌기 때문에 새롭게 추가된 값 5-9만 볼 수 있습니다.

Conditional update without overwrite

Delta Lake은 조건부 업데이트, 삭제 및 병합(UPSERT)을 위한 프로그래밍 API를 제공합니다. 다음은 몇 가지 예시입니다.

-- Update every even value by adding 100 to it
UPDATE delta.`/tmp/delta-table` SET id = id + 100 WHERE id % 2 == 0;

-- Delete very even value
DELETE FROM delta.`/tmp/delta-table` WHERE id % 2 == 0;

-- Upsert (merge) new data
CREATE TEMP VIEW newData AS SELECT col1 AS id FROM VALUES 1,3,5,7,9,11,13,15,17,19;

MERGE INTO delta.`/tmp/delta-table` AS oldData
USING newData
ON oldData.id = newData.id
WHEN MATCHED
  THEN UPDATE SET id = newData.id
WHEN NOT MATCHED
  THEN INSERT (id) VALUES (newData.id);

SELECT * FROM delta.`/tmp/delta-table`;
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

기존 행이 업데이트되고 새 행이 삽입된 것을 확인할 수 있습니다.

이러한 작업에 대한 자세한 내용은 '테이블 삭제, 업데이트 및 병합'을 참조하십시오.

Read older versions of data using time travel

타임 트래블을 사용하여 Delta 테이블의 이전 스냅샷을 쿼리할 수 있습니다. 덮어쓴 데이터에 액세스하려면, versionAsOf 옵션을 사용하여 처음 데이터 집합을 덮어쓰기 전에 테이블의 스냅샷을 쿼리할 수 있습니다.

SELECT * FROM delta.`/tmp/delta-table` VERSION AS OF 0;
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

덮어쓴 이전 데이터 집합을 볼 수 있습니다. 타임 트래블은 Delta Lake 트랜잭션 로그의 기능을 활용하여 더 이상 테이블에 없는 데이터에 액세스할 수 있습니다. version 0 옵션을 제거하거나 version 1을 지정하면 다시 새 데이터를 볼 수 있습니다. 자세한 내용은 '테이블의 이전 스냅샷 쿼리(타임 트래블)'을 참조하십시오.

Write a stream of data to a table

구조적 스트리밍을 사용하여 Delta 테이블에 쓸 수도 있습니다. Delta Lake 트랜잭션 로그는 테이블에 대해 동시에 다른 스트림 또는 배치 쿼리가 실행 중일 때에도 정확히 한 번 처리를 보장합니다. 기본적으로 스트림은 append 모드에서 실행되며 새 레코드를 테이블에 추가합니다.

streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

스트림이 실행되는 동안 이전 명령을 사용하여 테이블을 읽을 수 있습니다.

💡 쉘에서 이 작업을 실행하는 경우, 스트리밍 작업 진행 상황이 표시되어 해당 쉘에서 명령을 입력하기 어려울 수 있습니다. 테이블 쿼리를 위한 새로운 터미널에서 다른 쉘을 시작하는 것이 유용할 수 있습니다.

스트림을 중지하려면, 스트림을 시작한 동일한 터미널에서 stream.stop()을 실행하면 됩니다.

Delta Lake과 구조적 스트리밍의 통합에 대한 자세한 내용은 '테이블 스트리밍 읽기 및 쓰기'를 참조하십시오. 또한 Apache Spark 웹사이트의 구조적 스트리밍 프로그래밍 가이드도 참고하시기 바랍니다.

Read a stream of changes from a table

스트림이 Delta 테이블에 기록되는 동안 해당 테이블에서 읽을 수도 있습니다. 예를 들어, Delta 테이블에 적용된 모든 변경 사항을 출력하는 새로운 스트리밍 쿼리를 시작할 수 있습니다. 시작 버전이나 시작 타임스탬프 옵션을 제공하여 구조적 스트리밍이 해당 시점부터 변경 사항을 가져오도록 지정할 수 있습니다. 자세한 내용은 구조적 스트리밍을 참조하십시오.

stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
profile
Data/AI Solution Architect

0개의 댓글