PySpark 기본 사용 방법

이형욱·2025년 7월 4일

pySpark Session 생성


from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApplication") \ # 실행하는 앱 이름
    .master("spark://spark-master:7077") \ // master 노드 지정
    .getOrCreate()

MySQL에서 데이터 로딩


mysql_host = "mysql"
mysql_port = "portnumber"
mysql_database = "database"
mysql_username = "username"
mysql_password = "userpassword"
mysql_query = "query"

df = spark.read.format("jdbc").options(
        url=f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable=mysql_query,
        user=mysql_username,
        password=mysql_password
    ).load()

DataFrame이란?


  • 분산된 테이블 데이터를 다루는 구조.
  • Pandas의 DataFrame과 유사하게 생겼음.
  • 대용량 데이터를 클러스터에서 병렬처리할 수 있게 설계됨.
  • 특징 5가지
    • 컬럼 + 행 구조(SQL 테이블 처럼)
    • 각 컬럼의 타입을 명시적으로 가진다.
    • 지연 실행(Lazy Evaluation)
      • .select() 또는 .filter()등은 실제 실행되지 않는다.
      • 액션(.show() 또는 .collect())이 있어야 실행된다.
    • 분산 처리
      • 데이터가 여러 워커 노드에 분산되어 저장 및 처리된다.

자주 사용하는 함수


  • show()
    • df 출력.
    • 옵션
      • n : 위에서 n개 줄 출력 ex) n=5 → 위에서 5개 출력
      • truncate: 각 셀의 문자열 최대 길이 출력. ex)truncate=100 → 최대 100자 출력
  • select()
    • 특정 컬럼 선택
  • filter() 또는 where()
    • 조건 필터링
  • groupBy().agg()
    • 집계
  • withColumn()
    • 컬럼 추가
  • drop()
    • 컬럼 삭제
  • orderBy()
    • 정렬
  • join()
    • 조인
  • printSchema()
    • 컬럼 타입 확인
  • toPandas()
    • Pandas DataFrame으로 변환(작은 데이터일때만)
  • 예시
    df.filter(df["age"] > 26) \
      .select("name") \
      .orderBy("name") \
      .show()

성능향상을 위한 팁


  • .cache()
    • 반복 사용하는 df를 캐싱
  • .repartition(n)
    • 파티션 수 조절(병렬성 조절)
  • .explain()
    • 실행 계획 확인(SQL처럼)
  • persist()
    • 디스크까지 유지

UDF(User-Defined Function)


  • pySpark의 DataFrame 연산에서 사용자 정의 함수를 컬럼 단위로 적용하기 위한 도구
  • 내가 직접 만든 함수를 컬럼에 적용하려면 사용해야한다.
  • 기본 함수로 해결안될 때 사용한다. ex)전처리
  • 단점
    • 느리다. 최적화가 안된다.
  • 기본 사용법
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    # Python 함수 정의
    def to_upper(text):
        return text.upper()
    
    # UDF로 등록
    to_upper_udf = udf(to_upper, StringType())
    
    # DataFrame에 적용
    df = df.withColumn("upper_name", to_upper_udf(df["name"]))
  • 성능 팁
    • 같은 값 반복 사용하면 broadcast variable 사용한다. → 불용어 리스트를 UDF에서 반복 사용하면 비효율적이므로 broadcast한다. ex)
      broadcast_stopwords = spark.sparkContext.broadcast(["이", "그", "저", "있다", "하다"])
      
      @udf(ArrayType(StringType()))
      def remove_stopwords(nouns):
          sw = broadcast_stopwords.value
          return [n for n in nouns if n not in sw]
    • 자주 쓰는 UDF 는 .cache()로 중간 결과를 저장한다.
      • .cache()는 Spark DataFrame을 메모리에 저장해서 재사용할 때 빠르게 쓰기 위한 방법.
      • spark는 기본적으로 지연 평가이기 때문에 .withColumn()이나 .select() 등을 여러 번 쓰면, 매변 처음부터 전체 DAG를 다시 계산한다.
      • 중간 결과를 여러 번 사용할 예정이면 .cache()해놓으면 다음 연산부터는 메모리에 저장된 결과를 재활용한다.

KIWI UDF 적용


from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from kiwipiepy import Kiwi

@udf(ArrayType(StringType()))
def extract_nouns(text):
if text is None:
return []
kiwi = Kiwi() 
return [token.form for token in kiwi.tokenize(text) if token.tag.startswith("N")]

불용어 제거 UDF


stopwords = set(["있다", "되다", "하다", "이다", "등", "또한", "그", "이", "수", "것"])
broadcast_stopwords = spark.sparkContext.broadcast(stopwords)

@udf(ArrayType(StringType()))
def remove_stopwords(nouns):
    return [n for n in nouns if n not in broadcast_stopwords.value]

df = df.withColumn("filtered_nouns", remove_stopwords(df["nouns"]))

