Spark 완벽 가이드 ch12. RDD

Q·2023년 1월 18일
0

Spark 완벽 가이드

목록 보기
13/24

저수준 API란

  • 스파크에는 두 종류의 저수준 API가 있음
    1. 분산 데이터 처리를 위한 RDD
    2. 분산형 공유 변수를 배포하고 다루기 위한 API(브로드캐스트 변수와 어큐뮬레이터)

저수준 API는 언제 사용할까

  • 고수준 API에서 제공하지 않는 기능이 필요한 경우

    • 클러스터의 물리적 데이터의 배치를 아주 세밀하게 제어해야하는 상황
    • RDD를 사용해 개발된 기존 코드를 유지해야 하는 상황
    • 사용자가 정의한 공유 변수를 다뤄야 하는 상황
  • 스파크의 모든 워크로드는 저수준 기능을 사용하는 기초적인 형태로 컴파일되므로 이를 이해하는 것은 많은 도움이 될 수 있음

    • DataFrame 트랜스포메이션을 호출하면 다수의 RDD 트랜스포메이션으로 변환

저수준 API는 어떻게 사용할까

  • SparkContext는 저수준 API 기능을 사용하기 위한 진입 지점
  • 스파크 클러스터에서 연산을 수행하는데 필요한 도구인 SparkSession을 이용해 접근 가능
spark.sparkContext
Out[1]:
SparkContext

Spark UI

Version		v3.1.0
Master		local[8]
AppName		Databricks Shell

RDD 개요

  • 불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음

  • RDD의 레코드는 프로그래머가 선택하는 자바, 스칼라, 파이썬의 객체

    • DataFrame의 레코드는 스키마를 알고, 필드로 구성된 구조화된 로우
  • 이러한 객체에는 사용자가 원하는 포맷을 사용해 원하는 모든 데이터를 저장할 수 있음

  • 모든 값을 다루거나 값 사이의 상호작용 과정은 반드시 수동으로 정의

  • spark에서는 RDD레코드의 내부 구조를 파악할 수 없으므로 수작업으로 최적화

    • 필터 재정렬, 집계 등의 최적화 기법 직접 구현

RDD 유형

  • 두 가지 타입의 RDD를 만들 수 있음

    1. 제네릭 RDD 타입
    2. 키-값 RDD 타입
  • RDD의 주요 속성

    • 파티션의 목록
    • 각 조각을 연산하는 함수
    • 다른 RDD와의 의존성 목록
    • 부가적으로 키-값 RDD를 위한 Partitioner
    • 부가적으로 각 조각을 연산하기 위한 기본 위치 목록
  • 이러한 속성은 사용자 프로그램을 스케줄링하고 실행하는 스파크의 모든 처리 방식을 결정

  • 또한 RDD 역시 트랜스포메이션, 액션 제공

    • DataFrame과 Dataset의 트랜스포메이션, 액션과 동일한 방식으로 동작
      • 하지만 RDD에는 '로우'개념이 없으므로 구조적 API에서 제공하는 여러 함수를 사용하지 못하므로 수동으로 처리
  • 언어별 성능 차이

    • 스칼라, 자바는 비슷
    • 파이썬은 상당한 성능 저하가 발생
      • 오버헤드 발생: 직렬화 -> 파이썬 프로세스에 전달 -> 처리 -> 다시 직렬화 -> JVM에 반환
      • 따라서 구조적 API를 사용하는 것이 좋음

RDD는 언제 사용할까

  • 정말 필요한 경우가 아니라면 수동으로 RDD를 생성하면 X
  • DataFrame이 RDD보다 더 효율적이고 안정적이고 표현력이 좋음
  • 물리적으로 분산된 데이터(자체적으로 구성한 데이터 파티셔닝)에 세부적인 제어가 필요할 때 RDD를 사용하는 것이 가장 적합

Dataset과 RDD의 케이스 클래스

  • Dataset은 구조적 API가 제공하는 풍부한 기능과 최적화 기법을 제공한다는 점이 RDD와의 큰 차이점
  • Dataset을 사용하면 JVM 데이터 타입과 스파크 데이터 타입 중 어떤 것을 쓸지 고민하지 않아도 됨
    • 모두 성능 동일

RDD 생성하기

DataFrame, Dataset으로 RDD 생성하기

  • rdd 메서드를 호출하면 쉽게 변환 가능
spark.range(10).rdd  
Out[2]: MapPartitionsRDD[5] at javaToPython at NativeMethodAccessorImpl.java:0
#위에서 만들어진 데이터를 처리하려면 Row 객체를 올바른 데이터 타입으로 변환하거나 Row 객체에서 값을 추출해야함
spark.range(10).toDF('id').rdd.map(lambda row: row[0])  
Out[3]: PythonRDD[12] at RDD at PythonRDD.scala:58
#RDD -> DataFrame
spark.range(10).rdd.toDF()  
Out[4]: DataFrame[id: bigint]

