groupBy: 주어지는 함수를 기준으로 Group
>>> rdd = sc.parallelize([1,1,3,5,8])
>>> result = rdd.groupBy(lambda x: x%2).collect()
>>> sorted([(x, sorted(y)) for (x,y) in result])
[(0, [2,8]), (1, [1,1,3,5])]
groupByKey: 주어지는 Key를 기준으로 Group
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a',2), ('b',1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1,1]), ('b', [1])]
grouped = sc.parallelize([
]).groupBy(lambda x: x[0]).collect()
for k,v in grouped:
print(k, list(v))
###
J ['java']
C ['C', 'C++', 'C#']
P ['Python']
x = sc.parallelize([
("MATH", 7), ("MATH", 2), ("ENGLISH", 7),
("SCIENCE", 7), ("ENGLISH", 4), ("ENGLISH", 9),
("MATH", 8), ("MATH", 3), ("ENGLISH", 4),
("SCIENCE", 6), ("SCIENCE", 9), ("SCIENCE", 5)], 3)
y = x.groupByKey()
print(y.getNumPartitions())
# 3
y = x.groupByKey(2)
print(y.getNumPartitions())
# 2
for t in y.collect():
print(t[0], list(t[1]))
# MATH [7,2,8,3]
# ENGLISH [7,4,9,4]
# SCIENCE [7,6,9,5]
reduce: 주어지는 함수를 기준으로 요소들을 합침 (action)
>>> sc.parallelize([1,2,3,4,5]).reduce(add)
15
reduceByKey: Key를 기준으로 그룹을 만들고 합침 (trans)
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
ReduceByKey
x = sc.parallelize([
("MATH", 7), ("MATH", 2), ("ENGLISH", 7),
("SCIENCE", 7), ("ENGLISH", 4), ("ENGLISH", 9),
("MATH", 8), ("MATH", 3), ("ENGLISH", 4),
("SCIENCE", 6), ("SCIENCE", 9), ("SCIENCE", 5)], 3)
x.reduceByKey(lambda a,b: a+b).collect()
# [('MATH', 20), ('ENGLISH', 24), ('SCIENCE', 27)]
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
>>> def f(x): return len(x)
>>> x.mapValues(f).collect()
# [('a', 3), ('b', 1)]
파티션과 키를 왔다갔다하려면 네트워크 코스트가 크기 때문에 mapValues 사용 시 성능 향상
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items())
# [('a',2), ('b',1)]
파티션을 유지하거나 Key가 굉장히 많은 경우가 있기 때문에 transformation
m = sc.prarallelize([(1,2), (3,4)]).keys()
>>> m.collect()
# [1,3]
rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6), ("zoo", 1)])
rdd1.join(rdd2).collect()
# [("bar", (2, 5)), ("bar", (2, 6)), ("foo", (1, 4))]
rdd1.leftOuterJoin(rdd2).collect()
# [("baz", (3, None)), ("bar", (2, 5)), ("bar", (2, 6)), ("foo", (1, 4))]
rdd1.rightOuterJoin(rdd2).collect()
# [("bar", (2, 5)), ("bar", (2, 6)), ("zoo", (None, 1)), ("foo", (1, 4))]