1️⃣ 영화 데이터로 배우는 고급 DataFrame 연산

이번 섹션은 Spark 프로그래밍의 “심화 편”이다.
주요 주제는 함수형 연산, 조인, 브로드캐스트 변수, 캐시, 그리고 성능 최적화다.
MovieLens와 Marvel 데이터셋을 기반으로 실제 분석 및 추천 시스템 구축 과정을 다룬다.


2️⃣ 실습 ① — 인기 영화 찾기 (GroupBy + 정렬)

목표:
MovieLens의 u.data 파일에서 가장 많이 평가된 영화를 찾는다.

📘 데이터 구조

u.data:

UserID, MovieID, Rating, Timestamp
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, LongType

spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

schema = StructType([
    StructField("userID", IntegerType(), True),
    StructField("movieID", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True)
])

moviesDF = spark.read.option("sep", "\t").schema(schema).csv("data/u.data")

movieCounts = moviesDF.groupBy("movieID").count().orderBy(F.desc("count"))
movieCounts.show(10)

핵심 포인트

  • .groupBy().count() → 각 영화 ID별 등장 횟수 계산
  • .orderBy(desc("count")) → 내림차순 정렬
  • Spark의 데이터 병렬 집계 성능을 직접 경험할 수 있는 기본 예제

출력 예시:

| movieID | count |
|---------|-------|
| 50      | 584   |
| 258     | 509   |
| 100     | 508   |

3️⃣ 실습 ② — 영화 이름 조인하기 (Join, Broadcast, UDF)

ID만으로는 해석이 어렵다.
이를 위해 u.item 파일의 영화명과 조인한다.
하지만, 대용량 조인 시 브로드캐스트 변수를 이용하면 훨씬 효율적이다.

방법 1. 단순 Join