CountVectorizer 코드


from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="filtered_nouns", outputCol="rawFeatures", vocabSize=10, minDF=2)
cv_model = cv.fit(df)
featurized_data = cv_model.transform(df)
featurized_data.select("rawFeatures").show()

IDF


from pyspark.ml.feature import IDF

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

Silhouette_scores


from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

silhouette_scores = []

for k in range(2, 11):  # K=2부터 10까지 시도
    kmeans = KMeans(featuresCol="features", predictionCol="prediction", k=k, seed=42)
    model = kmeans.fit(rescaled_data)
    predictions = model.transform(rescaled_data)

    evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction", metricName="silhouette", distanceMeasure="squaredEuclidean")
    silhouette = evaluator.evaluate(predictions)
    silhouette_scores.append((k, silhouette))
    print(f"K={k} → Silhouette Score = {silhouette:.4f}")

KMeans


best_k = max(silhouette_scores, key=lambda x: x[1])[0]  # 가장 높은 점수의 K

kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=best_k, seed=42)
model = kmeans.fit(rescaled_data)
final_clustered = model.transform(rescaled_data)

군집 중심 벡터 가져오기.


from collections import Counter
import numpy as np

# 군집 중심 벡터 가져오기
centers = model.clusterCenters()

# 단어 사전 (index → 단어)
vocab = cv_model.vocabulary

# 전체 키워드 점수 저장
keyword_scores = Counter()

for center in centers:
    top_indices = np.argsort(center)[::-1][:10]  # 각 클러스터 상위 10개
    for idx in top_indices:
        keyword = vocab[idx]
        keyword_scores[keyword] += center[idx]  # 점수 누적

최종 핵심 키워드 출력


top_keywords = keyword_scores.most_common(10)
print("최종 핵심 키워드:")
for word, score in top_keywords:
    print(f"- {word} ({score:.4f})")

Kmeans와 TF-IDF 코드


from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApplication") \ # 실행하는 앱 이름
    .master("spark://spark-master:7077") \ // master 노드 지정
    .getOrCreate()
    
mysql_host = "mysql"
mysql_port = "portnumber"
mysql_database = "database"
mysql_username = "username"
mysql_password = "userpassword"
mysql_query = "query"

df = spark.read.format("jdbc").options(
        url=f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable=mysql_query,
        user=mysql_username,
        password=mysql_password
    ).load()
    
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from kiwipiepy import Kiwi

@udf(ArrayType(StringType()))
def extract_nouns(text):
if text is None:
return []
kiwi = Kiwi()
return [token.form for token in kiwi.tokenize(text) if token.tag.startswith("N")]

stopwords = set(["있다", "되다", "하다", "이다", "등", "또한", "그", "이", "수", "것"])
broadcast_stopwords = spark.sparkContext.broadcast(stopwords)

@udf(ArrayType(StringType()))
def remove_stopwords(nouns):
    return [n for n in nouns if n not in broadcast_stopwords.value]

df = df.withColumn("filtered_nouns", remove_stopwords(df["nouns"]))

from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="filtered_nouns", outputCol="rawFeatures", vocabSize=10, minDF=2)
cv_model = cv.fit(df)
featurized_data = cv_model.transform(df)
featurized_data.select("rawFeatures").show()

from pyspark.ml.feature import IDF

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

silhouette_scores = []

for k in range(2, 11):  # K=2부터 10까지 시도
    kmeans = KMeans(featuresCol="features", predictionCol="prediction", k=k, seed=42)
    model = kmeans.fit(rescaled_data)
    predictions = model.transform(rescaled_data)

    evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction", metricName="silhouette", distanceMeasure="squaredEuclidean")
    silhouette = evaluator.evaluate(predictions)
    silhouette_scores.append((k, silhouette))
    print(f"K={k} → Silhouette Score = {silhouette:.4f}")
    
best_k = max(silhouette_scores, key=lambda x: x[1])[0]  # 가장 높은 점수의 K

kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=best_k, seed=42)
model = kmeans.fit(rescaled_data)
final_clustered = model.transform(rescaled_data)

from collections import Counter
import numpy as np

# 군집 중심 벡터 가져오기
centers = model.clusterCenters()

# 단어 사전 (index → 단어)
vocab = cv_model.vocabulary

# 전체 키워드 점수 저장
keyword_scores = Counter()

for center in centers:
    top_indices = np.argsort(center)[::-1][:10]  # 각 클러스터 상위 10개
    for idx in top_indices:
        keyword = vocab[idx]
        keyword_scores[keyword] += center[idx]  # 점수 누적
        
        
top_keywords = keyword_scores.most_common(10)
print("최종 핵심 키워드:")
for word, score in top_keywords:
    print(f"- {word} ({score:.4f})")

spark.stop()
profile
바나나는 하드디스크보다 따듯하다.

0개의 댓글