본 내용은 Fast Campus의 올인원 패키지: 실시간 빅데이터 처리를 위한 Spark & Flink Online 강의를 듣고 정리한 내용입니다.
Data parallel과 Distributed Data parallel
-
Data Parallel?
- 데이터를 여러개로 쪼갬
- 여러 쓰레드에서 각자 task를 적용
- 최종적으로 각각의 쓰레드에서 만든 결과값을 합침
-
Distributed Data Parallel?
- 데이터를 여러개로 쪼갬
- 여러 노드로 보낸 후 여러 노드에서 각자 독립적으로 task를 적용
- 최종적으로 각각의 노드에서 만든 결과값을 합침
- 노드간 통신과 같이 처리해야할 것들이 늘어남
- Spark 를 이용하면 분산된 환경에서도 일반적인 병렬처리를 하듯 코딩이 가능
- 단, 노드간 통신 속도를 신경쓰면서 코딩해야 성능을 끌어낼 수 있음
분산처리 문제
- 분산처리로 넘어가면서 신경써야될 문제가 많아짐
- 속도: 많은 네트워크 통신을 필요로 하는 작업은 속도가 저하
- 부분실패: 일부 노드가 프로그램과 상관 없는 이유로 인해 실패
- 필터링을 한 후에 노드간 통신이 이루어져야 성능이 좋음
Structured Data와 RDD
Key-Value RDD
-
정의
- Key와 Value 쌍을 갖는 Key-Value RDD
- (Key, Value) 쌍을 갖기 때문에 Pairs RDD라고도 불림
- 간단한 데이터베이스처럼 다룰 수 있음
- 예) 쇼핑몰에서 상품당 평점 구하기
-
Key Value RDD로 할 수 있는 것들
- Reduction
- reduceByKey() - 키값을 기준으로 task 처리
- groupByKey() - 키값을 기준으로 Value를 grouping
- sortByKey() - 키값을 기준으로 정렬
- keys() - key 값 추출
- values() - value 값 추출
rdd.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
- Join
- join
- rightOuterJoin
- leftOuterJoin
- subtractByKey
-
Mapping values
- Key Value RDD 에서 Key를 바꾸지 않는 경우
map()
함수 대신 value만 다루는 mapValues()
함수를 사용하자 (Spark 내부에서 파티션을 유지할 수 있어 효율적)
mapValues()
, flatMapValues()
- 결과값으로 새로운 RDD를 반환
- 지연실행(Lazy Execution)
- map(), flatMap(), filter(), distinct(), reduceByKey(), groupByKey(), sortByKey() 등
- Wide Transformation과 Narrow Transformation가 존재
- Wide Transformation
- 1:1 변환
- filter(), map(), flatMap(), union(), sample()
- 1열의 데이터를 조작하기 위해 다른 열/파티션의 데이터를 사용할 필요가 없음
- 정렬이 필요하지 않은 경우
- Narrow Transformation
- shuffling
- intersection, join, distinct, reduceByKey(), GroupByKey()
- output RDD의 파티션에 다른 파티션의 데이터가 사용될 수 있음
- 많은 네트워크 통신과 리소스가 사용됨(성능을 위해 최적화가 중요)
Actions
- 결과값을 연산하여 출력하거나 저장
- 즉시실행(Eager Execution)
- collect(), count(), countByValue(), take(), top(), reduce(), fold(), foreach()
왜 지연실행 필요한가?
- 머신러닝과 같은 작업은 task에서 처리한 데이터를 disk에 저장하고 다시 task에서 disk의 데이터를 읽는 작업이 반복되는 비효율성이 존재
- 이때 메모리를 활용하면 더욱 효율적인 작업이 가능한데 이때 어떤 데이터를 메모리에 남겨야 할지를 알아야 함
- Transformations는 지연실행 되기 때문에 메모리에 저장 가능
Cache & Persist
- cache()와 persist()로 데이터를 메모리에 저장해 놓고 사용 가능
- 다음과 같은 storage level이 존재
- MEMORY_ONLY: 메모리만 사용
- MEMORY_AND_DISK: 메모리에 더이상 저장할 수 없는 경우 디스크에 저장
- MEMORY_ONLY_SER: 직렬화 된 데이터를 메모리에 저장
- MEMORY_AND_DISK_SER: 직렬화 된 데이터를 메모리와 디스크에 저장
- DISK_ONLY: 디스크만 사용
- _SER 이 포함된 storage level의 경우 직렬화 함으로써 메모리나 디스크의 공간을 절약할 수는 있지만 데이터를 읽어올 때 추가적인 역직렬화 연산이 필요함
- Cache
- default storage level을 사용
- RDD's default storage level: MEMORY_ONLY
- DF's default storage level: MEMORY_AND_DISK
- Persist
- 사용자가 원하는 storage level을 지정 가능