스파크 데이터분석

のの·2021년 1월 6일

spark-shell --master yarn --deploy-mode client

sc.textFile(path)
sc.textFile(path).first
sc.textFile(path).take(n)
sc.textFile(path).take(n).length

rdd.count() : RDD안의 객체 수를 반환한다.
rdd.collect() : RDD의 모든 객체를 배열에 담아 반환한다. 이 배열은 클러스터가 아니라 로컬 메모리에 위치한다.
rdd.saveAsTextFile(path) : 디렉터리를 생성하고 그 안에 파일 형태로 각 파티션의 내용을 저장한다.

데이터의 가독성을 높이기 위해 foreach와 println을 사용해서
배열의 각 값을 줄 단위로 출력한다.

sc.textFile(path).take(n).length.foreach(println)

csv의 head 제거

def isHeader(line: String) = line.contains("id_1")
def isHeader(line: String) : Boolean = line.contains("id_1")

val head sc.textFile(path).take(n)

첫행 출력
head.filter(isHeader).foreach(println)
첫행 제외 출력
head.filter(x => !isHeader(x)).foreach(println)
head.filterNot(isHeader).foreach(println)
head.filter(!isHeader(_)).foreach(println)

스파크의 DataFrame은 클러스터에 분산된 데이터 셋이며 데이터의 모든 행이 동일한 시스템에 저장되는 로컬 데이터가 아니기 때문이다.

? 문자는 결측값으로 처리해야 한다.

각 열의 데이터 타입 유추

스파크의 csv 리더는 Reader API에서 설정할 수 있는 옵션을 통해 이 기능을 제공한다.(https://github.com/databricks/spark-csv#features)

ex)

val parsed = spark.read.
option("header","true").
option("nullValue","?").
option("inferSchema","true").
csv("/user/linkage")

스키마 출력 parsed.printSchema()


데이터 형식 및 데이터프레임

json : CSV 형식과 매우 비슷한 스키마 추론 기능 지원
parquet, orc : 열 기반 바이너리 파일 포맷(경쟁자 관계)
jdbc : 표준 JDBC 연결을 통해 관계형 데이터베이스에 연결
libsvm : 밀도가 낮고 (비어 있는 영역이 많은) 레이블이 제공되는 관측값에 사용되는 텍스트 파일 형식
text : 파일의 각 행을 string 형식의 단일 열로 구성한 데이터프레임에 매핑

val d1 = spark.read.format('json').load('file.json')
val d2 = spark.read.json('file.json')

json -> parquet

d1.write.format('parquet').save('file.parquet')
d1.write.parquet('file.parquet')

이미 파일이 존재하는 경우
d2.write.mode(SaveMode.Ignore).parquet('file.parquet')


count : 레코드 수를 계산
countByValue : 레코드의 값으로부터 히스토그램 생성
stats: 요약 통곗값을 계싼

캐시 사용하기

데이터프레임과 RDD에 들어 있는 데이터는 기본적으로 일시적이지만, 스파크에서는 이 데이터를 유지하게끔 하는 메커니즘을 제공한다.

cached.cache()
cached.count()
cached.take(10)

cached 메서드를 호출하면 데이터프레임의 내용을 다음번 계산 때 메모리에 저장한다.

parsed.cache()

parsed.groupBy("is_match").count().orderBy($"count".desc).show()

parsed.agg(avg("cmpsex"),stddev("cmp_sex"), stddev("cmp_sex")).show()

표본 표준편차 stddev
모 표준편차 stddev_pop

parsed 임시 테이블을 Spark SQL 엔진에 등록

parsed.createOrReplaceTempView("/user/linkage")

val data = spark.read.
option("header","true").
option("nullValue","?").
option("inferSchema","true").
csv("/user/linkage")

describe : 요약 통계
describe().show()
describe().select(열1,열2,열3 ...).show()

val matches = data.where("is_match = true")
val matchSummary = matches.describe()

val misses = data.filter($"is_match" === "true")
val missSummary = misses.describe()


데이터프레임의 축 회전과 형태변환

요약 통계(describe)를 변형하기 위해 해야 할 첫 번째 일은 matchSummary와 missSummary를 메트릭의 행과 변수의 열이 있는 넓은 모양에서 메트릭과 변수 쌍의 값 형태로 각 행에 하나의 변수와 하나의 메트릭만 들어있는 긴모양으로 바꾸는 것이다. 그 다음에 긴 모야의 데이터 프레임을 또 다른 넓은 모양의 데이터 프레임으로 바꾸면 변형이 마무리 되는데 이 데이터프레임에서는 변수는 행에 메트릭은 열에 대응될 것이다.

넓은 모양에서 긴 모양으로 변환

flatMap

val schema = summary.schema
val longForm = summary.flatMap(row => {
val metric = row.getString(0)
(1 until row.size).map( i => {
(metric, schema(i).name, row.getString(i).toDouble)
})})

summary 데이터 프레임의 각 row에서 row.getString(0)을 호출하여 해당 행의 메트릭 이름을 위치를 통해 알 수 있다. 다른 열에 대해서는 1의 위치에서 끝까지 flatMap 연산을 반복하여 연속된 튜플을 만든다.
튜플의 첫 번째 항목은 메트릭의 이름이고, 두 번째 항목은 열의 이름 세번째 항목은 통계 값이다. row.getString(i)에 toDouble 메소드를 사용하여 원래의 문자열을 Double 값으로 변환하고 있다.

profile
wannabe developer

0개의 댓글