0. INTRO
- 2년이 조금 넘는 짧은 기간동안 Spark를 사용하여 데이터 ETL 작업을 해왔고 대부분의 경우에는 Dataframe API를 사용하여 Spark SQL을 사용할 일이 사실은 크게 없었다.
- 최근에 Spark를 다시 Deep하게 공부하면서 Spark SQL이 Catalog와 함께 기능하게 되면 자체 Database 기능을 갖춘 것 같이 데이터의 구조화된 처리와 관리를 효율적으로 수행할 수 있으며, 데이터 분석 작업의 생산성을 높일 수 있다는 것을 알게 되었다.
- 이번 글에서는 Spark SQL과 Spark Catalog를 사용하여 테이블을 관리하고 데이터를 처리하는 방법에 살펴볼까한다.
1. Spark SQL과 Spark Catalog
Spark SQL은 구조화된 데이터를 처리하고 분석하기 위한 SQL 인터페이스 및 엔진
을 제공하는 반면, Spark Catalog는 Spark SQL 및 다른 Spark 기능에서 사용되는 데이터 및 메타데이터의 중앙 저장소
로서 동작합니다. Spark SQL은 데이터 처리 및 분석을 위한 인터페이스를 제공하는 반면, Spark Catalog는 데이터 및 메타데이터를 관리하고 제공합니다. 각각의 특성들은 아래와 같습니다.
1) Spark SQL
- Spark SQL은 Apache Spark의 구조화된 데이터 처리 기능을 제공합니다. 이를 사용하여
SQL 쿼리를 사용하여 데이터를 처리하고 분석
할 수 있습니다.
- Spark SQL은 DataFrame 및 Dataset API를 통해 데이터를 구조화하고 처리하는 데 사용됩니다. 이를 통해 SQL 쿼리를 사용하여 데이터를 조작하고 분석하는 데 편리한 인터페이스를 제공합니다.
- Spark SQL은 SQL 쿼리를 실행하고 데이터를 처리하기 위한 엔진을 제공하며, Catalyst 옵티마이저를 사용하여 쿼리 실행 계획을 최적화합니다.
2) Spark Catalog
- Spark Catalog는 Spark SQL에서 사용되는 데이터 및
메타데이터의 중앙 저장소
로 데이터베이스, 테이블, 뷰 등의 메타데이터를 저장하고 관리합니다. 이를 사용하여 Spark 애플리케이션에서 데이터의 구조 및 속성에 대한 정보를 쿼리하고 수정할 수 있습니다.
- Spark Catalog는
외부 데이터 소스와의 통합 및 쿼리 최적화
를 위해 사용됩니다. Catalog를 사용하여 데이터 소스의 메타데이터를 관리하고 쿼리 실행 계획을 최적화합니다.
특징 | Spark SQL | Spark Catalog |
---|
역할 | 구조화된 데이터 처리 및 분석을 위한 SQL 인터페이스 및 엔진 | 데이터 및 메타데이터의 중앙 저장소 |
사용 | SQL 쿼리 실행 및 데이터 처리, 분석 | 데이터 및 메타데이터 관리 및 제공 |
API 및 도구 | DataFrame 및 Dataset API, SQL 쿼리 처리 엔진 | Catalog APIs, 메타스토어, 메타데이터 관리 도구 |
주요 기능 | SQL 쿼리 실행, 데이터 처리 및 분석, Catalyst 옵티마이저 사용 | 데이터베이스, 테이블, 뷰 등의 메타데이터 관리, 외부 데이터 소스 통합 |
역할 | SQL 쿼리 실행 및 데이터 처리를 담당 | 데이터 구조와 속성에 대한 정보를 저장 및 관리 |
2. Spark Catalog - In Memory
- Spark Catalog는 종류에 따라 In Memory와 Hive로 나뉘게 된다. 지금 다뤄볼 In Memory 타입의 경우 Database나 Table 생성시 메모리에 저장되며
spark.stop()
과 같이 spark 세션이 끊기고 다시 시작되는 경우에는 저장되어있던 DB나 테이블들이 다 삭제된다.
0) Session 생성 및 Catalog 타입 조회
- 일반적으로 Spark Session을 생성한 후
spark.sql.catalogImplementation
인자를 불러오면 기본적으로 in-memory
타입으로 나오게된다.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("memory") \
.getOrCreate()
> spark.conf.get("spark.sql.catalogImplementation")
'in-memory'
1) Database 생성 및 조회
- MYSQL 같은 상용 DB에서 데이터베이스를 생성하는 명령어와 동일하게 입력해주면 정해진 이름의 DB가 추가되어 보여지게 된다.
> spark.sql("CREATE DATABASE temp")
1. > spark.sql("SHOW DATABASES").show()
+---------+
|namespace|
+---------+
| default|
| temp|
+---------+
2. > spark.catalog.listDatabases()
[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/content/spark-warehouse')]
2) Database 선택
> spark.catalog.currentDatabase()
1. > spark.sql("USE temp")
2. > spark.catalog.setCurrentDatabase("temp")
3) Table 생성 및 조회
> df = spark.read.parquet("파일 경로")
> df.createOrReplaceTempView("house")
1. > spark.sql("SHOW TABLES FROM temp").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| temp| house| true|
+---------+---------+-----------+
2. > spark.catalog.listTables()
[Table(name='house', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
> spark.catalog.listColumns("house")
[Column(name='idx', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='sex', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='status', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='grade', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]
> tbl = spark.read.table("테이블 명")
4) Table 저장
saveAsTable
명령어의 경우 spark-warehouse
경로 아래에 테이블의 데이터가 parquet 파일 형식으로 저장된다.
In-Memory
형식으로 사용하고 있을 경우에는 session이 다시 시작되었을 때 물리적인 파일이 있더라도 table list에는 남아있지 않으므로 임시테이블과 동일하게 작동한다.
> df.write.saveAsTable("테이블 명")
3. Spark Catalog - Hive
- 위에서 알아본
In-Memory
방식의 저장소의 경우 데이터베이스와 테이블을 새로 생성하여 작업하고 있다가도 spark session이 끊어졌다가 다시 실행되면 spark catalog에서 모두 지워지고 목록들이 초기화된다.
- 따라서 작업중인 DB와 테이블 정보들을 영구적인 metastore에 등록시켜놓고 사용을 해야 세션이 끊어지더라도 내용들이 휘발되지 않는데 이 때 사용할 수 있는 조건이
enableHiveSupport()
이다.
0) Session 생성 및 Catalog 타입 조회
- SparkSession의 옵션에
enableHiveSupport()
를 추가해주면 저장소 타입이 hive로 나오는 것을 볼 수 있다.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("memory") \
.enableHiveSupport() \
.getOrCreate()
> spark.conf.get("spark.sql.catalogImplementation")
'hive'
- 또한 SparkSession에 옵션을 주어 spark catalog 기본 경로를 바꿀 수도 있다.
spark = SparkSession.builder \
.master("local") \
.appName("memory") \
.enableHiveSupport() \
.getOrCreate() \
.config("spark.sql.warehouse.dir", "[사용자 지정 spark warehouse 경로]")
1) Database 생성 및
> spark.sql("CREATE DATABASE temp")
1. > spark.sql("SHOW DATABASES").show()
+---------+
|namespace|
+---------+
| default|
| temp|
+---------+
2. > spark.catalog.listDatabases()
[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/content/spark-warehouse')]
2) Table 생성 및 조회
- 위에서 다뤄본 것과 동일하게
createOrReplaceTempView
메소드를 통해 sql 테이블을 생성하고 테이블 목록들을 조회해보면 위의 in-memory
일 때와 동일하게 isTemporary
옵션이 True로 그대로인 것을 확인 할 수 있다.
Hive
로 Catalog 타입을 변경해주었더라도 createOrReplaceTempView
메소드로 생성된 테이블은 여전히 임시테이블로 생성이 되는 것을 볼 수 있다.
- 영구 저장 테이블로 생성하기 위해서는
saveAsTable
메소드를 사용해야 한다.
> df = spark.read.parquet("파일 경로")
> df.createOrReplaceTempView("house")
1. > spark.sql("SHOW TABLES FROM temp").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| temp| house| true|
+---------+---------+-----------+
2. > spark.catalog.listTables()
[Table(name='house', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
3) Table 저장
- 이제는
saveAsTable
로 DataFrame을 저장하게되면 세션이 재시작되어도 지속적으로 남아있는 테이블로 저장된다.
> df.write.saveAsTable("테이블 명")
4. 참고문서