이번 섹션은 Spark 프로그래밍의 “심화 편”이다.
주요 주제는 함수형 연산, 조인, 브로드캐스트 변수, 캐시, 그리고 성능 최적화다.
MovieLens와 Marvel 데이터셋을 기반으로 실제 분석 및 추천 시스템 구축 과정을 다룬다.
목표:
MovieLens의 u.data 파일에서 가장 많이 평가된 영화를 찾는다.
u.data:
UserID, MovieID, Rating, Timestamp
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
schema = StructType([
StructField("userID", IntegerType(), True),
StructField("movieID", IntegerType(), True),
StructField("rating", IntegerType(), True),
StructField("timestamp", LongType(), True)
])
moviesDF = spark.read.option("sep", "\t").schema(schema).csv("data/u.data")
movieCounts = moviesDF.groupBy("movieID").count().orderBy(F.desc("count"))
movieCounts.show(10)
핵심 포인트
.groupBy().count() → 각 영화 ID별 등장 횟수 계산.orderBy(desc("count")) → 내림차순 정렬출력 예시:
| movieID | count |
|---------|-------|
| 50 | 584 |
| 258 | 509 |
| 100 | 508 |
ID만으로는 해석이 어렵다.
이를 위해 u.item 파일의 영화명과 조인한다.
하지만, 대용량 조인 시 브로드캐스트 변수를 이용하면 훨씬 효율적이다.
namesSchema = StructType([
StructField("movieID", IntegerType(), True),
StructField("title", StringType(), True),
StructField("junk", StringType(), True)
])
namesDF = spark.read.option("sep", "|").schema(namesSchema).csv("data/u.item")
popular = movieCounts.join(namesDF, "movieID")
popular.select("title", "count").orderBy(F.desc("count")).show(10)
브로드캐스트 변수를 사용하면 작은 조회 테이블을 클러스터 전역으로 공유할 수 있다.
def loadMovieNames():
names = {}
with open("data/u.item", encoding="ISO-8859-1") as f:
for line in f:
fields = line.strip().split("|")
names[int(fields[0])] = fields[1]
return names
movieNames = spark.sparkContext.broadcast(loadMovieNames())
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
lookupName = udf(lambda movieID: movieNames.value.get(movieID, "Unknown"), StringType())
moviesWithNames = movieCounts.withColumn("title", lookupName(F.col("movieID")))
moviesWithNames.orderBy(F.desc("count")).show(10)
💡 브로드캐스트 변수
.value로 접근하여 Python 객체처럼 사용 가능출력 예시
| title | count |
|-----------------------|-------|
| Star Wars (1977) | 584 |
| Contact (1997) | 509 |
| Fargo (1996) | 508 |
목표:
Marvel Universe 데이터에서 가장 많이 등장한 슈퍼히어로 찾기.
Marvel-names.txt → ID와 이름 매핑Marvel-graph.txt → 히어로 ID와 등장 관계 리스트from pyspark.sql.types import StructType, StructField, IntegerType, StringType
namesSchema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
namesDF = spark.read.option("sep", " ").schema(namesSchema).csv("data/Marvel-names.txt")
graph = spark.read.text("data/Marvel-graph.txt")
from pyspark.sql import functions as F
connections = graph.withColumn("id", F.split(F.col("value"), " ")[0]) \
.withColumn("connections", F.size(F.split(F.col("value"), " ")) - 1)
connectionsByHero = connections.groupBy("id").agg(F.sum("connections").alias("connections"))
mostPopular = connectionsByHero.orderBy(F.desc("connections")).first()
heroName = namesDF.filter(F.col("id") == mostPopular[0]).select("name").first()[0]
print(f"가장 인기 있는 슈퍼히어로: {heroName}, 연결 수: {mostPopular[1]}")
결과:
가장 인기 있는 슈퍼히어로: Captain America, 연결 수: 1937
목표:
두 슈퍼히어로 간의 “분리 정도(Degrees of Separation)” 계산하기
→ 그래프 탐색 알고리즘을 Spark RDD로 구현
(connections, distance, color)코드 요약
hitCounter = sc.accumulator(0)
def bfsMap(node):
heroID, (connections, distance, color) = node
results = []
if color == "GRAY":
for connection in connections:
newDistance = distance + 1
newColor = "GRAY"
if connection == targetHeroID:
hitCounter.add(1)
results.append((connection, ([], newDistance, newColor)))
color = "BLACK"
results.append((heroID, (connections, distance, color)))
return results
이후 RDD 변환, reduceByKey, 반복 수행
→ hitCounter가 증가하면 탐색 성공.
결과적으로 스파이더맨은 Adam 3031과 2단계 분리로 연결되어 있음.
목표:
MovieLens 데이터를 이용해 영화 간 유사도 계산 및 추천 구현
ratings = spark.read.option("sep", "\t").schema(schema).csv("data/u.data")
ratings = ratings.select("userID", "movieID", "rating")
ratings1 = ratings.alias("r1")
ratings2 = ratings.alias("r2")
pairings = ratings1.join(ratings2, (F.col("r1.userID") == F.col("r2.userID")) &
(F.col("r1.movieID") < F.col("r2.movieID"))) \
.select(F.col("r1.movieID").alias("movie1"),
F.col("r2.movieID").alias("movie2"),
F.col("r1.rating").alias("rating1"),
F.col("r2.rating").alias("rating2"))
pairings.cache()
💡 캐시 활용
.cache()로 메모리에 저장 → 반복 쿼리 시 재계산 방지.persist()는 디스크 캐시로 복구성 강화 가능pairStats = pairings.withColumn("xx", F.col("rating1")*F.col("rating1")) \
.withColumn("yy", F.col("rating2")*F.col("rating2")) \
.withColumn("xy", F.col("rating1")*F.col("rating2"))
simDF = pairStats.groupBy("movie1", "movie2").agg(
(F.sum("xy") / (F.sqrt(F.sum("xx")) * F.sqrt(F.sum("yy")))).alias("similarity"),
F.count("xy").alias("numPairs")
).filter((F.col("numPairs") > 50) & (F.col("similarity") > 0.97))
results = simDF.filter((F.col("movie1") == 50) | (F.col("movie2") == 50)) \
.orderBy(F.desc("similarity"))
results.show(10)
출력 예시
| 영화쌍 | 유사도 | 평가자 수 |
|--------|--------|-----------|
| Star Wars ↔ Empire Strikes Back | 0.989 | 200 |
| Star Wars ↔ Return of the Jedi | 0.984 | 185 |
| Star Wars ↔ Raiders of the Lost Ark | 0.972 | 121 |
| 기술 | 설명 | 사용 예시 |
|---|---|---|
| GroupBy & Join | 대규모 집계/병합 연산 | MovieLens 인기 영화 |
| Broadcast Variable | 작은 Lookup 테이블을 전 노드에 배포 | 영화 이름 매칭 |
| UDF | 사용자 정의 함수 | ID → 제목 변환 |
| Accumulator | 클러스터 전역 카운터 | BFS 종료 조건 |
| Cache / Persist | 반복 연산 최적화 | 영화 유사도 계산 |
Spark의 본질은 “데이터를 함수로 다루는 병렬 모델”이다.
이제 RDD 수준의 추상화와 DataFrame 기반 최적화 모두 이해했을 것이다.