Spark 완벽 가이드 ch10. 스파크 SQL

Q·2023년 1월 18일
0

Spark 완벽 가이드

목록 보기
11/24
  • 스파크 SQL을 사용해서 다음과 같은 작업을 수행할 수 있음

    • 데이터베이스에 생성된 뷰나 테이블에 SQL쿼리 실행
    • 시스템 함수를 사용
    • 사용자 정의 함수 정의
    • 워크로드를 최적화 하기 위해 쿼리 실행 계획 분석
  • 스파크 SQL은 DataFrame과 Dataset API에 통합되어 있음

SQL이란

  • SQL(Structured Query Language)데이터에 대한 관계형 연산을 표현하기 위한 도메인 특화 언어
  • 모든 관계형 DB에 사용되며 많은 NoSQL DB에서도 쉽게 사용할 수 있는 변형된 자체 SQL제공
  • 스파크는 ANSI SQL:2003의 일부를 구현했음
    • 이는 대부분의 SQL DB에서 채택하고 있는 표준임

빅데이터와 SQL: 아파치 하이브

  • 하이브 참고
    • 하둡에서 동작하는 데이터 웨어하우스(Data Warehouse) 인프라 구조로서 데이터 요약, 질의 및 분석 기능 제공
  • 스파크가 등장하기 전에는 Hive가 빅데이터 SQL 접근 계층에서 사실상 표준이었음
    • 하이브는 페북에서 개발
    • SQL처리가 필요한 빅데이터 업계에서 믿을 수 없을 정도로 인기 있는 도구였음
    • 하둡을 다양한 산업군으로 진출시키는 데 다방면으로 도움을 주었음

빅데이터와 SQL: 스파크 SQL

  • 스파크 2.0 버전에는 하이브를 지원할 수 있는 상위 호환 기능으로 ANSI-SQL과 HiveQL을 모두 지원하는 자체 개발 파서가 포함됨

  • 스파크 SQL은 DataFrame과의 뛰어난 호환성 덕분에 다양한 기업에서 강력한 기능으로 자리매김할 것임

    • 2016년 말 페북은 스파크 워크로드를 가동하기 시작했고 효과를 봤다고 발표함
      1. 4.5~6배의 CPU 성능 개선
      2. 3~4배의 자원 예약 개선
      3. 최대 5배의 지연 시간 감소
  • 스파크 SQL은 OLTP가 아닌 OLAP DB로 동작함

    • OLTP vs OLAP 참고
    • OLTP(OnLine Transaction Processing)
      • 실제 데이터를 수정하는 작업 중심(트랜잭션 중심 / row 단위에 초점)
    • OLAP(OnLine Analytic Processing)
      • 데이터를 사용자의 요구와 목적에 맞게 조회하는 작업 중심(정보 중심 / column 단위에 초점)

스파크와 하이브의 관계

  • 스파크 SQL은 하이브 메타스토어를 사용하므로 하이브와 잘 연동할 수 있음
    • 하이브 메타스토어: 하이브에서 생성한 테이블 스키마를 저장하는 공간
    • 메타스토어 참고
  • 스파크 SQL은 하이브 메타스토어에 접속하여 조회할 파일 수를 최소화하기 위해 메타데이터를 참조
    • 이 기능은 기존 하둡 환경의 모든 워크로드를 스파크로 이관하려는 사용자들에게 인기

스파크 SQL 쿼리 실행 방법

스파크 SQL CLI

  • 스파크 SQL CLI(Command Line Interface)

  • 로컬 환경의 명령행에서 기본 스파크 SQL 쿼리를 실행할 수 있는 편리한 도구

  • 쓰리프트 JDBC서버와 통신 불가능

    • 쓰리프트(thrift)?
      • 서로 다른 언어간에 데이터 직렬화를 제공을 위한 인터페이스 정의 언어이자 이진 통신 프로토콜
      • 스파크의 쓰리프트 서버는 여러 사용자의 JDBC 및 ODBC 접속을 받아 사용자의 쿼리를 스파크 SQL 세션으로 실행
    • 참조
  • 사용하려면 스파크 디렉터리에서 다음 명령을 실행

    • ./bin/spark-sql
    • 스파크가 설치된 경로의 conf 디렉터리에서 hive-site.xml, core-site.xml, hdfs-site.xml 파일을 배치해서 하이브를 사용할 수 있는 환경 구성

스파크의 프로그래밍 SQL 인터페이스

  • 스파크에서 지원하는 언어 API로 비정형 SQL 실행 가능
  • SparkSession 객체의 sql 메서드 사용 -> DataFrame 반환
    • 다른 트랜스포메이션과 마찬가지로 즉시 실행 X 지연 처리
  • SQL과 DataFrame은 완벽하게 연동 가능
    • DataFrame 생성 -> SQL 처리 -> DataFrame 반환
spark.sql("SELECT 1+1").show()
+-------+ 
(1 + 1)| 
+-------+ 
2| 
+-------+
path= "/FileStore/tables/2010_summary.json"
spark.read.json(path).createOrReplaceTempView('some_sql_view')#DataFrame을 SQL에서 사용할 수 있도록 뷰 등록
spark.sql("select dest_country_name,sum(count) from some_sql_view group by dest_country_name")\
.where("dest_country_name like 'S%'").where("'sum(count)' > 10") #SQL 결과를 DataFrame으로 반환

