Dataset은 구조적 API의 기본 데이터 타입
Dataset은 자바 가상 머신을 사용하는 언어인 스칼라와 자바에선만 사용 가능
도메인별 특정 객체를 효과적으로 지원하기 위해서는 인코더(encoder)의 개념이 필요
인코더: 도메인별 특정 객체를 스파크의 내부 데이터 타입으로 매핑하는 시스템
Dataset을 사용해야 하는 두 가지 이유
단일 노드의 워크로드와 스파크 워크로드에서 전체 로우에 대한 다양한 트랜스포메이션을 사용하려면 Dataset이 적합
import org.apache.spark.sql.Encoders;
public class Flight implements Serializable{
String DEST_COUNTRY_NAME;
String ORIGIN_COUNTRY_NAME;
Long DEST_COUNTRY_NAME;
}
Dataset<Flight> flights = spark.read.parquet('/FileStore/tables/2010_summary.parquet/').as(Encoders.bean(Flight.class))
스칼라에서 Dataset을 생성하려면 스칼라 case class 구문을 사용해 데이터 타입을 정의해야함
케이스 클래스는 다음과 같은 특징을 가진 정규 클래스임
#레코드를 표현할 case class 정의(Flight 데이터 타입의 Dataset)
case class Flight(DEST_COUNTRY_NAME:String, ORIGIN_COUNTRY_NAME: String, count: BigInt)
#데이터를 읽어야 DataFrame이 반환
val flightsDF = spark.read.parquet('/FileStore/tables/2010_summary.parquet/')
#as 메서드로 Flight 데이터 타입으로 변환
val flights = flightsDF.as[Flight]
def originIsDestination(flight_row:Flight):Boolean = {
return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}
#목적지 컬럼을 추출하여 매핑
val destinations = flights.map(f => f.DEST_COUNTRY_NAME)
val localDestinations = destinations.take(5)
flights.groupBy('DSET_COUNTRY_NAME').count()
flights.groupByKey(x => x.DSET_COUNTRY_NAME).count()