Spark 완벽 가이드 ch13. RDD 고급 개념

Q·2023년 1월 25일
0

Spark 완벽 가이드

목록 보기
14/24
  • 이 장에서 다룰 핵심 주제
    • 집계와 키-값 형태의 RDD
    • 사용자 정의 파티셔닝
    • RDD 조인
myCollection = "Spark The Definitive Guide: Big Data Processing Made Simple".split(' ')
words = spark.sparkContext.parallelize(myCollection, 2)  

키-값 형태의 기초(키-값 형태의 RDD)

  • 메서드 이름에 ByKey가 있다면 PairRDD 타입(키-값 타입)만 사용 가능
    • RDD에 맵 연산을 수행해서 키-값 구조로 만들 수 있음
#(키, 값)
words.map(lambda word: (word.lower(), 1)).collect()
Out[2]: [('spark', 1),
('the', 1),
('definitive', 1),
('guide:', 1),
('big', 1),
('data', 1),
('processing', 1),
('made', 1),
('simple', 1)]

keyBy

  • 현재 값으로 부터 키를 생성하는 함수
#단어의 첫번째 문자를 키로 만들어 RDD 생성
keyword = words.keyBy(lambda word: word.lower()[0])
keyword.collect()
Out[3]: [('s', 'Spark'), 
('t', 'The'), 
('d', 'Definitive'), 
('g', 'Guide:'), 
('b', 'Big'), 
('d', 'Data'), 
('p', 'Processing'), 
('m', 'Made'), 
('s', 'Simple')]

값 매핑하기

  • mapValues
  • flatMapValues
#위에서 생성한 키에 값 매핑
keyword.mapValues(lambda word: word.upper()).collect()
Out[4]: [('s', 'SPARK'), 
('t', 'THE'), 
('d', 'DEFINITIVE'), 
('g', 'GUIDE:'), 
('b', 'BIG'), 
('d', 'DATA'), 
('p', 'PROCESSING'), 
('m', 'MADE'), 
('s', 'SIMPLE')]
#flatMap함수는 단어의 각 문자를 값으로 하도록 함
keyword.flatMapValues(lambda word: word.upper()).collect()
Out[5]: [('s', 'S'), ('s', 'P'), ('s', 'A'), ('s', 'R'), ('s', 'K'), 
('t', 'T'), ('t', 'H'), ('t', 'E'),
('d', 'D'), ('d', 'E'), ('d', 'F'), ('d', 'I'), ('d', 'N'), ('d', 'I'), ('d', 'T'), ('d', 'I'), ('d', 'V'), ('d', 'E'),
('g', 'G'), ('g', 'U'), ('g', 'I'), ('g', 'D'), ('g', 'E'), ('g', ':'),
('b', 'B'), ('b', 'I'), ('b', 'G'),
('d', 'D'), ('d', 'A'), ('d', 'T'), ('d', 'A'),
('p', 'P'), ('p', 'R'), ('p', 'O'), ('p', 'C'), ('p', 'E'), ('p', 'S'), ('p', 'S'), ('p', 'I'), ('p', 'N'), ('p', 'G'),
('m', 'M'), ('m', 'A'), ('m', 'D'), ('m', 'E'),
('s', 'S'), ('s', 'I'), ('s', 'M'), ('s', 'P'), ('s', 'L'), ('s', 'E')]

키와 값 추출하기

keyword.keys().collect()
Out[6]: ['s', 't', 'd', 'g', 'b', 'd', 'p', 'm', 's']
keyword.values().collect()
Out[7]: ['Spark', 'The', 'Definitive', 'Guide:', 'Big', 'Data', 'Processing', 'Made', 'Simple']

lookup

  • 특정 키에 관한 결과 찾음
#키가 s인 값
keyword.lookup('s')
Out[8]: ['Spark', 'Simple']

sampleByKey

  • 근사치나 정확도를 이용해 키를 기반으로 RDD샘플 생성
  • 특정 키를 부분 샘플링할 수 있음
  • 해당 메서드는 RDD를 한 번만 처리하면서 간단한 무작위 샘플링을 사용함
import random

