Resilient Distributed Datasets
RDD는 스파크의 분산 컬렉션 abstraction이며
RDD는 변경 불가능하고 순차적인 혹은 병렬적인 컬렉션이다.
List는 map, flatMap, filter,reduce
SparkSession을 통해 스파크 클러스터를 다룬다. 새로운 job을 실행하려면 먼저 이 스파크 세션 객체를 생성해야한다.
parallelize()
정규 스칼라 컬렉션으로부터 새로운 RDD를 생성한다.
textFile()
HDFS나 로컬 시스템의 텍스트 파일로부터 데이터를 읽어서 String으로 이루어진 RDD를 반환한다.
이 후 RDD를 어떤 타입으로 바꿀건지 알아내야 한다.
Transformation and Actions
트랜스포메이션과 액션은 RDD에 행해지는 서로 다른 종류의 연산(Operation, 함수)입니다
transformer(ex: map, filter) - 변환자
변환자는 새로운 컬렉션을 변환해주는 연산이다. 그러므로 어떤 '단일 값'은 아니다. 예를 들어
어떤 리스트에다가 함수를 적용하고 리스트를 반환받으면 이건 원칙적으로 변환자이다.
accessors(reduce, fold, aggregate) - 접근자
reduce의 파라미터로 어떤 함수를 주면 파라미터 함수를 참조하여 모든 원소를 계산하여 하나의 값으로 반환해준다.
스칼라에서는 변환자와 접근자가 있지만 스파크에서는 변환자 대신 트랜스포메이션(transformation)이 접근자 대신 액션(action)이 있다. 트랜스포메이션의 정의는 스칼라의 변환자와 매우 비슷한데 컬렉션이 아니라 다른 RDD를 결과로 변환하는 연산을 말한다.
액션은 RDD를 가지고 어떤 계산을 한 뒤 어떤 값을 반환하거나 컴퓨터 외부 저장장치에 저장한다. 그러나 RDD를 반환하지는 않는다. 액션은 RDD와 다른 무언가나 값을 반환한다.
트랜스포메이션 - lazy하다
액션 - eager
스파크는 네트워크를 다루는 방식이기 때문에 lazy, eager 개념은 아주 중요하다.
트랜스포메이션은 lazy하고 액션은 eager하다는 특징이 스파크가 잡을 실행할 떄 내부에서 동작하는 네트워크 통신을 상당히 줄여준다.
map은 트랜스포메이션 함수로 실행이 연기되어 아무일도 일어나지 않는다. map의 반환 값은 RDD의 레퍼런스일 뿐이며 그 RDD는 존재하지 않는다. 그럼 어떻게 이 계산 연산이 클러스터에서 완료되었다는 것을 알 수 있을까? 계산 연산을 시작하고 그 결과를 반환받기 위해선 액션 연산을 추가하면 된다. reduce 연산을 적용하면 전체 문자열 RDD의 문자 갯수를 얻을 수 있다.
map이나 filter 연산이 끝나면 실제 계산 연산이 완료되있을 거라 생각하지만 사실 클러스터에서 아무 일도 일어나지 않는다. 액션으로 어떤 값을 반환 받기 전까지 아무 일도 일어나지 않는다.
트랜스포메이션 함수
map : RDD의 각각의 원소에 함수를 적용하여 각 원소에 함수가 적용되어 반환된 값으로 이루어진 다른 RDD를 반환
flatMap : 이 함수는 map의 결과를 flatten한다. Iterator가 반환한 내용의 RDD를 반환한다.
filter : predicate함수로 사용된다. filter의 경우에는 해당 predicate 조건을 통과한 원소들로 이루어진 RDD를 반환한다. filter의 파라미터함수는 f(x) => Boolean의 형태를 가지고 있다.
distinct : distinct on a set operation
RDD의 중복된 원소들을 제거해 중복된 원소가 없는 RDD를 반환한다.
액션함수
collect : 데이터셋의 크기를 줄인 후에, 그 결과로 나온 데이터의 서브 그룹이나 필터링 한 더 작은 데이터 셋을 모을 때 collect를 사용할 수 있다. RDD의 모든 원소들을 얻을 수 있다. 하지만 보통 몇몇 트랜스포메이션을 적용한 후에 collect를 사용한다. 데이터셋이 작아질 것이라는 것을 알기 때문에 collect를 적용해 하나의 컴퓨터에 결과들을 모아 저장한다.
count: RDD의 원소의 갯수를 반환해준다.
take: N개의 T타입 Array를 반환
reduce
foreach 반환 타입이 Unit으로 RDD를 반환하지 않기 때문에 액션함수로 분류된다.
S3에 저장된 로그를 읽어 에러를 분석
연-월-일-시간-분-초
에러로그는 에러가 발생하면 단어 error가 접두사로 들어가는 로그가 찍힌다.
트랜스포메이션 함수가 실행될 시점에는 아무것도 일어나지 않으며 단지 단계를 설정하는 계산이다. 이런 단계 설정 연산들은 결국 실행되기는 하지만 그 시점에서 아직 시작은 하지 않은 연산들이다 lazy 단께를 설정하는 이러한 연산은 더 많이 적용할 수 있다. 여러 개의 단계를 설정하고 큐에다 넣고 마지막에 액션을 취할 수 있다. 개개의 머신들은 데이터를 더하고 결과를 드라이버 노드에게 보낸다. 로그 데이터를 aggregate하고 combine하여 로그에서 에러가 등장하는 횟수를 하나의 정수 혹은 long으로 받을 것이다.
union
intersection
val rdd3 = rdd1.union(rdd2) // 합집합
val rdd4 = rdd1.intersection(rdd2) // 교진합
val rdd5 = rdd1.subtract(rdd2) // 차집합
Cartesian product
스칼라 컬렉션에 존재하지 않는 액션함수, 분산 데이터 혹은 매우 큰 데이터를 다룰 때 유용한다.
takeSample : 만약 데이터에서 샘플을 추출하거나 그 크기를 줄이고 싶을 떄 takeSample은 원하는 갯수 만큼 랜덤하게 원소를 반환해주는데 RDD가 아닌 Array로 준다.
매우 큰 데이터 셋에서 각기 다른 샘플을 추출하여 서브샘플들을 한 머신에다가 모으는ㄷ 분산되어 존재하는 RDD가 아닌 한 Array 형태로 모은다.
takeOrdered : 이 함수는 take 함수와 동일한데 그 값을 정렬된 상태로 받는다.
이 함수를 통해 자연적인 순서든 나만의 custom 비교를 통한 순서든 처음 n개의 원소들을 반환받을 수 있다.
implicit 순서 파라미터
save
saveAsTextFile
saveAsSequenceFile : 각 RDD 원소들을 로컬 파일시스템이나 HDFS에 Hadoop SequenceFile 형태로 저장한다.
Evaluation
스파크가 데이터 과학에 좋은 이유?
대부분의 데이터 과학 문제는 반복을 포함한다.
변환, 동작 및 메모리 내 계산을 어떻게 되는가?
하둡에서의 반복작업은 읽기와 쓰기가 반복의 각각에서 진행되며 새로운 반복을 시작할 떄마다 중간 데이터를 작성해야한다. 디스크 IO 또는 네트워크 IO에서 많은 시간을 소비한다. 스파크에서 반복을 수행하려면 모든 읽기 및 디스크 쓰기가 spark에서 완전히 피할 수 있기 때문에 훨씬 더 성능이 좋을 수 있다. 물론 HDFS에서 데이터를 읽어야한다. 하지만 일단 작업이 모두 메모리에 있으면 메모리에서 반복할 수 있다.