[SPARK] Spark SQL과 Spark Catalog를 통한 테이블 관리 및 데이터 프로세싱

NewNewDaddy·2024년 2월 19일
0

SPARK

목록 보기
12/16
post-thumbnail

0. INTRO

  • 2년이 조금 넘는 짧은 기간동안 Spark를 사용하여 데이터 ETL 작업을 해왔고 대부분의 경우에는 Dataframe API를 사용하여 Spark SQL을 사용할 일이 사실은 크게 없었다.
  • 최근에 Spark를 다시 Deep하게 공부하면서 Spark SQL이 Catalog와 함께 기능하게 되면 자체 Database 기능을 갖춘 것 같이 데이터의 구조화된 처리와 관리를 효율적으로 수행할 수 있으며, 데이터 분석 작업의 생산성을 높일 수 있다는 것을 알게 되었다.
  • 이번 글에서는 Spark SQL과 Spark Catalog를 사용하여 테이블을 관리하고 데이터를 처리하는 방법에 살펴볼까한다.

1. Spark SQL과 Spark Catalog

  • Spark SQL은 구조화된 데이터를 처리하고 분석하기 위한 SQL 인터페이스 및 엔진을 제공하는 반면, Spark Catalog는 Spark SQL 및 다른 Spark 기능에서 사용되는 데이터 및 메타데이터의 중앙 저장소로서 동작합니다. Spark SQL은 데이터 처리 및 분석을 위한 인터페이스를 제공하는 반면, Spark Catalog는 데이터 및 메타데이터를 관리하고 제공합니다. 각각의 특성들은 아래와 같습니다.

1) Spark SQL

  • Spark SQL은 Apache Spark의 구조화된 데이터 처리 기능을 제공합니다. 이를 사용하여 SQL 쿼리를 사용하여 데이터를 처리하고 분석할 수 있습니다.
  • Spark SQL은 DataFrame 및 Dataset API를 통해 데이터를 구조화하고 처리하는 데 사용됩니다. 이를 통해 SQL 쿼리를 사용하여 데이터를 조작하고 분석하는 데 편리한 인터페이스를 제공합니다.
  • Spark SQL은 SQL 쿼리를 실행하고 데이터를 처리하기 위한 엔진을 제공하며, Catalyst 옵티마이저를 사용하여 쿼리 실행 계획을 최적화합니다.

2) Spark Catalog

  • Spark Catalog는 Spark SQL에서 사용되는 데이터 및 메타데이터의 중앙 저장소로 데이터베이스, 테이블, 뷰 등의 메타데이터를 저장하고 관리합니다. 이를 사용하여 Spark 애플리케이션에서 데이터의 구조 및 속성에 대한 정보를 쿼리하고 수정할 수 있습니다.
  • Spark Catalog는 외부 데이터 소스와의 통합 및 쿼리 최적화를 위해 사용됩니다. Catalog를 사용하여 데이터 소스의 메타데이터를 관리하고 쿼리 실행 계획을 최적화합니다.
특징Spark SQLSpark Catalog
역할구조화된 데이터 처리 및 분석을 위한 SQL 인터페이스 및 엔진데이터 및 메타데이터의 중앙 저장소
사용SQL 쿼리 실행 및 데이터 처리, 분석데이터 및 메타데이터 관리 및 제공
API 및 도구DataFrame 및 Dataset API, SQL 쿼리 처리 엔진Catalog APIs, 메타스토어, 메타데이터 관리 도구
주요 기능SQL 쿼리 실행, 데이터 처리 및 분석, Catalyst 옵티마이저 사용데이터베이스, 테이블, 뷰 등의 메타데이터 관리, 외부 데이터 소스 통합
역할SQL 쿼리 실행 및 데이터 처리를 담당데이터 구조와 속성에 대한 정보를 저장 및 관리

2. Spark Catalog - In Memory

  • Spark Catalog는 종류에 따라 In Memory와 Hive로 나뉘게 된다. 지금 다뤄볼 In Memory 타입의 경우 Database나 Table 생성시 메모리에 저장되며 spark.stop()과 같이 spark 세션이 끊기고 다시 시작되는 경우에는 저장되어있던 DB나 테이블들이 다 삭제된다.

0) Session 생성 및 Catalog 타입 조회

  • 일반적으로 Spark Session을 생성한 후 spark.sql.catalogImplementation 인자를 불러오면 기본적으로 in-memory 타입으로 나오게된다.
from pyspark.sql import SparkSession

## spark session 생성
spark = SparkSession.builder \
        .master("local") \
        .appName("memory") \
        .getOrCreate()
        
## 현재 catalog 저장소 타입 조회
> spark.conf.get("spark.sql.catalogImplementation")

  'in-memory'

1) Database 생성 및 조회

  • MYSQL 같은 상용 DB에서 데이터베이스를 생성하는 명령어와 동일하게 입력해주면 정해진 이름의 DB가 추가되어 보여지게 된다.
## 데이터베이스 생성
> spark.sql("CREATE DATABASE temp")

