실시간 빅데이터 처리를 위한 Spark & Flink Online 4) Reduction Operations

Bradley·2022년 3월 8일
0

Parallel Transformations

주로 변형을 적용시키는 작업들

  • map, flatMap, filter

이번강의에서는 Action이 분산된 환경에서 작동하는 원리

대부분의 Spark의 Action은 Reduction

  • 파일 저장, collect() 등과 같이 Reduction이 아닌 액션도 있다.

Reduction Operations

요소들을 모아서 하나로 합치는 작업
많은 Spark의 연산들이 reduction

Parallel Reduction

병렬/분산된 환경에서의 Reduction 작업

복습

  • Transformations & Actions
  • 지연 실행으로 인한 성능 최적화
  • Cache() & persist()
  • Cluster Topology

Reduction 이란?

대부분의 Action은 Reduction
Reduction:
근접하는 요소들을 모아서 하나의 결과로 만드는 일

  • 파일 저장, collect()등과 같이 Reduction이 아닌 액션도 있다.

병렬처리 하기 가능한 예

Parallel Reduction

  • 병렬 처리가 가능한 경우
    각 Task나 Partition이 독립적이어야함
  • 병렬 처리 하기 힘든 경우 (병렬처리 하는 의미가 없는 경우)
    각 Task나 Partition이 의존적이고 순차적인 경우

Spark의 대표적인 Reduction Actions

Reduce

사용자가 지정하는 function을 받아서 여러개의 값을 줄여주는 역할을 함

Partition

>>> sc.parallelize([1,2,3,4]).reduce(lambda x,y: (x*2)+y)
26
>>> sc.parallelize([1,2,3,4], 1).reduce(lambda x,y: (x*2)+y)
26
>>> sc.parallelize([1,2,3,4], 2).reduce(lambda x,y: (x*2)+y)
18
>>> sc.parallelize([1,2,3,4], 3).reduce(lambda x,y: (x*2)+y)
18
>>> sc.parallelize([1,2,3,4], 4).reduce(lambda x,y: (x*2)+y)
26
* (1,2,3,4) -> ((1*2+2)*2+3)*2+4=26
* (1,2) (3,4) -> ((1*2+2)*2 + (3*2)+4)=18

파티션이 어떻게 나뉠지 프로그래머가 정확하게 알기 어렵다

  • 연산의 순서와 상관없이 결과값을 보장하려면
    • 교환법칙 (ab = ba)
    • 결합법칙 (ab)c=a(bc)

Fold

  • RDD.fold(zeroValue, <"function">)
>>> from operatior import add
>>> sc.parallelize([1,2,3,4,5]).fold(0,add)
15

Fold & Partition

rdd = sc.paraelleize([2,3,4], 4)
rdd.reduce(lambda x, y: x*y)
# 24
rdd.fold(lambda x, y: x*y)
# 24
  • (234) =24
  • (123*4) = 24
rdd.reduce(lambda x, y: x+y)
# 9
rdd.fold(1, lambda x, y: x*y)
# 14
  • 0+2+3+4 = 9
  • (1+1)+(1+2)+(1+3)+(1+4)=14

GroupBy

  • RDD.groupBy(<기준 함수>)
>>> rdd = sc.parallelize([1,1,2,3,5,8])
>>> result = rdd.groupBy(lambda x: x%2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2,8]), (1, [1,1,3,5]))]

Aggregate

  • RDD 데이터 타입과 Action결과 타입이 다를 경우 사용
  • 파티션 단위의 연산 결과를 합치는 과정을 거친다.
  • RDD.aggregate(zeroValue, seqOp, combOp)
    • zeroValue: 각 파티션에서 누적할 시작 값
    • seqOp: 타입 변경 함수
    • combOp: 합치는 함수
  • 많이 쓰이는 reduction action
  • 대부분의 데이터 작업은 크고 복잡한 데이터 타입 -> 정제된 데이터

Aggregate 예제

>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sc.parallelize([1,2,3,4]).aggregate((0,0), seqOp, combOp)
(10,4)
>>> sc.parallelize([]).aggregate((0,0), seqOp, combOp)
(0,0)
  • x[0]=0, x[1]=0 (zeroValue)

  • x[0]+y = (0+1), x[1]+1=(0+1)->(1,1)

  • x[0]+y = (1+2), x[1]+1=(1+1)->(3,2)

  • x[0]=0, x[1]=0 (zeroValue)

  • x[0]+y = (0+3), x[1]+1=(0+1)->(3,1)

  • x[0]+y = (3+4), x[1]+1=(1+1)->(7,2)

Key-Value RDD Reduction

  • 배운것 외에도 여러가지 Operation이 존재
  • Key-Value RDD
    • groupByKey
    • reduceByKey
profile
2022년부턴 후회없이

0개의 댓글

관련 채용 정보