distinctChars = words.flatMap(lambda word: list(word.lower())).distinct().collect()
distinctChars
Out[10]: ['s', 'p', 'r', 'h', 'd', 'i', 'g', 'b', 'c', 'l', 'a', 'k', 't', 'e', 'f', 'n', 'v', 'u', ':', 'o', 'm']
sampleMap = dict(map(lambda c: (c, random.random()), distinctChars))
sampleMap
Out[12]: {'s': 0.4297864903860694,
'p': 0.18868029016164345,
'r': 0.22862016724286993,
'h': 0.14526448288068994,
'd': 0.665248317840256,
'i': 0.5521114195447983,
'g': 0.8753878879131628,
'b': 0.2829208684453799,
'c': 0.20586361665842334,
'l': 0.21774410547593126,
'a': 0.196883744534185,
'k': 0.016270573164381852,
't': 0.35061350763694465,
'e': 0.7724279823577984,
'f': 0.9128642381493561,
'n': 0.7248651110759695,
'v': 0.5816624409071538,
'u': 0.30639859612050724,
':': 0.12432781497534262,
'o': 0.18115111299784814,
'm': 0.9135147283787879}
words.map(lambda word: (word.lower()[0], word)).sampleByKey(True, sampleMap, 6).collect()
Out[13]: [('t', 'The'), ('g', 'Guide:'), ('m', 'Made')]

집계

chars= words.flatMap(lambda word: word.lower())
KVcharacters = chars.map(lambda letter: (letter, 1))
chars.take(5)
Out[15]: ['s', 'p', 'a', 'r', 'k']
KVcharacters.take(5)
Out[16]: [('s', 1), ('p', 1), ('a', 1), ('r', 1), ('k', 1)]
def maxFunc(left, right):
  return max(left, right)

def addFunc(left, right):
  return left+right
nums = sc.parallelize(range(1,31),5)

countByKey

  • 각 키의 아이템 수를 구하고 로컬 맵으로 결과를 수집
KVcharacters.countByKey()
Out[19]: defaultdict(int, {'s': 4, 
'p': 3, 
'a': 4, 
'r': 2,
'k': 1,
't': 3,
'h': 1, 
'e': 7, 
'd': 4, 
'f': 1,
'i': 7,
'n': 2,
'v': 1,
'g': 3,
'u': 1, 
':': 1,
'b': 1, 
'o': 1, 
'c': 1, 
'm': 2, 
'l': 1})

집계 연산 구현 방식 이해하기

  • 키-값 형태의 PairRDD를 생성하는 몇 가지 방법이 있음
    • groupBy
    • reduce
  • 이때 구현 방식은 Job의 안정성을 위해 매우 중요함

groupByKey

  • 각 키에 대한 값의 크기가 일정하고 익스큐터에 할당된 메모리에서 처리 가능할 정도의 크기라면 해당 메서드 사용
    • 모든 익스큐터에서 함수를 적용하기 전에 해당 키와 관련된 모든 값을 메모리로 읽어 들임
    • 따라서 만약 심각하게 치우쳐진 키가 있다면 일부 파티션이 엄청난 양의 값을 가질 수 있음(out of memory)
from functools import reduce
#각 키에 1의 값을 매핑했었음
KVcharacters.groupByKey().map(lambda row: (row[0],reduce(addFunc, row[1]))).collect()
Out[20]: [('s', 4), 
('p', 3), 
('r', 2),
('h', 1),
('d', 4), 
('i', 7),
('g', 3),
('b', 1),
('c', 1),
('l', 1),
('a', 4),
('k', 1),
('t', 3), 
('e', 7), 
('f', 1), 
('n', 2),
('v', 1), 
('u', 1), 
(':', 1),
('o', 1), 
('m', 2)]

reduceByKey

  • 작업 부하를 줄이려는 경우 적합
  • 각 파티션에서 리듀스 작업을 수행하기 때문에 안정적이고 모든 값을 메모리에 유지하지 않아도 됨
  • 또한 최종 리듀스 과정을 제외한 모든 작업은 개별 워커에서 처리하므로 연산 중 셔플 발생 X
KVcharacters.reduceByKey(addFunc).collect()
Out[21]: [('s', 4), 
('p', 3), 
('r', 2), 
('h', 1), 
('d', 4), 
('i', 7), 
('g', 3), 
('b', 1), 
('c', 1), 
('l', 1),
('a', 4), 
('k', 1),
('t', 3), 
('e', 7), 
('f', 1), 
('n', 2), 
('v', 1), 
('u', 1), 
(':', 1), 
('o', 1), 
('m', 2)]

기타 집계 메서드

aggregate

  • 파티션 기준 집계
nums.aggregate(0, maxFunc, addFunc)#시작값, 파티션 내에 수행될 함수, 모든 파티션에 걸쳐 수행될 함수
Out[22]: 90

