[Apache Spark] RDD

연수·2021년 11월 30일
0

spark

목록 보기
12/26

👉 저수준 API란?

  1. 분산 데이터 처리를 위한 RDD
  2. 브로드캐스트 변수와 어큐뮬레이터처럼 분산형 공유 변수를 배포하고 다루기 위한 API

1️⃣ 언제 사용할까?!

  • 고수준 API에서 제공하지 않는 기능이 필요한 경우 예) 클러스터의 물리적 데이터의 배치를 세밀하게 제어해야 하는 상황
  • RDD를 사용해 개발된 기존 코드를 유지해야 하는 경우
  • 사용자가 정의한 공유 변수를 다뤄야 하는 경우

위와 같은 상황에서만 저수준 API 기능을 사용하지만, 스파크의 모든 워크로드는 저수준 기능을 사용하는 기초적인 형태로 컴파일되므로 이를 이해하는 것이 중요하다! (DataFrame 트랜스포메이션을 호출하면 실제로 다수의 RDD 트랜스포메이션으로 변환된다.)

2️⃣ 어떻게 사용할까?!

  • SparkContext: 저수준 API 기능을 사용하기 위한 진입 지점 spark.sparkContext

 

🐇 RDD

  • 스파크 1.x 버전의 핵심 API
  • DataFrame이나 Dataset 코드는 RDD로 컴파일된다.
  • 스파크 UI에서 RDD 단위로 잡이 수행된다.
  • RDD는 불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음이다.
  • RDD의 레코드는 자바, 스칼라, 파이썬의 객체
  • 이러한 객체에는 사용자가 원하는 포맷을 사용해 원하는 모든 데이터를 저장할 수 있다. → 강력한 제어권을 가질 수 있지만, 모든 값을 다루거나 값 사이의 상호작용 과정을 수동으로 정의해야 한다.
  • 구조적 API와 다르게 레코드의 내부 구조를 스파크에서 파악할 수 없으므로 최적화를 하려면 더 많은 수작업이 필요하다.
  • RDD와 Dataset 사이의 전환은 매우 쉬우므로 두 API를 모두 사용해 각 API의 장점을 동시에 활용할 수 있다.

👻 RDD 유형

  • RDD는 DataFrame API에서 최적화된 물리적 실행 계획을 만드는 데 대부분 사용된다.
  • 두 가지 타입의 RDD
    • 제네릭 RDD
    • 키-값 RDD (특수 연산, 키를 이용한 사용자 지정 파티셔닝)
  • RDD의 주요 속성
    • 파티션의 목록
    • 각 조각을 연산하는 함수
    • 다른 RDD와의 의존성 목록
    • (부가적) 키-값 RDD를 위한 Partitioner
    • (부가적) 각 조각을 연산하기 위한 기본 위치 목록
  • 이러한 속성은 프로그램을 스케줄링하고 실행하는 스파크의 모든 처리 방식을 결정한다.
  • RDD도 분산 환경에서 데이터를 다루는 데 필요한 지연 처리 방식의 트랜스포메이션과 즉시 실행 방식의 액션을 제공한다. (단, '로우'라는 개념은 없다.)
  • 파이썬으로 RDD를 다룰 때에는 성능 저하가 발생할 수 있다. 직렬화 과정을 거친 데이터를 파이썬 프로세스에 전달하고, 파이썬에서 처리가 끝나면 다시 직렬화하여 자바 가상 머신에 반환한다. → 높은 오버헤드 발생! (구조적 API 사용 권장)

🦄 언제 사용할까?!

  • 정말 필요한 경우가 아니라면 수동으로 RDD를 생성하면 안 된다.
  • RDD는 많은 강점이 있지만, 구조적 API가 제공하는 여러 최적화 기법을 사용할 수 없으며, DataFrame에 비해 안정성과 표현력이 부족하다.
  • 물리적으로 분산된 데이터(데이터 파티셔닝)에 세부적인 제어가 필요할 때 RDD를 사용하는 것이 가장 적합하다.

