spark-shell --master yarn --deploy-mode client
sc.textFile(path)
sc.textFile(path).first
sc.textFile(path).take(n)
sc.textFile(path).take(n).length
rdd.count() : RDD안의 객체 수를 반환한다.
rdd.collect() : RDD의 모든 객체를 배열에 담아 반환한다. 이 배열은 클러스터가 아니라 로컬 메모리에 위치한다.
rdd.saveAsTextFile(path) : 디렉터리를 생성하고 그 안에 파일 형태로 각 파티션의 내용을 저장한다.
데이터의 가독성을 높이기 위해 foreach와 println을 사용해서
배열의 각 값을 줄 단위로 출력한다.
sc.textFile(path).take(n).length.foreach(println)
csv의 head 제거
def isHeader(line: String) = line.contains("id_1")
def isHeader(line: String) : Boolean = line.contains("id_1")
val head sc.textFile(path).take(n)
첫행 출력
head.filter(isHeader).foreach(println)
첫행 제외 출력
head.filter(x => !isHeader(x)).foreach(println)
head.filterNot(isHeader).foreach(println)
head.filter(!isHeader(_)).foreach(println)
스파크의 DataFrame은 클러스터에 분산된 데이터 셋이며 데이터의 모든 행이 동일한 시스템에 저장되는 로컬 데이터가 아니기 때문이다.
? 문자는 결측값으로 처리해야 한다.
각 열의 데이터 타입 유추
스파크의 csv 리더는 Reader API에서 설정할 수 있는 옵션을 통해 이 기능을 제공한다.(https://github.com/databricks/spark-csv#features)
ex)
val parsed = spark.read.
option("header","true").
option("nullValue","?").
option("inferSchema","true").
csv("/user/linkage")
스키마 출력 parsed.printSchema()
데이터 형식 및 데이터프레임
json : CSV 형식과 매우 비슷한 스키마 추론 기능 지원
parquet, orc : 열 기반 바이너리 파일 포맷(경쟁자 관계)
jdbc : 표준 JDBC 연결을 통해 관계형 데이터베이스에 연결
libsvm : 밀도가 낮고 (비어 있는 영역이 많은) 레이블이 제공되는 관측값에 사용되는 텍스트 파일 형식
text : 파일의 각 행을 string 형식의 단일 열로 구성한 데이터프레임에 매핑
val d1 = spark.read.format('json').load('file.json')
val d2 = spark.read.json('file.json')
json -> parquet
d1.write.format('parquet').save('file.parquet')
d1.write.parquet('file.parquet')
이미 파일이 존재하는 경우
d2.write.mode(SaveMode.Ignore).parquet('file.parquet')
count : 레코드 수를 계산
countByValue : 레코드의 값으로부터 히스토그램 생성
stats: 요약 통곗값을 계싼
캐시 사용하기
데이터프레임과 RDD에 들어 있는 데이터는 기본적으로 일시적이지만, 스파크에서는 이 데이터를 유지하게끔 하는 메커니즘을 제공한다.
cached.cache()
cached.count()
cached.take(10)
cached 메서드를 호출하면 데이터프레임의 내용을 다음번 계산 때 메모리에 저장한다.
parsed.cache()
parsed.groupBy("is_match").count().orderBy($"count".desc).show()
parsed.agg(avg("cmp_sex")).show()
표본 표준편차 stddev
모 표준편차 stddev_pop
parsed 임시 테이블을 Spark SQL 엔진에 등록
parsed.createOrReplaceTempView("/user/linkage")
val data = spark.read.
option("header","true").
option("nullValue","?").
option("inferSchema","true").
csv("/user/linkage")
describe : 요약 통계
describe().show()
describe().select(열1,열2,열3 ...).show()
val matches = data.where("is_match = true")
val matchSummary = matches.describe()
val misses = data.filter($"is_match" === "true")
val missSummary = misses.describe()
데이터프레임의 축 회전과 형태변환
요약 통계(describe)를 변형하기 위해 해야 할 첫 번째 일은 matchSummary와 missSummary를 메트릭의 행과 변수의 열이 있는 넓은 모양에서 메트릭과 변수 쌍의 값 형태로 각 행에 하나의 변수와 하나의 메트릭만 들어있는 긴모양으로 바꾸는 것이다. 그 다음에 긴 모야의 데이터 프레임을 또 다른 넓은 모양의 데이터 프레임으로 바꾸면 변형이 마무리 되는데 이 데이터프레임에서는 변수는 행에 메트릭은 열에 대응될 것이다.
넓은 모양에서 긴 모양으로 변환
flatMap
val schema = summary.schema
val longForm = summary.flatMap(row => {
val metric = row.getString(0)
(1 until row.size).map( i => {
(metric, schema(i).name, row.getString(i).toDouble)
})})
summary 데이터 프레임의 각 row에서 row.getString(0)을 호출하여 해당 행의 메트릭 이름을 위치를 통해 알 수 있다. 다른 열에 대해서는 1의 위치에서 끝까지 flatMap 연산을 반복하여 연속된 튜플을 만든다.
튜플의 첫 번째 항목은 메트릭의 이름이고, 두 번째 항목은 열의 이름 세번째 항목은 통계 값이다. row.getString(i)에 toDouble 메소드를 사용하여 원래의 문자열을 Double 값으로 변환하고 있다.