이 장에서는 스파크에 정형화 된 구조(structure)를 추가하게 된 주된 동기, 이 동기가 어떠헤 상위 수준 API 개발로 이어졌는지, 스파크 2에서 컴포넌트들 간에 통일성을 확립했는지, 스파크 SQL 엔진 등을 둘러본다.
스파크 SQL : SQL과 유사한 문법을 가진 고수준의 표현력 높은 연산 기능들을 보여주면서 이후의 릴리스들에서 더욱 탄탄한 기반의 구조를 다지게 된 데이터 프레임
스파크 연산에서 뛰어난 성능을 보여주는 길을 닦았다.
RDD 프로그래밍 API 모델을 보면서 정형적 모델 이전의 스파크의 모습을 보자
RDD에는 3가지의 핵심 특성이 있다.
위 세 가지 모두 모든 고수준 기능들이 기반으로 하는 기본 RDD 프로그래밍 API 모델에는 필수적이다.
위에 모델들은 강력하지만 몇 가지 문제점이 있다.
이 때문에 스파크는 데이터 압축은 커녕 정체를 알 수 없는 객체를 바이트 뭉치로 직렬화 해 사용해왔다.
위에 문제점들에 해법을 보자
스파크 2.x 는 스파크 구조 확립을 위한 핵심 개념들을 도입했다.
위에서 말한 구조들에 장점들을 알아보자.
구조를 갖추면 스파크 컴포넌트를 통틀어 더 나은 성능과 공간 효율성 등 많은 이득을 얻을 수 있다. (API를 다루면서 더 자세한 설명)
표현성과 구성 용이성
#파이썬 예제
# (name, age) 형태의 튜플로 된 RDD를 생성한다.
dataRDD = sc.parallelize([("brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)])
# 집계와 평균을 위한 람다 표현식과 함께 map과 reduceByKey 트랜스포메이션을 사용한다.
agesRDD = (dataRDD
.map(lambda x: (x[0], (x[1], 1)))
.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
.map(lambda x: (x[0], x[1]x[0]/x[1]x[1])))
위에 코드는 어떻게 키를 집계하고, 평균 계산을 하는지 이해하기 어렵다.
람다 함수를 제외한 코드를 보자.
# 파이썬 예제
from pyspark.sql import SparkSession
from pyspark.sql.funcions import avg
from pyspark import SparkContext
sc = SparkContext()
#SparkSession으로부터 데이터 프레임을 만든다.
spark = (SparkSession
.builder
.appName("AuthorsAges")
.getOrCreate())
# 데이터 프레임 생성
data_df = sc.parallelize([("brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)])
# 동일한 이름으로 그룹화하여 나이별로 계산해 평균을 구한다.
avg_df = data_df.groupBy("name").agg(avg("age"))
# 최종 실행 결과를 보여준다.
avg_df.show()
위 코드는 상위 수준 DSL 연산자와 API를 써서 스파크에게 무엇을 할지 알려주므로 더 표현력이 높고 간단하다.
스파크는 이런 쿼리를 파악해서 사용자의 의돌를 이해할 수 있기 때문에 효과적인 실행을 위해 연산들을 최적하, 재배열할 수 있다.
위에 정형화된 형식 뿐 아니라 비정형적인 (그 위에 코드) 하위 수준의 RDD API를 쓸 수도 있다.
자주 쓰이는 연산들을 위한 API와 DSL이 데이터 분석에서 어떻게 쓰이는지 보자.
판다스 데이터 프레임에 영향을 받은 스파크 데이터 프레임은 이름 있는 칼럼과 스키마를 가진 분산 인메모리 테이블처럼 동작하고, 특정한 데이터 타입을 가진다.
사람이 보는 형태로는, 스파크 데이터 프레임은 하나의 표처럼 보인다. (p.48 표 3-1 참고)
이렇게 표 형태로 보면 이해하기도 쉽고, 행과 열 형태로 작업해야 할 때도 편리하다.
또한, 데이터 프레임은 불변성을 가지며 스파크는 그에 대한 모든 변경 내역(계보)를 보관한다. => 내용을 보존한 채로 칼럼의 이름이나 타입을 추가, 수정이 가능하다.
지원하는 프로그래밍 언어와 맞는 데이터 타입을 지원한다.
이 타입들은 스파크 애플리케이션에서 선언할 수도 있고, 스키마에서 정의할 수도 있다.
스파크에서는 더 복잡한 데이터 분석을 위한 복합 타입 또한 다룰 수 있다.
스파크에서 스키마 : 데이터 프레임을 위해 칼럼 이름과 연관된 데이터 타입을 정의한 것
위에 이유로 큰 데이터를 읽을 때는 스키마를 미리 지정하는 것을 권장한다.
스키마를 정의하는 방법은
여러 컬럼의 데이터 프레임을 정의하려면 데이터 프레임 API를 사용한다.
# 프로그래밍 스타일
from pyspark.sql.types import *
schema = StructType([StructField("author", StructType(), False),
StructField("title", StringType(), False),
StructField("pages", IntegerType(), False)])
# DDL 사용
schema = "author STRING, title STRING, pages INT"
데이터 프레임에서 이름이 정해진 칼럼들은 어떤 특정한 타입의 필드를 나타내는 개념이다.
사용자는 이름으로 칼럼들을 나열해 볼 수도 있다.
또, 관계형 표현이나 계산식 형태의 표현식으로 칼럼 단위 연산을 수행할 수 있다.
또 또, 논리식이나 수학 표현식을 칼럼 단위로 사용할 수도 있다.
해당 예시 코드는 스칼라로 p.56에 있다.
로우 : 스파크의 객체이고 순서가 있는 필드 집합 객체이다. 스파크의 지원 언어들에서 각 필드를 0부터 시작하는 인덱스로 접근한다.
# 로우 실습 파이썬 예제
from pyspark.sql import Row
blog_row = 6, Row("Reynold", "Xin", "Ittps://tinyurl.6", "3/2/2015", 25568,["twitter", "LinkedIn"])
### 인덱스로 개별 아이템에 접근한다.
blog_row[1]
## Row 객체로 데이터 프레임 만들기
rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
authors_df = spark.createDataFrame(rows, ["Authors", "State"])
authors_df.show()
DataFrameReader : 구조화된 데이터를 데이터 프레임으로 로드해주는 함수 (JSON, CSV 등)
DataFrameWriter : 특정 포맷의 데이터 소스에 데이터 프레임의 데이터를 써서 내보내는 함수
트랜스포메이션과 액션 : 많이 쓰이고요
프로젝션 : SQL에서의 SELECT문과 같은 역할, df 뒤에 .select()로 사용
필터 : FILTER나 WHERE 조건문과 같은 역할, .select()뒤에 .where()나 .filter()로 사용
withColumnRenamed() : 컬럼의 이름을 변경할 수 있는 함수, df 뒤에 withColumnRenamed("바꿀 컬럼 이름", "바뀌어질 컬럼 이름")으로 사용
to_timestamp(), to_date() : 데이터 타입을 바꾸는 함수들의 예제, 상위 수준의 API인 spark.sql.functions 패키지에 속한다.
groupBy(), orderBy(), count(): 집계 연산도 많이 쓰이고요
min(), max(), sum(), avg()같은 통계 함수도 당연히 많이 쓰입니다.
개발자들의 편의를 위헤 스파크는 데이터 프레임과 데이터세트 API를 유사한 인터페이스를 갖도록 했다 API를 일원화했다.
잘 이해가 되지 않지만 자바와 스칼라에서는 타입의 안전을 보장받기 때문에 정적 타입 객체를 사용할 수 있고, 파이썬과 R은 그렇지 않기 때문에 동적 타입 객체를 사용한다고 합니다.
그래서 실행 시에 타입이 정해진다고 합니다.
Row는 스파크의 포괄적 객체 타입이라 Row 안에 있는 타입을 지원하는 언어에 맞춰(IntegerType이나 IntegerType() 등) 적절히 변환해서 사용하면 된다고 합니다.
데이터 소스에엇 데이터 프레임을 만들 때처럼, 데이터세트의 경우에도 해당 스키마를 알아야 합니다.
즉, 데이터 타입들을 모두 알고 있어야 합니다.
JSON이나 CSV 데이터라면 스키마 추론이 가능하지만, 대용량 데이터에서 이런 작업은 비효율적이다.
이를 쉽게 하는 것이 스칼라의 케이스 클래스와 자바의 자바빈 클래스이다.
p.74에 나와있는 예제는 스칼라로 케이스 클래스를 사용하는 법을 보여주는데, 쉽게 각 스키마에 데이터 타입을 지정해주는 것입니다.
데이터세트에서도 (데이터 프레임에서 그런 것처럼) 트랜스포메이션이나 액션들을 수행할 수 있다.
책에서는 예시로 scala를 사용해 filter(), 첫 번째 열만 확인하기 등을 수행한다.
요는, 데이터세트에서도 데이터 프레임과 유사한 함수들을 사용할 수 있다는 것이다.
책에서 IoT 데이터셋으로 데이터 프레임에 사용했던 질의들을 데이터셋에도 적용하는 여러 예제들을 노트북으로 올렸다고 하는데 뻥입니다 없어요
아무튼 스파크에서는 데이터세트에 사용하는 함수들이 데이터 프레임과 유사해서 굉장히 개발자 친화적이다~~
언어나 일부 상황에서 언제 어떻게 데이터 프레임이나 데이터세트를 사용할지 한 가지를 결정하게 된다.
p.78에 그림 3-2를 보면 간단한 도표가 있습니다.
데이터 프레임을 사용하는 경우
데이터세트를 사용하는 경우
골라서 사용해도 되는 경우
SQL 같은 질의를 수행하게 해주는 것 외에 스파크 SQL 엔진이 하는 일이 많다.
이 외에 카탈리스트 옵티마이저와 텅스텐 프로젝트가 있는데 텅스텐 프로젝트는 6장에서 다룬다.
카탈리스트 옵티마이저 : 연산 쿼리를 받아 실행 계획으로 변환한다.
사용하는 언어에 상관없이 결과물은 바이트 코드를 생성하게 된다.
4단계의 쿼리 최적화 과정이 있다.