🎠 Dataset과 케이스 클래스를 사용해 만들어진 RDD의 차이점

  • Dataset은 구조적 API가 제공하는 풍부한 기능과 최적화 기법을 제공
  • Dataset을 사용하면 JVM 데이터 타입과 스파크 데이터 타입 중 어떤 것을 사용하더라도 성능이 동일하므로 쉽게 사용할 수 있다. 유연하게 대응이 가능한 데이터 타입을 선택하면 된다!

 

🖍️ RDD 생성하기

  • Row 타입: 스파크가 구조적 API 데이터를 표현하는 데 사용하는 내부 카탈리스트 포맷. Row 타입을 가진 RDD를 생성하면 구조적 API와 저수준 API를 오고가게 만들 수 있다.
  • 컬렉션 객체로 RDD 생성
  • 데이터소스로 RDD 생성 (DataSource API)

 

🌳 트랜스포메이션

  • RDD에 트랜스포메이션을 지정해 새로운 RDD를 생성할 수 있다.
  • 이때 RDD에 포함된 데이터를 다루는 함수에 따라 다른 RDD에 대한 의존성도 함께 정의한다.
  • distinct, filter, map, flatMap, sortBy, randomSplit

 

🍑 액션

  • 지정된 트랜스포메이션 연산을 시작하려면 액션을 사용한다.
  • 액션은 데이터를 드라이버로 모으거나 외부 데이터소스로 내보낼 수 있다.
  • reduce, count, countApprox, countApproxDistinct, countByValue, countByValueApprox, first, max/min, take

 

🥪 파일 저장하기

  • 데이터 처리 결과를 일반 텍스트 파일로 쓰는 것
  • RDD를 사용하면 일반적인 데이터소스에 저장할 수 없다.
  • 각 파티션의 내용을 저장하려면 전체 파티션을 순회하면서 외부 데이터베이스에 저장해야 한다. (= 고수준 API의 내부 처리 과정을 저수준 API로 구현하는 접근법)
  • 스파크는 각 파티션의 데이터를 파일로 저장한다.
  • 텍스트 파일, 시퀀스 파일, 하둡 파일

 

🌪️ 캐싱

  • RDD를 캐시하거나 저장(persist)할 수 있다.
  • 기본적으로 캐시와 저장은 메모리에 있는 데이터만을 대상으로 한다.
  • 저장소 수준은 싱글턴 객체인 org.apache.spark.storage.StorageLevel의 속성(메모리, 디스크, 둘의 조합, off-heap 등) 중 하나로 지정할 수 있다.

 

📌 체크포인팅

  • RDD를 디스크에 저장하는 방식
  • 나중에 저장된 RDD를 참조할 때 디스크에 저장된 중간 결과 파티션을 참조한다.
  • 메모리 대신 디스크에 저장한다는 사실만 제외하면 캐싱과 유사하다.
  • 반복적인 연산 수행 시 유용한 기능

 

📤 RDD를 시스템 명령으로 전송하기

  • pipe 메서드를 사용하면 파이핑 요소로 생성된 RDD를 외부 프로세스로 전달할 수 있다.
  • 이때 외부 프로세스는 파티션마다 한 번씩 처리해 결과 RDD를 생성한다.
  • 각 입력 파티션의 모든 요소는 개행 문자 단위로 분할되어 여러 줄의 입력 데이터로 변경된 후 프로세스의 표준 입력(stdin)에 전달된다.
  • 결과 파티션은 프로세스의 표준 출력(stdout)으로 생성된다.
  • 표준 출력의 각 줄은 출력 파티션의 하나의 요소가 되며, 비어 있는 파티션을 처리할 때도 프로세스는 실행된다.
  • 사용자가 정의한 두 함수를 인수로 전달하면 출력 방식을 변경할 수 있다.
  • mapPartitions: 개별 파티션에 대해 map 연산을 수행
  • foreachPartition: 파티션의 모든 데이터를 순회할 뿐 결과는 반환 X, 개별 파티션에서 특정 작업을 수행하는 데 매우 적합한 함수
  • glom: 데이터셋의 모든 파티션을 배열로 변환

 

[출처] 스파크 완벽 가이드 (빌 체임버스, 마테이 자하리아 지음)

profile
DCDI

0개의 댓글