잠재요인 모델
다수의 사용자와 아이템 사이에서 관측된 상호작용을 상대적으로 적은 수의 관측되지 않은 숨은 원인으로 설명하려 할 때 사용한다.
행렬분해모델
A(ij) = 사용자 i가 아티스트 j의 음악을 들었다면 값이 존재한다.
A는 희소 행렬이다. 사용자-아티스트의 가능한 모든 조합 중 오직 극소수 만이 실제 데이터로 등장하기 때문에 이 행렬의 원소 대부분은 0이 된다. 이들 행렬 분해 알고리즘은 A를 더 작은 행렬 x와 y의 행렬 곱으로 분해하는데, 이 X와 Y는 매우 길쭉하다. A가 다수의 행과 열을 가지기 때문에 X와 Y는 매우 많은 행을 가지게 되는데 열(K)는 몇개 되지 않는다. K개의 열은 상호작용하는 데이터를 설명하는데 사용하는 잠재요인에 해당한다.
ALS
val rawUserArtistData = spark.read.textFile("/user/music/user_artist_data.txt")
rawUserArtistData.mapPartitions{it => Iterator(it.toSeq.size)}.collect.toSeq
val repartition = rawUserArtistData.repartition(4)
repartition.mapPartitions{it=>Iterator(it.toSeq.size)}.collect.toSeq
// 사용자 ID와 아티스트 ID를 정수로 파싱
DataFrame user | artist
val userArtistDF = rawUserArtistData.map{
line => val Array(user, artist, _*) = line.split(" ")
(user.toInt, artist.toInt)
}.toDF("user","artist")
최대 최소값 확인
userArtistDF.agg( min("user"), max("user"), min("artist"), max("artist")).show()
val rawArtistData = spark.read.textFile("/user/music/artist_data.txt")
rawArtistData.map{ line => val (id, name) = line.span(_ != '\t')
span 메서드는 행을 첫 번째 탭 위치를 기준으로 앞부분과 뒷부분으로 나누고, 앞부분을 아티스트ID, 뒷부분은 (탭을 포함한 공백문자 제외) 아티스트 이름으로 파싱한다. 일부 행은 제대로 파싱되지 않을 탠데
filter를 통해 파싱되지 않은 행을 제거할 수 있지만 파싱을 두번해야한다. 이 때 flatMap 함수가 적합하다. 이 함수는 각 입력에 대응하는 0개 이상의 컬렉션을 하나의 커다란 데이터셋에 펼쳐 놓는다. 이 함수는 스칼라 컬렉션 뿐만 아니라 스칼라의 Option 클래스에서도 사용할 수 있다. Option은 때에 따라서 존재하지 않을 수 있는 그런 값이다 1개 또는 0개의 값을 가지는 간단한 컬렉션이며 1과 0은 각각 Option의 하위 클래스인 Some과 None에 대응한다.
val artistById = rawArtistData.flatMap{
line => val (id, name) = line.span(!='\t')
if(name.isEmpty){
None
} else{
Some((id.toInt, name.trim))
} catch{
case : NumberFormatException => None
}
}
}.toDF("id","name")
artist_alias 파일은 오탈자 혹은 아티스트의 별칭에 대응되는 아티스트 id를 공식 이름에 대응되는 아티스트 id로 매핑한 정보를 제공한다.
badid -> goodid로 매핑시킨 Map 컬렉션 생성
val rawArtistAlias = spark.read.textFile("/user/music/artist_alias.txt")
val artistAlias = rawArtistAlias.flatMap{
line => val Array(artist,alias) = line.split('\t')
if(artist.isEmpty){
None
} else{
Some((artist.toInt,alias.toInt))
}}.collect().toMap
첫번째 모델 생성
1.모든 아티스트의 ID는 공식 명칭의 아티스트 ID와 다를 경우 별칭 데이터 셋을 사용해서 공식 명칭의 아티스트 ID로 바꿔야 한다.
2.모든 입력 행을 파싱하여 적절한 열들로 만들어야한다.
브로드캐스트 변수
클러스터에 있는 실행자 하나당 딱 하나의 복사본만을 보내고 메모리에 유지하도록 할 수 있다.
태스트가 수천 개쯤 되고 클러스터 여러 실행자에서 병렬로 수행될 때는 이와 같은 방법으로 네트워크 트래픽과 메모리 사용량을 크게 줄일 수 있다
브로드캐스트 변수
스파크가 어떤 스테이지를 실행하면 그 스테이지에서 태스크를 실행하는데 필요한 모든 정보를 바이너리 형태로 만든다. 이 바이너리를 실행할 함수의 클러저라고 한다. 클로저는 함수가 참조하는 구동자 상의 모든 자료구조를 포함하여, 스파크는 클러스터 상의 실행자에 보내는 모든 태스크 각각에 클로저를 함께 배포한다.
다수의 태스크가 데이터를 변경할 수 없는 동일한 자료구조에 접근해야 할 때는 브로드캐스트 변수를 쓰는 것이 좋다. 브로드캐스트 변수는 태스크의 클로저에 대한 일반적인 처리 방법을 확장하여 다음과 같은 일을 할 수 있게 해준다.
val dict: Seq[String] = ...
val bDict = spark.sparkContext.broadcast(dict)
def query(path: String) = {
spark.read.textFile(path).map(score(_,bDict.value)) // 각 실행자에 딱 한 번씩만 전송
..
}
반복수행하는 작업에는 데이터를 캐시에 저장하는 쪽이 바람직하다.
"Serialized" 데이터가 객체의 형태가 아니라 직렬화된 바이트 들의 형태로 메모리에 저장되었음을 나타냄
import org.apache.spark.ml.recommendation._
import scala.util.Random
val model = new ALS().
setSeed(Random.nextLong()).
setImplicitPrefs(true).
setRank(10).
setRegParam(0.01).
setAlpha(1.0).
setMaxIter(5).
setUserCol("user").
setItemCol("artist").
setRatingCol("count").
setPredictionCol("prediction").
fit(trainData)
ALSModel 모델은 모델 안에 사용자와 상품 각각에 대해서 10개의 값으로 이뤄진 특징 벡터를 가지며 이 자에서 다루는 문제에서는 이런 벡터가 170만개 이상 만들어진다.
ALSModel 모델은 이런 커다란 사용자-특징, 제품-특징 행렬을 데이터프레임 형태로 가지고 있다.
특징벡터 확인
model.userFactors.show(1, truncate=false)
추천 결과 추출 검사
사용자 한 명의 음악 재생 이력과 추천 내역을 조사하여 아티스트 추천 결과가 직관적으로 타당한지 확인
val userId = 2093760
// 해당 유저가 청취한 아티스트 리스트
val existingArtistIDs = trainData.
filter($"user" == userId).
select("artist").as[Int].collect()
artistById.filter($"id" isin (existingArtistID:_*)).show()
_* 가변인자 문법
ALSModel을 이용하여 한 사용자의 모든 아티스트에 대한 선호도를 점수화하고 점수가 높은 몇 개를 반환하는 기능을 구현
def makeRecommendations(
model: ALSModel,
userId: Int,
howMany: Int): DataFrame = {
val toRecommend = model.itemFactors.
select($"id".as("artist")).
withColumn("user",lit(userID))
// 모든 아티스트 ID를 선택해 사용자 ID와 짝짓는다.
model.transform(toRecommend).
select("artist", "prediction").
orderBy($"prediction".desc).
limit(howMany)
// 아티스트별 점수를 매기고 상위권을 반환한다.
}
이미 청취한 아티스트 ID들을 제외하지 않았음
val topRecommdations = makeRecommendations(model, userId, 5)
topRecommendations.show()
추천 결과로부터 아티스트 ID를 추출하여 해당 아티스트 이름을 찾아본다.
val recommendedArtistIds =
topRecommendations.select("artist").as[Int].collect()