병렬처리와 분산처리
병렬처리
- 데이터를 여러개로 쪼개고
- 여러 쓰레드에서 각자 task를 적용
- 각자 만든 결과값을 합치는 과정
분산된 환경에서의 병렬처리
- 데이터를 여러개로 쪼개서 여러 노드로 보낸다
- 여러 노드에서 각자 독립적으로 task를 적용
- 각자 만든 결과값을 합치는 과정
분산처리 문제
- 부분 실패 - 노드 몇개가 프로그램과 상관없는 이유로 인해 실패
- 속도 - 많은 네트워크 통신을 필요로 하는 작업은 속도가 저하
처리속도를 항상 생각하며 코드작업
메모리 > 디스크 > 네트워크
정형데이터와 RDD
Key-value RDD
- Key와 Value 쌍을 갖음
- Pairs RDD라고도 함
- 간단한 DB처럼 다룰 수 있음
- key를 기준으로 고차원적인 연산
Single-Value Rdd
- 간단하게 개수를 세거나
- 비정형 데이터를 다루거나
- 1차원적인 연산
Key-value RDD - Reduction Operation
키값을 기준으로 데이터를 묶어서 처리
데이터를 줄이는데 쓰임
- reduceByKey() - 키값을 기준으로 테스크 처리
- groupByKey() - 키값을 기준으로 벨류를 묶는다
- sortByKey() - 키값을 기준으로 정렬
- keys() - 키값 추출
- values() - 벨류값 추출
Key-value RDD - Join Operation
- join
- rightOuterJoin
- leftOuterJoin
- subtractByKey
Key value 데이터에서 key를 바꾸지 않는 경우 map 대신 mapValues를 써주면 Spark 내부 파티션 유지에 효율적
- 결과값으로 새로운 RDD를 반환
- 지연실행 - Lazy Execution
map()
flatMap()
filter()
distinct() - 중복 제거 다른 RDD를 리턴
reduceByKey()
groupByKey()
mapValues()
flatMapValues()
sortByKey()
Actions
- 결과값을 연산하여 출력하거나 저장
- 즉시실행 - Eager Execution
collect() - RDD 안 value를 확인, 실제 운용 환경에서는 사용안함 (데이터를 모두가져오기 때문)
count()
countByValue()
take() - value의 개수를 지정해 확인
first() - 첫 번째 value를 확인
top()
reduce()
fold()
foreach() - 워커노드에 실행
- 1:1 변환
- filter(), map(), flatMap(), sample(), union()
- 1열을 조작하기 위해 다른 열/파티션의 데이터를 쓸 필요가 없다
- 정렬이 필요하지 않은 경우
Wide
- Shuffling
- Intersection and join, distinct, cartesian, reduceByKey(), groupByKey()
- 아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있음
Lazy Execution이 유용한 경우
- 메모리를 최대한 활용할 수 있다.
- 디스크, 네트워크 연산을 최소화 할 수 있다
- 데이터를 다루는 task는 반복되는 경우가 많다
task->disk->task->disk 오래걸림
task -> task 인메모리방식
인메모리방식
어떤 데이터를 메모리에 남겨야할지 알아야 가능하다.
Transformations는 지연실행되기 때문에 메모리에 저장해둘 수 있다
Cache() & Persist()
데이터를 메모리에 남겨둘때 사용하는 함수
Cache
- 디폴트 Storage Level 사용
- RDD: MEMORY_ONLY
- DF: MEMORY_AND_DISK
Persist
- Storage Level을 사용자가 원하는대로 지정 가능
Storage Level
- MEMORY_ONLY: 메모리에만 데이터를 저장하는 것
- MEMORY_AND_DISK: 메모리와 디스크 두 곳에 저장, 메모리에 데이터가 없을 경우 디스크까지 보겠다
- MEMORY_ONLY_SER: 메모리의 용량을 아끼기 위해서 구조화된 데이터를 SERIALIZED하는 과정을 거침
-SER: SERIALIZATION 용량을 아낄 수 있으나 다시 데이터를 읽어올 때 DESERIALIZATION과정을 거쳐야하므로 약간의 연산이 추가되는 트레이드 오프가 있음
- MEOMORY_AND_DISK_SER: 메모리와 디스크의 용량을 아끼기 위해서 구조화된 데이터를 SERIALIZED하는 과정을 거침
- DISK_ONLY: 디스크에만 저장
Spark 클러스터의 내부구조 (Cluster Topology)
Master Worker Topology
Spark는 Master Worker Topology로 구성되어있다.
Spark를 쓰면서 잊지 말아야할 점
- 항상 데이터가 여러곳에 분산되어 있다는 것
- 같은 연산이어도 여러 노드에 걸쳐서 실행된다는 점
기본적인 구조
- Driver Program
스파크의 메인프로그램
개발자나 유저가 프로그램과 상호작용
Transformation 과 Action 을 저장 또는 워커노드에 전달
- SparkContext: 새로운 RDD를 생성
- Worker Node
워커
- Cluster Manager
Driver Program과 Worker Node의 소통을 담당
클러스터 전반에 대한 작업을 담당
일반적으로 Aciton은 워커노드로부터 데이터를 받는 것까지 포함
모두 함수들로만 이루어져있으므로 함수들이 어디서 작동하는지를 항상 생각하면서 작성
궁금한 점
wide transformations
아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있음 이란 말이 여러 파티션의 데이터에서 불러올때만 해당되는 것인가?