## 데이터베이스 목록 조회 
1. > spark.sql("SHOW DATABASES").show()

  +---------+
  |namespace|
  +---------+
  |  default|
  |     temp|
  +---------+
  
2. > spark.catalog.listDatabases()

  [Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/content/spark-warehouse')]

2) Database 선택

## 현재 데이터베이스 확인
> spark.catalog.currentDatabase()

## 사용할 데이터베이스 선택
1. > spark.sql("USE temp")

2. > spark.catalog.setCurrentDatabase("temp")

3) Table 생성 및 조회

## 데이터 읽기
> df = spark.read.parquet("파일 경로")

## spark SQL 테이블 생성
> df.createOrReplaceTempView("house")

## 테이블 조회
1. > spark.sql("SHOW TABLES FROM temp").show()

  +---------+---------+-----------+
  |namespace|tableName|isTemporary|
  +---------+---------+-----------+
  |     temp|    house|       true|
  +---------+---------+-----------+
  
2. > spark.catalog.listTables()

  [Table(name='house', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
  
## 테이블 컬럼 조회
> spark.catalog.listColumns("house")

  [Column(name='idx', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
   Column(name='sex', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
   Column(name='status', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
   Column(name='grade', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]

## spark catalog에 있는 테이블 읽기
> tbl = spark.read.table("테이블 명")

4) Table 저장

  • saveAsTable 명령어의 경우 spark-warehouse 경로 아래에 테이블의 데이터가 parquet 파일 형식으로 저장된다.
  • In-Memory 형식으로 사용하고 있을 경우에는 session이 다시 시작되었을 때 물리적인 파일이 있더라도 table list에는 남아있지 않으므로 임시테이블과 동일하게 작동한다.
> df.write.saveAsTable("테이블 명")

3. Spark Catalog - Hive

  • 위에서 알아본 In-Memory 방식의 저장소의 경우 데이터베이스와 테이블을 새로 생성하여 작업하고 있다가도 spark session이 끊어졌다가 다시 실행되면 spark catalog에서 모두 지워지고 목록들이 초기화된다.
  • 따라서 작업중인 DB와 테이블 정보들을 영구적인 metastore에 등록시켜놓고 사용을 해야 세션이 끊어지더라도 내용들이 휘발되지 않는데 이 때 사용할 수 있는 조건이 enableHiveSupport() 이다.

0) Session 생성 및 Catalog 타입 조회

  • SparkSession의 옵션에 enableHiveSupport()를 추가해주면 저장소 타입이 hive로 나오는 것을 볼 수 있다.
from pyspark.sql import SparkSession

## spark session 생성
spark = SparkSession.builder \
        .master("local") \
        .appName("memory") \
        .enableHiveSupport() \
        .getOrCreate()
        
## 현재 catalog 저장소 타입 조회
> spark.conf.get("spark.sql.catalogImplementation")

  'hive'
  • 또한 SparkSession에 옵션을 주어 spark catalog 기본 경로를 바꿀 수도 있다.
spark = SparkSession.builder \
        .master("local") \
        .appName("memory") \
        .enableHiveSupport() \
        .getOrCreate() \
        .config("spark.sql.warehouse.dir", "[사용자 지정 spark warehouse 경로]")

1) Database 생성 및

## 데이터베이스 생성
> spark.sql("CREATE DATABASE temp")

## 데이터베이스 목록 조회 
1. > spark.sql("SHOW DATABASES").show()

  +---------+
  |namespace|
  +---------+
  |  default|
  |     temp|
  +---------+
  
2. > spark.catalog.listDatabases()

  [Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/content/spark-warehouse')]

2) Table 생성 및 조회

  • 위에서 다뤄본 것과 동일하게 createOrReplaceTempView 메소드를 통해 sql 테이블을 생성하고 테이블 목록들을 조회해보면 위의 in-memory 일 때와 동일하게 isTemporary 옵션이 True로 그대로인 것을 확인 할 수 있다.
  • Hive로 Catalog 타입을 변경해주었더라도 createOrReplaceTempView 메소드로 생성된 테이블은 여전히 임시테이블로 생성이 되는 것을 볼 수 있다.
  • 영구 저장 테이블로 생성하기 위해서는 saveAsTable 메소드를 사용해야 한다.
## 데이터 읽기
> df = spark.read.parquet("파일 경로")

## spark SQL 테이블 생성
> df.createOrReplaceTempView("house")

## 테이블 조회
1. > spark.sql("SHOW TABLES FROM temp").show()

  +---------+---------+-----------+
  |namespace|tableName|isTemporary|
  +---------+---------+-----------+
  |     temp|    house|       true|
  +---------+---------+-----------+
  
2. > spark.catalog.listTables()

  [Table(name='house', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

3) Table 저장

  • 이제는 saveAsTable로 DataFrame을 저장하게되면 세션이 재시작되어도 지속적으로 남아있는 테이블로 저장된다.
> df.write.saveAsTable("테이블 명")

4. 참고문서

profile
데이터 엔지니어의 작업공간 / #PYTHON #SPARK #AWS #NCLOUD

0개의 댓글