데브코스에서 배운 Spark 내용 정리 2일차.
Spark의 시스템 아키텍쳐와 데이터 종류들에 대해 간단히 알아보고, Spark 데이터프레임과 Spark SQL을 활용하는 간단한 예제를 실습했다.
코스 수료 후, 강의 내용을 다시 살펴보는데 이미 Spark를 이용한 프로젝트를 해봤어서 그런지 처음 들었을 때보다 이해가 더 잘되는 느낌이다.
1. Spark 데이터 처리
Spark 데이터 시스템 아키텍쳐

- 외부 데이터 소스로부터 Airflow같은 프레임워크로 주기적인 ETL을 하거나, Spark에서 바로 처리하는 방법이 있다.
데이터 병렬처리가 가능하려면
- 데이터가 먼저 분산 되어야함
- 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록 (128MB)
- hdfs-site.xml에 있는 dfs.block.size 프로퍼티가 결정
- Spark 에서는 이를 파티션이라 부름. 파티션의 기본크기도 128MB
- spark.sql.files.maxPartitionBytes: HDFS 등에 있는 파일을 읽어올 때만 적용
- 다음으로 나눠진 데이터를 각각 따로 동시 처리
- 맵리듀스에서 N개의 데이터 블록으로 구성된 파일 처리시 N개의 Map 태스크가 실행
- Map 태스크는 웬만하면 데이터 블록이 있는 서버상에서 실행(데이터 지역성)
- Spark에서는 파티션 단위로 메모리로 로드되어 Executor가 배정됨
처리 데이터 나누기
- 처리하려는 데이터 파일을 파티셔닝하여 파티션으로 나눔
- 파티셔닝 방법은 데이터 소스에 따라 달라짐
- 적절한 파티션의 수 = Executor의수 * Executor당 CPU의 수
- 파티션을 병렬 처리함
Spark 데이터 처리 흐름
- 데이터 프레임은 작은 파티션들로 구성됨
- 입력 데이터프레임을 원하는 결과 도출까지 다른 데이터 프레임으로 계속 변환
- sort, group by, filter, map, join…
- sort, group by의 경우 파티션간 데이터 이동이 필요
- 원하는 형태로 변환된 데이터는 최종적으로 파일 시스템에 저장
셔플링
- 파티션간에 데이터 이동이 필요한 경우 발생
- 명시적으로 파티션을 새롭게 하는 경우 (예: 파티션 수를 줄이기)
- 시스템에 의해 이뤄지는 셔플링
- 셔플링이 발생할 때 네트웍을 타고 데이터가 이동하게 됨
- spark.sql.shuffle.partitions에 따라 결과 파티션 수가 정해짐
- 오퍼레이션에 따라 파티션 수가 결정됨
- random, hashing partion(Aggregation 연산), range partition(sorting의 경우) 등
- 또한 이 때도 Data Skew 발생 가능
Data Skewness
- 데이터 파티셔닝은 데이터 처리에 병렬성을 주지만 단점도 존재
- 이는 데이터가 균등하게 분포하지 않는 경우
- 셔플링을 최소화하는 것이 중요하고 파티션 최적화를 하는 것이 중요
2. Spark 데이터 구조
Spark 데이터 구조
RDD, DataFrame, Dataset (Immutable Distributed Data)
- 2016년에 DataFramer과 Dataset은 하나의 API로 통합됨
- 모두 파티션으로 나뉘어 Spark에서 처리됨
- RDD가 더 로우레벨, 대부분의 경우 Dataframe(파이썬), Dataset(스칼라, 자바)을 쓰는 것이 일반적
- 구조화된 데이터는 주로 Spark SQL을 사용
- RDD (Resilient Distributed Dataset)
- 로우레벨 데이티로 클러스터내의 서버에 분산된 데이터를 지칭
- 레코드별로 존재하지만 스키마가 존재하지 않음
- 구조화된 데이터나 비구조화된 데이터 모두 지원
- Dataframe과 Dataset
- RDD와는 달리 필드 정보를 갖고 있음 (테이블)
- Dataset은 타입 정보가 존재하며 컴파일 언어에서 사용 가능
- PySpark에서는 DataFrame을 사용

