Spark 완벽 가이드 ch11. Dataset

Q·2023년 1월 18일
0

Spark 완벽 가이드

목록 보기
12/24
  • Dataset은 구조적 API의 기본 데이터 타입

    • DataFrame은 Row 타입의 Dataset
  • Dataset은 자바 가상 머신을 사용하는 언어인 스칼라와 자바에선만 사용 가능

  • 도메인별 특정 객체를 효과적으로 지원하기 위해서는 인코더(encoder)의 개념이 필요

    • 스칼라에서는 스키마가 정의된 케이스 클래스 객체를 사용해 Dataset 정의
    • 자바에서는 자바빈객체를 사용해 Dataset 정의
  • 인코더: 도메인별 특정 객체를 스파크의 내부 데이터 타입으로 매핑하는 시스템

    • Dataset API 사용 시 스파크는 데이터셋에 접근할 때마다 사용자 정의 데이터 타입으로 변환
      • 이 변환 작업은 느림
      • 하지만 더 많은 유연성 제공

Dataset을 사용할 시기

  • Dataset을 사용해야 하는 두 가지 이유

    • DataFrame 기능만으로는 수행할 연산을 표현할 수 없는 경우
    • 성능 저하를 감수하더라도 타입 안정성(type-safe)을 가진 데이터 타입을 사용하고 싶은 경우
  • 단일 노드의 워크로드와 스파크 워크로드에서 전체 로우에 대한 다양한 트랜스포메이션을 사용하려면 Dataset이 적합

    • 케이스 클래스로 구현된 데이터 타입을 사용해 모든 데이터와 트랜스포메이션을 정의하면 재사용 가능

Dataset 생성

  • Dataset을 생성하는 것은 수동 작업이므로 정의할 스키마를 미리 알아야함

자바: Encoders

  • 데이터 타입 클래스를 정의하고 DataFrame에 지정해 인코딩
import org.apache.spark.sql.Encoders;

public class Flight implements Serializable{
  String DEST_COUNTRY_NAME;
  String ORIGIN_COUNTRY_NAME;
  Long DEST_COUNTRY_NAME;
}
Dataset<Flight> flights = spark.read.parquet('/FileStore/tables/2010_summary.parquet/').as(Encoders.bean(Flight.class))

스칼라: 케이스 클래스

  • 스칼라에서 Dataset을 생성하려면 스칼라 case class 구문을 사용해 데이터 타입을 정의해야함

  • 케이스 클래스는 다음과 같은 특징을 가진 정규 클래스임

    • 불변성
      • 객체들이 언제 어디서 변경되었는지 추적할 필요 없음
    • 패턴 매칭으로 분해 가능
      • 로직 분기를 단순화해 버그를 줄이고 가독성을 좋게 만듦
    • 참조값 대신 클래스 구조를 기반으로 비교
      • 값으로 비교하면 인스턴스를 마치 원시 데이터 타입의 값처럼 비교함
      • 따라서 클래스 인스턴스가 값으로 비교되는지, 참조로 비교되는지 더는 불확실하지 않게됨
    • 사용하기 쉽고 다루기 편함
#레코드를 표현할 case class 정의(Flight 데이터 타입의 Dataset)
case class Flight(DEST_COUNTRY_NAME:String, ORIGIN_COUNTRY_NAME: String, count: BigInt)  
#데이터를 읽어야 DataFrame이 반환
val flightsDF = spark.read.parquet('/FileStore/tables/2010_summary.parquet/')

#as 메서드로 Flight 데이터 타입으로 변환
val flights = flightsDF.as[Flight]  

액션

  • Dataset과 DataFrame에 collect, take, count 같은 액션을 적용할 수 있음
  • 케이스 클래스에 실제로 접근할 때 어떠한 데이터 타입도 필요하지 않음
  • case class 속성명을 지정하면 속성에 맞는 값과 데이터 타입 모두를 반환함

트랜스포메이션

  • Dataset 트랜스포메이션은 DataFrame과 동일
  • Dataset을 사용하면 원형의 JVM 데이터 타입을 다루므로 DataFrame만 사용해서 트랜스포메이션을 수행하는 것보다 좀 더 복잡하고 강력한 데이터 타입으로 수행 가능

필터링

  • 필터링은 단순한 트랜스포메이션


    ex) 출발지가 도착지와 동일한지 반환하는 함수 생성
def originIsDestination(flight_row:Flight):Boolean = {
  return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}  
  • 위 함수는 사용자 정의 함수가 아닌 일반 함수임
  • 따라서 모든 로우를 평가하므로 매우 많은 자원을 사용
  • 단순 필터라면 SQL표현식을 사용하는 것이 좋음

매핑

  • 특정 값을 다른 값으로 매핑
    ex) Flight데이터 타입을 입력으로 사용해 불리언값 반환하는 함수 생성
#목적지 컬럼을 추출하여 매핑
val destinations = flights.map(f => f.DEST_COUNTRY_NAME)
val localDestinations = destinations.take(5)  
  • 스파크는 결과로 반환할 JVM 데이터 타입을 알고 있기에 컴파일 타임에 데이터 타입의 유효성을 검사할 수 있음

조인

  • joinWith 메서드 제공
  • 각 컬럼은 단일 Dataset이므로 Dataset 객체를 컬럼처럼 다룰 수 있음
  • 따라서 조인 수행 시 더 많은 정보를 유지할 수 있고, 고급 맵이나 필터처럼 정교하게 데이터를 다룰 수 있다.

그룹화와 집계

  • 기본 표준을 따르므로 groupBy, rollup, cube 메서드를 사용할 수 있음
  • 하지만 Dataset 대신 DataFrame을 반환하므로 데이터 타입 정보를 잃음
  • 근데 유지할 방법이 있긴 함 -> groupByKey
    • groupByKey 메서드: Dataset의 특정 키를 기준으로 그룹화하고 형식화된 Dataset 반환
      • groupByKey 메서드의 파라미터로 컬럼명 대신 함수를 사용하여 유연성을 얻을 수 있음
    • 하지만 스파크는 함수와 JVM 데이터 타입을 최적화할 수 없으므로 트레이드오프가 발생
flights.groupBy('DSET_COUNTRY_NAME').count()
flights.groupByKey(x => x.DSET_COUNTRY_NAME).count()
profile
Data Engineer

0개의 댓글