treeAggregate

  • 위 aggregate는 드라이버에서 최종 집계를 수행하므로 익스큐터 결과가 너무 크면 out of memory
  • 그래서 익스큐터끼리 트리를 형성해서 집계 처리의 일부 하위 과정을 push down 방식으로 수행하는 것이 해당 함수
  • 이렇게 집계 처리를 여러 단계로 구성해서 드라이버의 메모리를 모두 소비하는 현상을 막음
nums.treeAggregate(0, maxFunc, addFunc, depth=3)
Out[23]: 90

aggregateByKey

  • 파티션 대신 키를 기준으로 집계
KVcharacters.aggregateByKey(0, addFunc, maxFunc).collect()
Out[24]: [('s', 3), 
('p', 2),
('r', 1),
('h', 1),
('d', 2),
('i', 4),
('g', 2),
('b', 1),
('c', 1),
('l', 1),
('a', 3),
('k', 1), 
('t', 2),
('e', 4),
('f', 1), 
('n', 1), 
('v', 1), 
('u', 1), 
(':', 1), 
('o', 1), 
('m', 2)]

combineByKey

  • 키를 기준으로 연산을 수행하며 파라미터로 사용된 함수에 따라 값을 병합함
  • 그런다음 여러 결괏값을 병합해 결과 반환
  • 사용자 정의 파티셔너를 사용해 출력 파티션 수를 지정할 수도 있음
def valToCombiner(v):
  return [v]

def mergeValuesFunc(v, valToAppend):
  v.append(valToAppend)
  return v

def mergeCombinerFunc(v1, v2):
  return v1+v2

outputPartitions=6
KVcharacters.combineByKey(valToCombiner, mergeValuesFunc, mergeCombinerFunc,outputPartitions).collect()
Out[26]: [('s', [1, 1, 1, 1]), 
('d', [1, 1, 1, 1]), ('l', [1]), 
('v', [1]),
(':', [1]), 
('p', [1, 1, 1]),
('r', [1, 1]), 
('c', [1]), 
('k', [1]), 
('t', [1, 1, 1]),
('n', [1, 1]), 
('u', [1]), 
('o', [1]), 
('h', [1]), 
('i', [1, 1, 1, 1, 1, 1, 1]),
('g', [1, 1, 1]),
('b', [1]), 
('a', [1, 1, 1, 1]), 
('e', [1, 1, 1, 1, 1, 1, 1]),
('f', [1]),
('m', [1, 1])]

foldByKey

  • 결합 함수와 항등원(어떤 원소와 연산을 취해도 자기 자신이 되게 하는 원소)인 제로값을 이용해 각 키의 값을 병합
KVcharacters.foldByKey(0,addFunc).collect()
Out[27]: [('s', 4), 
('p', 3), 
('r', 2),
('h', 1), 
('d', 4), 
('i', 7), 
('g', 3),
('b', 1),
('c', 1), 
('l', 1), 
('a', 4), 
('k', 1), 
('t', 3),
('e', 7),
('f', 1), 
('n', 2),
('v', 1), 
('u', 1), 
(':', 1), 
('o', 1), 
('m', 2)]

cogroup

  • 키-값 형태의 RDD를 키를 기준으로 그룹화 가능
    • 스칼라를 사용하는 경우 최대 3개까지
    • 파이썬을 사용하는 경우 최대 2개까지
import random
distinctChars = words.flatMap(lambda word: word.lower()).distinct()
charRDD = distinctChars.map(lambda c: (c, random.random()))
charRDD2 = distinctChars.map(lambda c: (c, random.random()))

charRDD.cogroup(charRDD2).take(5)
Out[28]: [('s', (<pyspark.resultiterable.ResultIterable at 0x7f4c5d9b8430>,
<pyspark.resultiterable.ResultIterable at 0x7f4c5d9b87c0>)),
('p', (<pyspark.resultiterable.ResultIterable at 0x7f4c5d9b8610>,
<pyspark.resultiterable.ResultIterable at 0x7f4c5d9b8670>)),
('r', (<pyspark.resultiterable.ResultIterable at 0x7f4c5d9b8250>, <pyspark.resultiterable.ResultIterable at 0x7f4c5d9b8fa0>)),
('i', (<pyspark.resultiterable.ResultIterable at 0x7f4c5e277340>, <pyspark.resultiterable.ResultIterable at 0x7f4c5e277190>)),
('g', (<pyspark.resultiterable.ResultIterable at 0x7f4c5e277580>, <pyspark.resultiterable.ResultIterable at 0x7f4c5e277160>))]