스파크 SQL 쓰리프트 JDBC/ODBC 서버

  • 위 내용 참고(스파크 SQL CLI)
  • 스파크는 자바 디비 연결(Java Database Connectivity, JDBC) 인터페이스 제공
  • 사용자나 원격 프로그램은 스파크 SQL을 실행하기 위해 이 인터페이스로 스파크 드라이버에 접속
    • 가장 대표적인 활용 사례: 태블로 같은 비즈니스 인텔리전스 소프트웨어를 이용해 스파크에 접속하는 형태
  • 쓰리프트 JDBC / ODBC(Open Database Connectivity) 서버는 하이브 1.2.1 버전의 Hiveserver2에 맞추어 구현됨

카탈로그

  • 스파크 SQL에서 가장 높은 추상화 단계는 카탈로그(catalog)임
  • 테이블에 저장된 데이터에 대한 메타데이터,DB,테이블,함수,뷰에 대한 정보를 추상화함
  • 카탈로그는 테이블, DB 그리고 함수를 조회하는 등 여러 유용한 함수를 제공
    • spark.sql함수를 사용해 관련 코드 실행

테이블

  • 스파크 SQL을 사용해 유용한 작업을 수행하려면 먼저 테이블을 정의해야함
  • 테이블은 명령을 실행할 데이터의 구조라는 점에서 dataframe과 논리적으로 동일
    • 조인, 필터링, 집계 등 여러 데이터 변환 작업 수행 가능
  • 스파크에서 테이블을 생성하면 default 데이터베이스에 등록됨
  • 테이블을 제거하면 모든 데이터가 제거되므로 조심

스파크 관리형 테이블

  • 테이블은 두 가지 중요한 정보를 저장함
    • 테이블의 데이터
    • 테이블에 대한 데이터(메타데이터)
  • 관리형 테이블: 스파크가 모든 정보를 추적할 수 있는 테이블
  • dataframe의 saveAsTable메서드로 관리형 테이블 생성 가능
spark.sql("create table flights (dest_country_name string, origin_country_name string, count long) \
          using json options (path '/FileStore/tables/2010_summary.json')") ```
```python
Out[6]: DataFrame[]

외부 테이블 생성하기

  • 스파크 SQL은 완벽하게 하이브 SQL과 호환됨
  • 기존 하이브 쿼리문을 스파크 SQL로 변환해야 하는 상황이 생길 수 있음
    • 이때 외부 테이블을 생성
  • 외부 테이블: 디스크에 저장된 파일을 이용해 정의한 테이블
    • 스파크는 외부 테이블의 메타데이터를 관리함
    • 하지만 데이터 파일은 스파크에서 관리하지 않음
    • create external table구문으로 외부 테이블 생성 가능
#/FileStore/tables/flight-data-hive에 있는 데이터 파일의 내용을 기반으로 외부 테이블 생성
spark.sql('create external table hive_flights\
(dest_country_name string, origin_country_name string, count long)\
row format delimited fields terminated by "," location "/FileStore/tables/flight-data-hive/"') ```         
```python 
Out[9]: DataFrame[]

테이블에 데이터 삽입하기

  • 데이터 삽입은 표준 SQL 문법을 따름
  • 특정 파티션에만 저장하고 싶다면 파티션 명세 추가 가능

테이블 메타 데이터 확인

spark.sql('describe table flights')  
Out[17]: DataFrame[col_name: string, data_type: string, comment: string]

테이블 메타데이터 갱신하기

  • 테이블 메타데이터를 유지하는 것은 가장 최신의 데이터셋을 읽고 있다는 것을 보장할 수 있는 중요한 작업
  • 테이블 메타데이터를 갱신할 수 있는 두 가지 명령이 있음
    • refresh table 구문: 테이블과 관련된 모든 캐싱된 항목을 갱신
    • repair table 구문: 새로운 파티션 정보를 수집하는 데 초점

테이블 제거

  • drop 키워드로 테이블 삭제
    • 테이블 제거 시 테이블의 데이터가 모두 제거됨(외부 테이블은 예외)
  • 존재하지 않는 테이블을 제거하려면 오류가 발생
    • 그래서 drop table if exists 구문 사용
spark.sql("drop table if exists flights_csv")  
Out[19]: DataFrame[]

테이블 캐싱하기

  • dataframe처럼 테이블을 캐시하거나 캐시에서 제거 가능
    • 캐시: cache table 구문
    • 캐시 제거: uncache table 구문

  • 뷰는 기존 테이블에 여러 트랜스포메이션 작업을 지정함
  • 뷰를 사용하면 쿼리 로직을 체계화하거나 재사용하기 편하게 만들 수 있음
  • 뷰는 디비에 설정하는 전역 뷰나 세션별 뷰가 있음
    • 전역 뷰: 디비에 상관없이 사용 가능 / 전체 스파크 애플리케이션에서 볼 수 있음 / 세션이 종료되면 뷰도 사라짐
    • 세션별 뷰: 현재 세션에만 사용 가능한 임시 뷰