namesSchema = StructType([
    StructField("movieID", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("junk", StringType(), True)
])

namesDF = spark.read.option("sep", "|").schema(namesSchema).csv("data/u.item")

popular = movieCounts.join(namesDF, "movieID")
popular.select("title", "count").orderBy(F.desc("count")).show(10)

방법 2. Broadcast + UDF 활용 (고급)

브로드캐스트 변수를 사용하면 작은 조회 테이블을 클러스터 전역으로 공유할 수 있다.

def loadMovieNames():
    names = {}
    with open("data/u.item", encoding="ISO-8859-1") as f:
        for line in f:
            fields = line.strip().split("|")
            names[int(fields[0])] = fields[1]
    return names

movieNames = spark.sparkContext.broadcast(loadMovieNames())

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

lookupName = udf(lambda movieID: movieNames.value.get(movieID, "Unknown"), StringType())

moviesWithNames = movieCounts.withColumn("title", lookupName(F.col("movieID")))
moviesWithNames.orderBy(F.desc("count")).show(10)

💡 브로드캐스트 변수

  • 모든 Executor에 공통 객체를 전송 → 반복 조인 비용 제거
  • .value로 접근하여 Python 객체처럼 사용 가능
  • 자주 참조되지만 크기가 작은 데이터에 최적

출력 예시

| title                 | count |
|-----------------------|-------|
| Star Wars (1977)      | 584   |
| Contact (1997)        | 509   |
| Fargo (1996)          | 508   |

4️⃣ 실습 ③ — 슈퍼히어로 네트워크 분석 (RDD + DataFrame 혼합)

목표:
Marvel Universe 데이터에서 가장 많이 등장한 슈퍼히어로 찾기.

① 데이터 구조

  • Marvel-names.txt → ID와 이름 매핑
  • Marvel-graph.txt → 히어로 ID와 등장 관계 리스트

② 데이터 로드

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

namesSchema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

namesDF = spark.read.option("sep", " ").schema(namesSchema).csv("data/Marvel-names.txt")

graph = spark.read.text("data/Marvel-graph.txt")

③ 관계 수 계산

from pyspark.sql import functions as F

connections = graph.withColumn("id", F.split(F.col("value"), " ")[0]) \
    .withColumn("connections", F.size(F.split(F.col("value"), " ")) - 1)

connectionsByHero = connections.groupBy("id").agg(F.sum("connections").alias("connections"))
mostPopular = connectionsByHero.orderBy(F.desc("connections")).first()

④ 이름 매칭 및 결과 출력

heroName = namesDF.filter(F.col("id") == mostPopular[0]).select("name").first()[0]
print(f"가장 인기 있는 슈퍼히어로: {heroName}, 연결 수: {mostPopular[1]}")

결과:

가장 인기 있는 슈퍼히어로: Captain America, 연결 수: 1937

5️⃣ 실습 ④ — BFS (너비 우선 탐색)과 누산기

목표:
두 슈퍼히어로 간의 “분리 정도(Degrees of Separation)” 계산하기
→ 그래프 탐색 알고리즘을 Spark RDD로 구현

핵심 개념

  • Node 구조: (connections, distance, color)
    • color: WHITE(미방문), GRAY(탐색 대상), BLACK(완료)
  • Accumulator: 클러스터 전역 카운터
    • 모든 Executor가 공유할 수 있는 “전역 상태 변수”

개념 요약

  1. 초기 GRAY 노드(시작점)에서 탐색 시작
  2. 인접 노드를 새 GRAY로 확장
  3. 각 반복에서 distance += 1
  4. 목표 노드를 찾으면 accumulator 증가
  5. accumulator가 0보다 크면 탐색 종료

코드 요약

hitCounter = sc.accumulator(0)

def bfsMap(node):
    heroID, (connections, distance, color) = node
    results = []
    if color == "GRAY":
        for connection in connections:
            newDistance = distance + 1
            newColor = "GRAY"
            if connection == targetHeroID:
                hitCounter.add(1)
            results.append((connection, ([], newDistance, newColor)))
        color = "BLACK"
    results.append((heroID, (connections, distance, color)))
    return results

이후 RDD 변환, reduceByKey, 반복 수행
hitCounter가 증가하면 탐색 성공.
결과적으로 스파이더맨은 Adam 3031과 2단계 분리로 연결되어 있음.


6️⃣ 실습 ⑤ — 영화 추천 시스템 (Collaborative Filtering + Cache)

목표:
MovieLens 데이터를 이용해 영화 간 유사도 계산 및 추천 구현

🔹 개념: 아이템 기반 협업 필터링(Item-Based Collaborative Filtering)

  • 같은 영화를 평가한 사용자의 패턴을 통해 영화 간 유사도 추정
  • 유사도 계산식: 코사인 유사도 (Cosine Similarity)

🔹 핵심 로직

ratings = spark.read.option("sep", "\t").schema(schema).csv("data/u.data")
ratings = ratings.select("userID", "movieID", "rating")

ratings1 = ratings.alias("r1")
ratings2 = ratings.alias("r2")

pairings = ratings1.join(ratings2, (F.col("r1.userID") == F.col("r2.userID")) &
                                   (F.col("r1.movieID") < F.col("r2.movieID"))) \
                   .select(F.col("r1.movieID").alias("movie1"),
                           F.col("r2.movieID").alias("movie2"),
                           F.col("r1.rating").alias("rating1"),
                           F.col("r2.rating").alias("rating2"))
pairings.cache()

💡 캐시 활용

  • .cache()로 메모리에 저장 → 반복 쿼리 시 재계산 방지
  • .persist()는 디스크 캐시로 복구성 강화 가능

🔹 유사도 계산

pairStats = pairings.withColumn("xx", F.col("rating1")*F.col("rating1")) \
    .withColumn("yy", F.col("rating2")*F.col("rating2")) \
    .withColumn("xy", F.col("rating1")*F.col("rating2"))

simDF = pairStats.groupBy("movie1", "movie2").agg(
    (F.sum("xy") / (F.sqrt(F.sum("xx")) * F.sqrt(F.sum("yy")))).alias("similarity"),
    F.count("xy").alias("numPairs")
).filter((F.col("numPairs") > 50) & (F.col("similarity") > 0.97))

🔹 결과 필터링

results = simDF.filter((F.col("movie1") == 50) | (F.col("movie2") == 50)) \
    .orderBy(F.desc("similarity"))
results.show(10)

출력 예시

| 영화쌍 | 유사도 | 평가자 수 |
|--------|--------|-----------|
| Star WarsEmpire Strikes Back | 0.989 | 200 |
| Star WarsReturn of the Jedi  | 0.984 | 185 |
| Star WarsRaiders of the Lost Ark | 0.972 | 121 |

7️⃣ 요약: Spark 최적화 기법

기술설명사용 예시
GroupBy & Join대규모 집계/병합 연산MovieLens 인기 영화
Broadcast Variable작은 Lookup 테이블을 전 노드에 배포영화 이름 매칭
UDF사용자 정의 함수ID → 제목 변환
Accumulator클러스터 전역 카운터BFS 종료 조건
Cache / Persist반복 연산 최적화영화 유사도 계산

Spark의 본질은 “데이터를 함수로 다루는 병렬 모델”이다.
이제 RDD 수준의 추상화와 DataFrame 기반 최적화 모두 이해했을 것이다.

profile
okorion's Tech Study Blog.

0개의 댓글