조인

내부 조인

keyedChars = distinctChars.map(lambda c:(c, random.random()))
outputPartitions =10

# KVcharacters.join(keyedChars).count()
KVcharacters.join(keyedChars, outputPartitions).count()
Out[39]: 51
KVcharacters.join(keyedChars, outputPartitions).take(5)
Out[43]: [('u', (1, 0.04255988777577302)), 
('m', (1, 0.9649551230089524)), 
('m', (1, 0.9649551230089524)), 
('k', (1, 0.74456038306458)), 
('t', (1, 0.8738054481806644))]

zip

  • 두 개의 RDD를 연결
numRange= sc.parallelize(range(9), 2)
words.zip(numRange).collect()
Out[37]: [('Spark', 0), 
('The', 1), 
('Definitive', 2),
('Guide:', 3), 
('Big', 4), 
('Data', 5), 
('Processing', 6), 
('Made', 7), 
('Simple', 8)]

파티션 제어하기

  • RDD를 사용하면 데이터가 클러스터 전체에 물리적으로 정확히 분산되는 방식을 정의할 수 있다.
  • 이러한 기능을 가진 메서드 중 일부는 구조적 API와 기본적으로 동일
  • 다른 점은, 파티션 함수를 파라미터로 사용할 수 있다는 사실
    • 파티션 함수: 보통 사용자 지정 Partitioner를 의미

coalesce

  • 파티션을 재분배할 때 발생하는 데이터 셔플을 방지하기 위해 동일한 워크에 존재하는 파티션을 합치는 메서드
#파티션 수 지정 -> 1
words.coalesce(1).getNumPartitions()
Out[44]: 1

repartition

  • 똑같이 파티션 수를 늘리거나 줄일 수 있음
  • 하지만 노드 간 셔플이 발생
words.repartition(10).getNumPartitions()
Out[46]: 10

repartitionAndSortWithinPartitions

  • 파티션 재분배 가능
  • 재분배된 결과 파티션의 정렬 방식을 지정할 수 있음

사용자 정의 파티셔닝

  • RDD를 사용하는 가장 큰 이유 중 하나이므로 가장 중요

  • 사용자 정의 파티셔너는 저수준 API의 세부적인 구현 방식

    • job이 성공적으로 동작되는지 여부에 상당한 영향을 미침
  • 사용자 정의 파티셔닝의 유일한 목표: 데이터의 치우침 같은 문제를 피하는 것

    • 즉, 클러스터 전체에 걸쳐 데이터를 균등하게 분배하는 것
  • 심각하게 치우친 키를 다뤄야 한다면 고급 파티셔닝 기능을 사용

    • 병렬성을 개선하고 실행 과정에서 out of memory error를 방지할 수 있도록 키를 최대한 분할해야함
path= "/FileStore/tables/retail-data/all/"
df= spark.read.option('header', 'true').option('inferSchema', 'true').csv(path)
df.printSchema()
root -- InvoiceNo: string (nullable = true) -- StockCode: string (nullable = true) -- Description: string (nullable = true) -- Quantity: integer (nullable = true) -- InvoiceDate: string (nullable = true) -- UnitPrice: double (nullable = true) -- CustomerID: integer (nullable = true) -- Country: string (nullable = true)
rdd = df.coalesce(10).rdd
def partitionFunc(key):
  if key in [17850, 12583]:
    return 0
  return random.randint(1,2)

case: 두 고객의 데이터가 너무 많아서 다른 고객의 정보와 두 고객의 정보를 분리하려함(두 그룹 생성)

  • 두 고객의 아이디: 17850, 12583
#row[6]: customerid 값임
keyedRDD = rdd.keyBy(lambda row: row[6])
keyedRDD.take(1)
Out[58]: [(17850, Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=2.55, CustomerID=17850, Country='United Kingdom'))]
keyedRDD.partitionBy(3, partitionFunc).map(lambda x: x[0])\
.glom().map(lambda x: len(set(x))).take(5)
Out[59]: [2, 4315, 4301]

사용자 정의 직렬화

  • 스파크는 Kryo 라이브러리를 사용해서 객체를 빠르게 직렬화 할 수 있음
    • 이는 자바 직렬화보다 약 10배 이상 성능이 좋음
profile
Data Engineer

0개의 댓글