뷰 생성하기

  • 최종 사용자에게 뷰는 테이블처럼 보임
  • 신규 경로에 모든 데이터를 다시 저장하지 않고 단순히 쿼리 시점에 데이터소스에 트랜스포메이션을 수행
spark.sql("create view just_usa_view as select * from flights where dest_country_name = 'united states'")
Out[60]: DataFrame[]
#등록되지 않고 현재 세션에만 사용할 수 있는 임시 뷰 생성
spark.sql("create temp view just_usa_view as select * from flights where dest_country_name = 'united states'")
Out[61]: DataFrame[]
#전역적 임시 뷰 생성
spark.sql("create global temp view just_usa_view as select * from flights where dest_country_name = 'united states'")
Out[62]: DataFrame[]
spark.sql('show tables').show()
+--------+-------------+-----------+
database| tableName|isTemporary| 
+--------+-------------+-----------+
default| flights| false| 
default| hive_flights| false| 
default|just_usa_view| false| 
default| nested_data| false| 
		|just_usa_view| true| 
+--------+-------------+-----------+

뷰 제거하기

  • 테이블 제거와 핵심적인 차이는 뷰는 어떤 데이터도 제거되지 않고 뷰 정의만 제거된다는 점
spark.sql("drop view if exists just_usa_view")
Out[67]: DataFrame[]

데이터베이스

spark.sql('show databases').show()
+------------+ 
databaseName| 
+------------+ 
default| 
+------------+

데이터베이스 생성하기

spark.sql('create database some_db')
Out[33]: DataFrame[]
spark.sql('show databases').show()
+------------+ 
databaseName| 
+------------+ 
default| 
some_db| 
+------------+

데이터베이스 설정하기

  • use 키워드 다음에 DB명을 붙여서 쿼리 수행에 필요한 DB 설정
#현재 어떤 디비를 사용 중인지 확인
spark.sql('select current_database()').show()

+------------------+ 
current_database()| 
+------------------+ 
default| 
+------------------+
spark.sql('use some_db')
Out[36]: DataFrame[]
#현재 어떤 디비를 사용 중인지 확인
spark.sql('select current_database()').show()
+------------------+ 
current_database()| 
+------------------+ 
some_db| 
+------------------+
#기본 디비로 돌아갈 수 있음
spark.sql('use default')
Out[39]: DataFrame[]
#현재 어떤 디비를 사용 중인지 확인
spark.sql('select current_database()').show()
+------------------+ 
current_database()| 
+------------------+ 
default| 
+------------------+

데이터베이스 제거하기

spark.sql('drop database if exists some_db')
Out[41]: DataFrame[]
spark.sql('show databases').show()
+------------+ 
databaseName| 
+------------+ 
default| 
+------------+

고급 주제

  • SQL 쿼리는 특정 명령 집합을 실행하도록 요청하는 SQL 구문
  • SQL구문은 조작, 정의, 제어와 관련된 명령을 정의할 수 있음

복합 데이터 타입

  • 표준 SQL에는 존재하지 않는 매우 강력한 기능
  • 스파크 SQL에는 구조체, 리스트, 맵 세 가지 핵심 복합 데이터 타입이 존재

구조체

  • 구조체는 맵에 더 가까우며 스파크에서 중첩 데이터를 생성하거나 쿼리하는 방법 제공
  • 구조체를 만들기 위해선 여러 컬럼이나 표현식을 괄호로 묶기만 하면 됨
spark.sql("create view if not exists nested_data as select (dest_country_name, origin_country_name) as country, count from flights")
Out[29]: DataFrame[]

리스트

  • 값의 리스트를 만드는 collect_list함수나
  • 중복 값 없는 배열을 만드는 collect_set 함수가 있음
  • 위 두 함수 모두 집계 함수이므로 집계 연산 시에만 사용 가능

함수

# 스파크 SQL이 제공하는 전체 함수 목록
spark.sql('show functions').show(10)
```python
+--------+ 
function| 
+--------+ 
!| 
%| 
&| 
*| 
+| 
-| 
/| 
<| 
<=| 
<=>| 
+--------+ 
only showing top 10 rows
# 스파크에 내장된 시스템 함수 목록
spark.sql('show system functions').show(10)
+--------+ 
function| 
+--------+ 
!| 
%| 
&| 
*| 
+| 
-| 
/| 
<| 
<=| 
<=>| 
+--------+ 
only showing top 10 rows
# 사용자 정의 함수 목록
spark.sql('show user functions').show(10)
+--------+ 
function| 
+--------+ 
+--------+
#와일드 카드 문자(*)가 포함된 문자열을 사용하여 결과 필터링 가능
#'s'로 시작하는 모든 함수 필터링
spark.sql('show functions "s*"').show(10)
+------------------+ 
function| 
+------------------+ 
schema_of_json| 
second| 
sentences| 
sequence| 
sha| 
sha1| 
sha2| 
shiftleft| 
shiftright| 
shiftrightunsigned| 
+------------------+ 
only showing top 10 rows
profile
Data Engineer

0개의 댓글