- 기본적으로 RDD API위에 Spark SQL Engine이 올라가있고, 그 위에 DataFrame, Dataset, SparkSQL이 올라가 있는 구조
- 결국에는 상위레벨 코드도 RDD 오퍼레이션으로 변환되어 실행
Spark 데이터 구조 - RDD
- 변경이 불가능한 분산 저장된 데이터
- RDD는 다수의 파티션으로 구성
- 로우레벨의 함수형 변환 지원 (map, filter, flatMap 등)
- 일반 파이썬 데이터는 parallelize 함수로 RDD로 변환
- 변환된 데이터가 Spark의 RDD 상으로 올라가는 형태
- 반대는 collect로 파이썬 데이터로 변환 가능
Spark 데이터 구조 - 데이터 프레임
- 변경이 불가한 분산 저장된 데이터
- 관계형 데이터베이스 테이블처럼 컬럼으로 나눠 저장
- 판다스의 데이터 프레임 혹은 관계형 데이터베이스의 테이블과 거의 흡사
- 판다스의 연산들과 흡사한 연산들을 지원
- 다양한 데이터 소스 지원: HDFS, Hive, 외부 데이터베이스. RDD 등
- 스칼라, 자바, 파이썬과 같은 언어에서 지원
3. Spark 프로그램 구조
Spark Session 생성
- Spark 프로그램의 시작은 Spark Session을 만드는 것
- 프로그램마다 하나를 만들어 Spark Cluster와 통신: 싱글톤 객체
- Spark 2.0에서 처음 소개됨
- Spark Session을 통해 Spark이 제공해주는 다양한 기능을 사용
- DataFrame, SQL, Streaming, ML API 모두 이 객체로 통신
- config 메소드를 이용해 다양한 환경설정 가능
- 단 RDD와 관련된 작업을 할 떄는 SparkSession 밑의 sparkContext 객체를 사용
- Spark SQL Engine위에서 동작함
Spark Session 환경변수
- executor별 메모리: spark.executor.memory (기본값: 1G)
- executor별 CPU 수: spark.executor.cores (YARN에서는 기본값 1)
- driver 메모리: spark.driver.memory (기본값: 1G)
- Shuffle 후 파티션의 수: spark.sql.shuffle.partitions (기본값: 최대 200)
- 사용하는 Resource Manager에 따라 환경변수가 많이 달라짐
- 가능한 모든 환경변수 옵션은 여기에서 찾을 수 있음
Spark Session 환경설정방법 4가지
- 환경변수
- $SPARK_HOMW/conf/spark_defaults.conf
- spark-submit 명령의 커맨드라인 파라미터
- SparkSession 만들 때 지정
- 우선 순위는 아래일수록 높음
Spark 프로그램의 전체적인 플로우
- Spark Session 만들기
- 입력 데이터 로딩
- 데이터 조작 작업 (판다스와 아주 흡사)
- DataFrame API나 Spark SQL을 사용
- 원하는 결과가 나올 떄까지 새로운 DataFrame을 생성
- 최종결과 저장
Spark Session이 지원하는 데이터 소스
- spark.read(DataFrameReader)를 사용하여 데이터프레임으로 로드
- spark.write(DataFrameWriter)를 사용하여 데이터프레임을 저장
- 많이 사용되는 데이터 소스들
- HDFS 파일
- CSV, JSON, Parquet, ORC, Text, Avro
- HIVE 테이블
- JDBC 관계형 데이터베이스
- 클라우드 기반 데이터 시스템
- Redshift, Snowflake, BigQuery 등
- 스트리밍 시스템
4. Spark 개발 환경
Spark 개발 환경 옵션
- Local Standalone Spark + Spark Shell
- Python IDE - PyCharm, Visual Studio
- Databricks Cloud
- 노트북 - 주피터 노트북, 구글 Colab, 아나콘다 등등
Local Standalone Spark
- Spark Cluster Manager로 local[n] 지정
- master를 local[n]으로 지정
- master는 클러스터 매니저를 지정하는데 사용
- 주로 개발이나 간단한 테스트 용도
- 하나의 JVM에서 모든 프로세스를 실행
- 하나의 Driver와 하나의 Executor가 실행됨
- 1+ 쓰레드가 Executor안에서 실행됨
- Executor안에 생성되는 쓰레드 수
- local: 하나의 쓰레드만 생성
- local[*]: 컴퓨터 CPU 수만큼 쓰레드를 생성
구글 Colab에서 Spark 사용 예시
- PySpark + Py4J를 설치
- 구글 Colab 가상 서버 위에 로컬모드 Spark를 실행
- 개발 목적으로는 충분하지만 큰 데이터의 처리는 불가
- Spark Web UI는 기본적으로 접근 불가
- ngrok을 통해 억지로 열 수는 있음
- 이거보다는 AWS 우분투 환경에서 로컬로 설치한 후, 포트를 열어서 웹 UI로 접근
- Py4J: 파이썬에서 JVM내에 있는 자바 객체를 사용가능케 해줌
Spark 실습간 오류 해결
실습간 Spark를 로컬환경에 설치 후, spark-shell 명령어 수행시 다음 오류 발생
Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)
- 이는 spark 디렉토리/bin 내에 있는
load-spark-env.sh 파일에
export SPARK_LOCAL_IP=”127.0.0.1” 내용을 추가하여 해결
Spark 로컬환경에서 실행해보기
- 설치는 다음문서 참조
- spark-shell 실행해보기
- spark Web UI 방문
- spark-shell 실행시 나오는 웹 주소로 방문 하면 됨. 포트번호는 4040
- pyspark 실행해보기
-
pyspark는 파이썬 버전의 Spark 쉘
-
먼저 py4j를 설치
% pip install py4j
-
pyspark 명령어를 통해 쉘 실행
% pyspark
5. Spark 실습
헤더가 없는 파일을 읽어서 Spark로 처리해보자
실습 환경은 Google Colab이다.
Spark Session 만들기
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark DataFrame #2')\
.getOrCreate()
데이터프레임 스키마 정의하기
- 헤더가 없으므로 데이터 프레임의 스키마를 따로 정의해주어야 함
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType
schema = StructType([ \
StructField("cust_id", StringType(), True), \
StructField("item_id", StringType(), True), \
StructField("amount_spent", FloatType(), True)])
데이터프레임 생성
- 다운받은 파일을 읽어 Spark 데이터프레임을 만들자
df = spark.read.schema(schema).csv("customer-orders.csv")
Spark 데이터프레임에서 지원하는 GROUP BY 연산하기
cust_id 별로 GROUP BY를 하여 amount_spent 를 합해보자
df_ca = df.groupBy("cust_id").sum("amount_spent")
- 다음의 방법들로 Alias를 지정할 수도 있다.
df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")
혹은
import pyspark.sql.functions as f
df_ca = df.groupBy("cust_id") \
.agg(f.sum('amount_spent').alias('sum'))
MAX와 AVG도 추가로 연산해보자
df.groupBy("cust_id") \
.agg(
f.sum('amount_spent').alias('sum'),
f.max('amount_spent').alias('max'),
f.avg('amount_spent').alias('avg')).collect()
Spark SQL로 처리하기
Spark 데이터프레임에서도 다양한 연산들을 지원하지만, 이번 예시처럼 구조화된 데이터의 경우
Spark SQL을 사용하는 것이 더 직관적이고 사용하기가 쉬워서 대부분 Spark SQL을 통해 처리한다.
왜 그렇게 하는지 직접 사용하여 살펴보자
df.createOrReplaceTempView("customer_orders")
- 이제 Spark SQL로 위에서 한 GROUP BY 연산들을 해보자
spark.sql("""SELECT cust_id, SUM(amount_spent) sum, MAX(amount_spent) max, AVG(amount_spent) avg
FROM customer_orders
GROUP BY 1""")
spark.catalog.listTables()