로컬 컬렉션으로 RDD 생성하기

  • sparkContext의 parallelize 메서드 호출
    • 단일 노드에 있는 컬렉션을 병렬 컬렉션으로 전환
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(' ')
words = spark.sparkContext.parallelize(myCollection, 2) #파티션 수: 2  
#RDD에 이름을 지정하면 스파크 UI에 지정한 이름으로 RDD가 표시됨
words.setName('myWords')
words.name()  
Out[6]: 'myWords'

데이터소스로 RDD 생성하기

  • sparkContext를 사용해 데이터를 RDD로 읽을 수 있음
#줄 단위로 텍스트 파일을 RDD로 읽음
spark.sparkContext.textFile('/FileStore/tables/withTextFiles')  
Out[7]: /FileStore/tables/withTextFiles MapPartitionsRDD[26] at textFile at NativeMethodAccessorImpl.java:0
#여러 텍스트 파일의 각 줄을 레코드로 가진 RDD를 생성함
spark.sparkContext.wholeTextFiles('/FileStore/tables/withTextFiles')  
Out[8]: org.apache.spark.api.java.JavaPairRDD@613b96a0

트랜스포메이션

distinct

#distinct
words.distinct().count()  
Out[9]: 10

filter

#조건 함수
def startsWithS(individual):
  return individual.startswith("S")
words.filter(lambda word: startsWithS(word)).collect()
Out[11]: ['Spark', 'Simple']

map

words2=words.map(lambda word: (word, word[0], word.startswith("S")))
words2.filter(lambda record: record[2]).take(5)
Out[13]: [('Spark', 'S', True), ('Simple', 'S', True)]

flatMap

  • 단일 로우를 여러 로우로 변환
words.flatMap(lambda word: list(word)).take(5)
Out[14]: ['S', 'p', 'a', 'r', 'k']

sortBy

words.sortBy(lambda word: len(word)*-1).take(5)
Out[15]: ['Definitive', 'Processing', 'Simple', 'Spark', 'Guide']

randomSplit

  • RDD를 임의로 분할해서 RDD 배열을 만들 때 사용
fiftyFiftySplit = words.randomSplit([0.5,0.5])
len(fiftyFiftySplit)
Out[17]: 2
fiftyFiftySplit[0].collect()
Out[18]: ['Definitive', ':', 'Simple']
fiftyFiftySplit[1].collect()
Out[19]: ['Spark', 'The', 'Guide', 'Big', 'Data', 'Processing', 'Made']

액션

reduce

  • RDD의 모든 값을 하나의 값으로 만듦
spark.sparkContext.parallelize(range(1,21)).reduce(lambda x,y: x+y)
def wordLengthReducer(leftWord, rightWord):
  if len(leftWord)>len(rightWord):
    return leftWord
  return rightWord
words.reduce(wordLengthReducer)  
Out[22]: 'Processing'
  • wordLengthReducer 함수는 두 개의 입력값을 하나의 결과로 만들기 때문에 reduce메서드를 설명하는데 적합
  • 파티션에 대한 리듀스 연산은 비결정적인 특성을 가짐
  • 따라서 reduce 메서드를 실행할 때마다 다른 결과를 반환할 수 있음

count

words.count()
Out[23]: 10

countApprox

  • count함수의 근사치를 제한 시간 내에 계산
  • 제한 시간 초과 시 불완전한 결과 반환
  • 신뢰도(confidence)는 실제로 연산한 결과와의 오차율을 의미
    • ex) 0.9면 실제 연산 결과와 동일한 값이 90% 이상
confidence=0.95
timeout=400
words.countApprox(timeout, confidence)
Out[24]: 10

countApproxDistinct

  • 메모리 사용을 어느정도 사용할지 조절할 수 있는 카운터
  • 상대 정확도(relative accuracy)값이 작으면 더 많은 메모리 공간을 사용
    • 0.000017보단 커야함
words.countApproxDistinct(0.05)
words.countApproxDistinct(0.000017)
IllegalArgumentException Traceback (most recent call last) <command-1697308693987943> in <module> 
----> 1 words.countApproxDistinct(0.000017) 
/databricks/spark/python/pyspark/rdd.py in countApproxDistinct(self, relativeSD) 
2734 # the hash space in Java is 2^32 
2735 hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF) 
-> 2736 return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD) 
2737 
2738 def toLocalIterator(self, prefetchPartitions=False): 
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
1302 
1303 answer = self.gateway_client.send_command(command) 
-> 1304 return_value = get_return_value( 
1305 answer, self.gateway_client, self.target_id, self.name) 
1306 
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 
121 # Hide where the exception came from that shows a non-Pythonic 
122 # JVM exception message. 
--> 123 raise converted from None 
124 else: 
125 raise 
IllegalArgumentException: requirement failed: accuracy (1.7E-5) must be greater than 0.000017

