pySpark Session 생성
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyApplication") \
.master("spark://spark-master:7077") \ // master 노드 지정
.getOrCreate()
MySQL에서 데이터 로딩
mysql_host = "mysql"
mysql_port = "portnumber"
mysql_database = "database"
mysql_username = "username"
mysql_password = "userpassword"
mysql_query = "query"
df = spark.read.format("jdbc").options(
url=f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}",
driver="com.mysql.cj.jdbc.Driver",
dbtable=mysql_query,
user=mysql_username,
password=mysql_password
).load()
DataFrame이란?
- 분산된 테이블 데이터를 다루는 구조.
- Pandas의 DataFrame과 유사하게 생겼음.
- 대용량 데이터를 클러스터에서 병렬처리할 수 있게 설계됨.
- 특징 5가지
- 컬럼 + 행 구조(SQL 테이블 처럼)
- 각 컬럼의 타입을 명시적으로 가진다.
- 지연 실행(Lazy Evaluation)
- .select() 또는 .filter()등은 실제 실행되지 않는다.
- 액션(.show() 또는 .collect())이 있어야 실행된다.
- 분산 처리
- 데이터가 여러 워커 노드에 분산되어 저장 및 처리된다.
자주 사용하는 함수
성능향상을 위한 팁
- .cache()
- .repartition(n)
- .explain()
- persist()
UDF(User-Defined Function)
- pySpark의 DataFrame 연산에서 사용자 정의 함수를 컬럼 단위로 적용하기 위한 도구
- 내가 직접 만든 함수를 컬럼에 적용하려면 사용해야한다.
- 기본 함수로 해결안될 때 사용한다. ex)전처리
- 단점
- 기본 사용법
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def to_upper(text):
return text.upper()
to_upper_udf = udf(to_upper, StringType())
df = df.withColumn("upper_name", to_upper_udf(df["name"]))
- 성능 팁
- 같은 값 반복 사용하면 broadcast variable 사용한다. → 불용어 리스트를 UDF에서 반복 사용하면 비효율적이므로 broadcast한다. ex)
broadcast_stopwords = spark.sparkContext.broadcast(["이", "그", "저", "있다", "하다"])
@udf(ArrayType(StringType()))
def remove_stopwords(nouns):
sw = broadcast_stopwords.value
return [n for n in nouns if n not in sw]
- 자주 쓰는 UDF 는 .cache()로 중간 결과를 저장한다.
- .cache()는 Spark DataFrame을 메모리에 저장해서 재사용할 때 빠르게 쓰기 위한 방법.
- spark는 기본적으로 지연 평가이기 때문에 .withColumn()이나 .select() 등을 여러 번 쓰면, 매변 처음부터 전체 DAG를 다시 계산한다.
- 중간 결과를 여러 번 사용할 예정이면 .cache()해놓으면 다음 연산부터는 메모리에 저장된 결과를 재활용한다.
KIWI UDF 적용
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from kiwipiepy import Kiwi
@udf(ArrayType(StringType()))
def extract_nouns(text):
if text is None:
return []
kiwi = Kiwi()
return [token.form for token in kiwi.tokenize(text) if token.tag.startswith("N")]
불용어 제거 UDF
stopwords = set(["있다", "되다", "하다", "이다", "등", "또한", "그", "이", "수", "것"])
broadcast_stopwords = spark.sparkContext.broadcast(stopwords)
@udf(ArrayType(StringType()))
def remove_stopwords(nouns):
return [n for n in nouns if n not in broadcast_stopwords.value]
df = df.withColumn("filtered_nouns", remove_stopwords(df["nouns"]))
CountVectorizer 코드
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="filtered_nouns", outputCol="rawFeatures", vocabSize=10, minDF=2)
cv_model = cv.fit(df)
featurized_data = cv_model.transform(df)
featurized_data.select("rawFeatures").show()
IDF
from pyspark.ml.feature import IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)
Silhouette_scores
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
silhouette_scores = []
for k in range(2, 11):
kmeans = KMeans(featuresCol="features", predictionCol="prediction", k=k, seed=42)
model = kmeans.fit(rescaled_data)
predictions = model.transform(rescaled_data)
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction", metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(predictions)
silhouette_scores.append((k, silhouette))
print(f"K={k} → Silhouette Score = {silhouette:.4f}")
KMeans
best_k = max(silhouette_scores, key=lambda x: x[1])[0]
kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=best_k, seed=42)
model = kmeans.fit(rescaled_data)
final_clustered = model.transform(rescaled_data)
군집 중심 벡터 가져오기.
from collections import Counter
import numpy as np
centers = model.clusterCenters()
vocab = cv_model.vocabulary
keyword_scores = Counter()
for center in centers:
top_indices = np.argsort(center)[::-1][:10]
for idx in top_indices:
keyword = vocab[idx]
keyword_scores[keyword] += center[idx]
최종 핵심 키워드 출력
top_keywords = keyword_scores.most_common(10)
print("최종 핵심 키워드:")
for word, score in top_keywords:
print(f"- {word} ({score:.4f})")
Kmeans와 TF-IDF 코드
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyApplication") \
.master("spark://spark-master:7077") \ // master 노드 지정
.getOrCreate()
mysql_host = "mysql"
mysql_port = "portnumber"
mysql_database = "database"
mysql_username = "username"
mysql_password = "userpassword"
mysql_query = "query"
df = spark.read.format("jdbc").options(
url=f"jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}",
driver="com.mysql.cj.jdbc.Driver",
dbtable=mysql_query,
user=mysql_username,
password=mysql_password
).load()
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from kiwipiepy import Kiwi
@udf(ArrayType(StringType()))
def extract_nouns(text):
if text is None:
return []
kiwi = Kiwi()
return [token.form for token in kiwi.tokenize(text) if token.tag.startswith("N")]
stopwords = set(["있다", "되다", "하다", "이다", "등", "또한", "그", "이", "수", "것"])
broadcast_stopwords = spark.sparkContext.broadcast(stopwords)
@udf(ArrayType(StringType()))
def remove_stopwords(nouns):
return [n for n in nouns if n not in broadcast_stopwords.value]
df = df.withColumn("filtered_nouns", remove_stopwords(df["nouns"]))
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="filtered_nouns", outputCol="rawFeatures", vocabSize=10, minDF=2)
cv_model = cv.fit(df)
featurized_data = cv_model.transform(df)
featurized_data.select("rawFeatures").show()
from pyspark.ml.feature import IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
silhouette_scores = []
for k in range(2, 11):
kmeans = KMeans(featuresCol="features", predictionCol="prediction", k=k, seed=42)
model = kmeans.fit(rescaled_data)
predictions = model.transform(rescaled_data)
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction", metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(predictions)
silhouette_scores.append((k, silhouette))
print(f"K={k} → Silhouette Score = {silhouette:.4f}")
best_k = max(silhouette_scores, key=lambda x: x[1])[0]
kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=best_k, seed=42)
model = kmeans.fit(rescaled_data)
final_clustered = model.transform(rescaled_data)
from collections import Counter
import numpy as np
centers = model.clusterCenters()
vocab = cv_model.vocabulary
keyword_scores = Counter()
for center in centers:
top_indices = np.argsort(center)[::-1][:10]
for idx in top_indices:
keyword = vocab[idx]
keyword_scores[keyword] += center[idx]
top_keywords = keyword_scores.most_common(10)
print("최종 핵심 키워드:")
for word, score in top_keywords:
print(f"- {word} ({score:.4f})")
spark.stop()