myCollection = "Spark The Definitive Guide: Big Data Processing Made Simple".split(' ')
words = spark.sparkContext.parallelize(myCollection, 2)
# map each word to a (key, value) pair
words.map(lambda word: (word.lower(), 1)).collect()
Out[2]: [('spark', 1),
('the', 1),
('definitive', 1),
('guide:', 1),
('big', 1),
('data', 1),
('processing', 1),
('made', 1),
('simple', 1)]
# create an RDD keyed by the first character of each word
keyword = words.keyBy(lambda word: word.lower()[0])
keyword.collect()
Out[3]: [('s', 'Spark'),
('t', 'The'),
('d', 'Definitive'),
('g', 'Guide:'),
('b', 'Big'),
('d', 'Data'),
('p', 'Processing'),
('m', 'Made'),
('s', 'Simple')]
# transform only the values of the keyed RDD created above
keyword.mapValues(lambda word: word.upper()).collect()
Out[4]: [('s', 'SPARK'),
('t', 'THE'),
('d', 'DEFINITIVE'),
('g', 'GUIDE:'),
('b', 'BIG'),
('d', 'DATA'),
('p', 'PROCESSING'),
('m', 'MADE'),
('s', 'SIMPLE')]
# flatMapValues flattens each word so that every character becomes its own value (the key is repeated per character)
keyword.flatMapValues(lambda word: word.upper()).collect()
Out[5]: [('s', 'S'), ('s', 'P'), ('s', 'A'), ('s', 'R'), ('s', 'K'),
('t', 'T'), ('t', 'H'), ('t', 'E'),
('d', 'D'), ('d', 'E'), ('d', 'F'), ('d', 'I'), ('d', 'N'), ('d', 'I'), ('d', 'T'), ('d', 'I'), ('d', 'V'), ('d', 'E'),
('g', 'G'), ('g', 'U'), ('g', 'I'), ('g', 'D'), ('g', 'E'), ('g', ':'),
('b', 'B'), ('b', 'I'), ('b', 'G'),
('d', 'D'), ('d', 'A'), ('d', 'T'), ('d', 'A'),
('p', 'P'), ('p', 'R'), ('p', 'O'), ('p', 'C'), ('p', 'E'), ('p', 'S'), ('p', 'S'), ('p', 'I'), ('p', 'N'), ('p', 'G'),
('m', 'M'), ('m', 'A'), ('m', 'D'), ('m', 'E'),
('s', 'S'), ('s', 'I'), ('s', 'M'), ('s', 'P'), ('s', 'L'), ('s', 'E')]
keyword.keys().collect()
Out[6]: ['s', 't', 'd', 'g', 'b', 'd', 'p', 'm', 's']
keyword.values().collect()
Out[7]: ['Spark', 'The', 'Definitive', 'Guide:', 'Big', 'Data', 'Processing', 'Made', 'Simple']
# values whose key is 's'
keyword.lookup('s')
Out[8]: ['Spark', 'Simple']
import random
distinctChars = words.flatMap(lambda word: list(word.lower())).distinct().collect()
distinctChars
Out[10]: ['s', 'p', 'r', 'h', 'd', 'i', 'g', 'b', 'c', 'l', 'a', 'k', 't', 'e', 'f', 'n', 'v', 'u', ':', 'o', 'm']
sampleMap = dict(map(lambda c: (c, random.random()), distinctChars))
sampleMap
Out[12]: {'s': 0.4297864903860694,
'p': 0.18868029016164345,
'r': 0.22862016724286993,
'h': 0.14526448288068994,
'd': 0.665248317840256,
'i': 0.5521114195447983,
'g': 0.8753878879131628,
'b': 0.2829208684453799,
'c': 0.20586361665842334,
'l': 0.21774410547593126,
'a': 0.196883744534185,
'k': 0.016270573164381852,
't': 0.35061350763694465,
'e': 0.7724279823577984,
'f': 0.9128642381493561,
'n': 0.7248651110759695,
'v': 0.5816624409071538,
'u': 0.30639859612050724,
':': 0.12432781497534262,
'o': 0.18115111299784814,
'm': 0.9135147283787879}
words.map(lambda word: (word.lower()[0], word)).sampleByKey(True, sampleMap, 6).collect()
Out[13]: [('t', 'The'), ('g', 'Guide:'), ('m', 'Made')]
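sampleByKey does stratified sampling: the dict maps each key to its own sampling fraction, the first argument toggles sampling with replacement, and the last argument is the seed. A minimal sketch assuming a fixed, uniform fraction per key instead of the random sampleMap above (the 0.5 rate is an assumed value for illustration):
# hypothetical variant: sample every first-letter key at the same fixed rate
fixedFractions = {c: 0.5 for c in distinctChars}
words.map(lambda word: (word.lower()[0], word)).sampleByKey(False, fixedFractions, 6).collect()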
chars = words.flatMap(lambda word: word.lower())
KVcharacters = chars.map(lambda letter: (letter, 1))
chars.take(5)
Out[15]: ['s', 'p', 'a', 'r', 'k']
KVcharacters.take(5)
Out[16]: [('s', 1), ('p', 1), ('a', 1), ('r', 1), ('k', 1)]
def maxFunc(left, right):
    return max(left, right)

def addFunc(left, right):
    return left + right
nums = sc.parallelize(range(1, 31), 5)
KVcharacters.countByKey()
Out[19]: defaultdict(int, {'s': 4,
'p': 3,
'a': 4,
'r': 2,
'k': 1,
't': 3,
'h': 1,
'e': 7,
'd': 4,
'f': 1,
'i': 7,
'n': 2,
'v': 1,
'g': 3,
'u': 1,
':': 1,
'b': 1,
'o': 1,
'c': 1,
'm': 2,
'l': 1})
from functools import reduce
# each key was mapped to the value 1 above; groupByKey pulls every value for a key into memory on one executor,
# so reduceByKey below is usually the safer choice
KVcharacters.groupByKey().map(lambda row: (row[0], reduce(addFunc, row[1]))).collect()
Out[20]: [('s', 4),
('p', 3),
('r', 2),
('h', 1),
('d', 4),
('i', 7),
('g', 3),
('b', 1),
('c', 1),
('l', 1),
('a', 4),
('k', 1),
('t', 3),
('e', 7),
('f', 1),
('n', 2),
('v', 1),
('u', 1),
(':', 1),
('o', 1),
('m', 2)]
KVcharacters.reduceByKey(addFunc).collect()
Out[21]: [('s', 4),
('p', 3),
('r', 2),
('h', 1),
('d', 4),
('i', 7),
('g', 3),
('b', 1),
('c', 1),
('l', 1),
('a', 4),
('k', 1),
('t', 3),
('e', 7),
('f', 1),
('n', 2),
('v', 1),
('u', 1),
(':', 1),
('o', 1),
('m', 2)]
nums.aggregate(0, maxFunc, addFunc)  # start value, function applied within each partition, function applied across partitions
Out[22]: 90
nums.treeAggregate(0, maxFunc, addFunc, depth=3)
Out[23]: 90
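Why 90: maxFunc finds the maximum inside each of the five partitions of nums, and addFunc then sums those per-partition maxima. A quick check, assuming range(1, 31) splits evenly across the five partitions:
nums.glom().map(max).collect()
# expected something like [6, 12, 18, 24, 30]; 6 + 12 + 18 + 24 + 30 = 90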
KVcharacters.aggregateByKey(0, addFunc, maxFunc).collect()
Out[24]: [('s', 3),
('p', 2),
('r', 1),
('h', 1),
('d', 2),
('i', 4),
('g', 2),
('b', 1),
('c', 1),
('l', 1),
('a', 3),
('k', 1),
('t', 2),
('e', 4),
('f', 1),
('n', 1),
('v', 1),
('u', 1),
(':', 1),
('o', 1),
('m', 2)]
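aggregateByKey works the same way per key: addFunc sums within each of the two partitions and maxFunc keeps the larger per-partition sum, which is why 's' comes out as 3 instead of its total count of 4. A sketch to see how one key is split across the partitions (the helper name is made up for illustration):
from collections import Counter

def countSInPartition(part):
    # count how many ('s', 1) pairs land in this partition
    return Counter(k for k, _ in part)['s']

KVcharacters.glom().map(countSInPartition).collect()
# expected something like [1, 3]: the per-partition sums, whose max is the 3 seen above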
def valToCombiner(v):
    return [v]

def mergeValuesFunc(v, valToAppend):
    v.append(valToAppend)
    return v

def mergeCombinerFunc(v1, v2):
    return v1 + v2

outputPartitions = 6
KVcharacters.combineByKey(valToCombiner, mergeValuesFunc, mergeCombinerFunc, outputPartitions).collect()
Out[26]: [('s', [1, 1, 1, 1]),
('d', [1, 1, 1, 1]), ('l', [1]),
('v', [1]),
(':', [1]),
('p', [1, 1, 1]),
('r', [1, 1]),
('c', [1]),
('k', [1]),
('t', [1, 1, 1]),
('n', [1, 1]),
('u', [1]),
('o', [1]),
('h', [1]),
('i', [1, 1, 1, 1, 1, 1, 1]),
('g', [1, 1, 1]),
('b', [1]),
('a', [1, 1, 1, 1]),
('e', [1, 1, 1, 1, 1, 1, 1]),
('f', [1]),
('m', [1, 1])]
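Summing the per-key lists produced by combineByKey should recover the same counts as reduceByKey; a one-line sanity check:
KVcharacters.combineByKey(valToCombiner, mergeValuesFunc, mergeCombinerFunc, outputPartitions).mapValues(sum).collect()
# should match the reduceByKey / foldByKey results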
KVcharacters.foldByKey(0, addFunc).collect()
Out[27]: [('s', 4),
('p', 3),
('r', 2),
('h', 1),
('d', 4),
('i', 7),
('g', 3),
('b', 1),
('c', 1),
('l', 1),
('a', 4),
('k', 1),
('t', 3),
('e', 7),
('f', 1),
('n', 2),
('v', 1),
('u', 1),
(':', 1),
('o', 1),
('m', 2)]
import random
distinctChars = words.flatMap(lambda word: word.lower()).distinct()
charRDD = distinctChars.map(lambda c: (c, random.random()))
charRDD2 = distinctChars.map(lambda c: (c, random.random()))
charRDD.cogroup(charRDD2).take(5)
Out[28]: [('s', (<pyspark.resultiterable.ResultIterable at 0x7f4c5d9b8430>,
<pyspark.resultiterable.ResultIterable at 0x7f4c5d9b87c0>)),
('p', (<pyspark.resultiterable.ResultIterable at 0x7f4c5d9b8610>,
<pyspark.resultiterable.ResultIterable at 0x7f4c5d9b8670>)),
('r', (<pyspark.resultiterable.ResultIterable at 0x7f4c5d9b8250>, <pyspark.resultiterable.ResultIterable at 0x7f4c5d9b8fa0>)),
('i', (<pyspark.resultiterable.ResultIterable at 0x7f4c5e277340>, <pyspark.resultiterable.ResultIterable at 0x7f4c5e277190>)),
('g', (<pyspark.resultiterable.ResultIterable at 0x7f4c5e277580>, <pyspark.resultiterable.ResultIterable at 0x7f4c5e277160>))]
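The ResultIterable objects are lazy wrappers around the grouped values; converting them to lists makes the cogroup output readable:
charRDD.cogroup(charRDD2).mapValues(lambda pair: (list(pair[0]), list(pair[1]))).take(2)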
keyedChars = distinctChars.map(lambda c: (c, random.random()))
outputPartitions = 10
# KVcharacters.join(keyedChars).count()
KVcharacters.join(keyedChars, outputPartitions).count()
Out[39]: 51
KVcharacters.join(keyedChars, outputPartitions).take(5)
Out[43]: [('u', (1, 0.04255988777577302)),
('m', (1, 0.9649551230089524)),
('m', (1, 0.9649551230089524)),
('k', (1, 0.74456038306458)),
('t', (1, 0.8738054481806644))]
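The other join flavors follow the same pattern; a sketch of the outer variants (results omitted here):
KVcharacters.leftOuterJoin(keyedChars).count()   # keys with no match on the right keep None as the right value
KVcharacters.rightOuterJoin(keyedChars).count()
KVcharacters.fullOuterJoin(keyedChars).count()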
numRange = sc.parallelize(range(9), 2)
words.zip(numRange).collect()
Out[37]: [('Spark', 0),
('The', 1),
('Definitive', 2),
('Guide:', 3),
('Big', 4),
('Data', 5),
('Processing', 6),
('Made', 7),
('Simple', 8)]
# coalesce collapses the data down to 1 partition without a full shuffle
words.coalesce(1).getNumPartitions()
Out[44]: 1
# repartition always performs a full shuffle when changing the partition count
words.repartition(10).getNumPartitions()
Out[46]: 10
Custom partitioning is one of the biggest reasons to use RDDs, so it is the most important topic here. A custom partitioner is a low-level implementation detail of the RDD API. The sole goal of custom partitioning is to spread data evenly across the cluster and avoid problems such as data skew. If you have to deal with severely skewed keys, this kind of advanced partitioning is the tool to reach for.
path= "/FileStore/tables/retail-data/all/"
df= spark.read.option('header', 'true').option('inferSchema', 'true').csv(path)
df.printSchema()
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
rdd = df.coalesce(10).rdd
def partitionFunc(key):
    # isolate the two high-volume customer IDs in their own partition
    if key in [17850, 12583]:
        return 0
    return random.randint(1, 2)
# row[6] is the CustomerID value
keyedRDD = rdd.keyBy(lambda row: row[6])
keyedRDD.take(1)
Out[58]: [(17850, Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=2.55, CustomerID=17850, Country='United Kingdom'))]
keyedRDD.partitionBy(3, partitionFunc).map(lambda x: x[0])\
.glom().map(lambda x: len(set(x))).take(5)
Out[59]: [2, 4315, 4301]
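Partition 0 holds only the two hard-coded customer IDs, while the remaining keys are spread randomly over partitions 1 and 2, which is why the distinct-key counts come out as [2, 4315, 4301]. A follow-up sketch to look at raw record counts per partition rather than distinct keys:
keyedRDD.partitionBy(3, partitionFunc).glom().map(len).take(5)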