countByValue

  • RDD 값의 개수를 구함
  • 결과 데이터셋을 드라이버의 메모리로 읽어들여 처리
  • 이 메서드를 사용하면 익스큐터의 연산 결과가 드라이버 메모리에 모두 적재됨
  • 그래서 결과가 작은 경우에만 사용해야함
words.countByValue()

first

  • 데이터셋의 첫 번째 값 반환
words.first()

max, min

spark.sparkContext.parallelize(range(1,20)).max()
spark.sparkContext.parallelize(range(1,20)).min()

take

  • RDD에서 가져올 값의 개수를 파라미터로 사용
  • 이 메서드는 먼저 하나의 파티션을 스캔함
  • 그 다음 해당 파티션의 결과 수를 이용해 파라미터로 지정된 값을 만족하는 데 필요한 추가 파티션 수를 예측함
words.take(5)
#정렬
words.takeOrdered(5)
#최상윗값
words.top(5)

파일 저장하기

  • RDD를 사용하면 일반적인 의미의 데이터소스에 저장할 수 없음
  • 전체 파티션을 순회하면서 각 파티션의 내용을 외부 데이터베이스에 저장해야함

saveAsTextFile

words.saveAsTextFile('/FileStore/tables/bookeTitle')

시퀀스 파일

  • 시퀀스 파일은 바이너리 키-값 쌍으로 구성된 플랫 파일이며 맵리듀스의 입출력 포맷으로 널리 사용됨
words.saveAsSequenceFile('/')

하둡 파일

words.saveAsHadoopFile('/')

캐싱

  • RDD 캐싱도 구조적 API와 동일한 원칙 적용
words.cache()

체크포인팅

  • DataFrame API에서 사용할 수 없는 기능 중 하나가 체크포인팅
  • 체크포인팅은 RDD를 디스크에 저장하는 방식
  • 메모리에 저장하지 않고 디스크에 저장한다는 사실만 제외하면 캐싱과 유사
  • 나중에 저장된 RDD를 참조할 때 원본 데이터 소스를 다시 계산하지 않고 디스크에 저장된 중간 결과 파티션 참조
    • 반복적인 연산 수행 시 매우 유용
#저장 위치 지정
spark.sparkContext.setCheckpointDir('/FileStore/check')
words.checkpoint()

RDD를 시스템 명령으로 전송하기

  • pipe 메서드를 사용하면 파이핑 요소로 생성된 RDD를 외부 프로세스로 전달 가능
  • 이때 외부 프로세스는 파티션마다 한 번씩 처리해 결과 RDD를 생성
#wc -l: 파일 내 전체 라인 수 출력
words.pipe('wc -l').collect()
  • 파티션 2개, 각 파티션은 5개의 로우를 가짐

mapPartitions

  • 로우 단위가 아닌 파티션 단위로 map 연산 수행 가능
  • 클러스터에서 물리적인 단위로 개별 파티션을 처리하기 때문
words.mapPartitions(lambda part:[1]).sum()
  • 각 파티션(두 개)에 '1' 값을 생성했으니 합산하면 2가 나오게됨

mapPartitionWithIndex

  • 파티션 인덱스를 사용해 각 레코드가 속한 데이터셋이 어디에 있는지 알아낼 수 있음
  • 이 메서드를 사용하려면 인덱스와 파티션의 모든 아이템을 순회하는 이터레이터를 가진 함수를 인수로 지정해야함
def indexedFunc(partitionIndex, withPartIterator):
  return [f"partition: {partitionIndex}=>{x for x in withPartIterator}"]
words.mapPartitionsWithIndex(indexedFunc).collect()

foreachPartition

  • mapPartitions는 처리 결과를 반환하지만 foreachPartition은 파티션의 모든 데이터를 순회할 뿐 결과 반환 X
  • 각 파티션의 데이터를 DB에 저장하는 것과 같이 개별 파티션에서 특정 작업을 수행하는 데 적합
  • 실제로 많은 데이터소스 커넥터에서 이 함수를 사용하고 있음

glom

  • 데이터셋의 모든 파티션을 배열로 변환
  • 데이터를 드라이버로 모으고 데이터가 존재하는 파티션의 배열이 필요한 경우에 매우 유용
spark.sparkContext.parallelize(['hello','world'],2).glom().collect()
spark.sparkContext.parallelize(['hello','world'],1).glom().collect()
profile
Data Engineer

0개의 댓글