spark.mllib vs spark.mlspark.mllib가 RDD 기반이고, spark.ml은 데이터프레임 기반spark.mllib는 RDD 위에서 동작하는 이전 라이브러리로 더 이상 업데이트 안됨spark.ml을 사용할 것import pyspark.mlhttps://docs.google.com/spreadsheets/d/1OYWPijmIhvVpvbSA-79DlInydidPrbfynfbsbINXmKw/edit?gid=880912933#gid=880912933

Regression 사용이 코드는 Google Colab에서 PySpark를 사용하여 보스턴 주택 가격 예측 모델을 구축하는 과정을 다루고 있음. 각 셀에서 수행되는 작업을 단계별로 요약하면 다음과 같음.
!pip install pyspark==3.3.1 py4j==0.10.9.5
PySpark와 Py4J 패키지를 설치함. Py4J는 Python에서 Java 객체에 접근할 수 있게 해줌.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Boston Housing Linear Regression example") \
.getOrCreate()
SparkSession 객체를 생성하여 SparkContext를 초기화하고 애플리케이션 이름을 지정함.
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/boston_housing.csv
data = spark.read.csv('./boston_housing.csv', header=True, inferSchema=True)
data.printSchema()
data.show()
데이터셋을 다운로드하고 Spark DataFrame으로 읽어들임. 스키마와 데이터를 출력함.
from pyspark.ml.feature import VectorAssembler
feature_columns = data.columns[:-1]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data_2 = assembler.transform(data)
data_2.show()
VectorAssembler를 사용하여 피쳐 벡터를 생성함. features라는 컬럼에 모든 피쳐를 묶음.
train, test = data_2.randomSplit([0.7, 0.3])
from pyspark.ml.regression import LinearRegression
algo = LinearRegression(featuresCol="features", labelCol="medv")
model = algo.fit(train)
데이터를 훈련용과 테스트용으로 분할하고, Linear Regression 모델을 학습시킴.
evaluation_summary = model.evaluate(test)
evaluation_summary.meanAbsoluteError
evaluation_summary.rootMeanSquaredError
evaluation_summary.r2
모델을 평가하고, Mean Absolute Error, Root Mean Squared Error, R-squared 값을 출력함.
predictions = model.transform(test)
predictions.show()
model.save("boston_housing_model")
테스트 데이터에 대한 예측을 수행하고, 결과를 출력함. 모델을 저장함.
from google.colab import drive
drive.mount('/content/gdrive')
model_save_name = "boston_housing_model"
path = F"/content/gdrive/My Drive/boston_housing_model"
model.save(path)
from pyspark.ml.regression import LinearRegressionModel
loaded_model = LinearRegressionModel.load(path)
predictions2 = loaded_model.transform(test)
predictions2.show()
Google Drive에 모델을 저장하고, 저장된 모델을 다시 로드하여 테스트 데이터에 대한 예측을 수행함.
이 코드 셀들은 PySpark를 사용하여 보스턴 주택 가격 예측 모델을 구축하는 전 과정을 보여줌. 데이터 다운로드, 전처리, 모델 학습 및 평가, 모델 저장 및 로드 단계를 포함함. Google Colab 환경에서 실행되도록 구성되어 있음.
https://www.kaggle.com/c/titanic/data

이 코드는 Google Colab에서 PySpark를 사용하여 타이타닉 생존 예측 모델을 구축하는 과정을 다루고 있음. 각 셀에서 수행되는 작업을 단계별로 요약하면 다음과 같음.
!pip install pyspark==3.3.1 py4j==0.10.9.5
PySpark와 Py4J 패키지를 설치함. Py4J는 Python에서 Java 객체에 접근할 수 있게 해줌.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Titanic Binary Classification example") \
.getOrCreate()
SparkSession 객체를 생성하여 SparkContext를 초기화하고 애플리케이션 이름을 지정함.
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/titanic.csv
data = spark.read.csv('./titanic.csv', header=True, inferSchema=True)
data.printSchema()
data.show()
데이터셋을 다운로드하고 Spark DataFrame으로 읽어들임. 스키마와 데이터를 출력함.
final_data = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
final_data.show()
사용하지 않을 컬럼을 제거하고 필요한 컬럼만 선택함.
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed'])
imputer_model = imputer.fit(final_data)
final_data = imputer_model.transform(final_data)
final_data.select("Age", "AgeImputed").show()
Imputer를 사용하여 Age 컬럼의 결측치를 평균값으로 대체함.
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed')
gender_indexer_model = gender_indexer.fit(final_data)
final_data = gender_indexer_model.transform(final_data)
final_data.select("Gender", "GenderIndexed").show()
StringIndexer를 사용하여 성별을 숫자로 인코딩함.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed'], outputCol='features')
data_vec = assembler.transform(final_data)
data_vec.show()
VectorAssembler를 사용하여 피쳐 벡터를 생성함. features라는 컬럼에 모든 피쳐를 묶음.
train, test = data_vec.randomSplit([0.7, 0.3])
from pyspark.ml.classification import LogisticRegression
algo = LogisticRegression(featuresCol="features", labelCol="Survived")
model = algo.fit(train)
데이터를 훈련용과 테스트용으로 분할하고, Logistic Regression 모델을 학습시킴.
predictions = model.transform(test)
predictions.select(['Survived','prediction', 'probability']).show()
테스트 데이터에 대한 예측을 수행하고, 결과를 출력함.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')
evaluator.evaluate(predictions)
BinaryClassificationEvaluator를 사용하여 모델 성능을 평가함. Area Under ROC 값을 출력함.
이 코드 셀들은 PySpark를 사용하여 타이타닉 생존 예측 모델을 구축하는 전 과정을 보여줌. 데이터 다운로드, 전처리, 모델 학습 및 평가 단계를 포함함. Google Colab 환경에서 실행되도록 구성되어 있음.

머신러닝 알고리즘에 관계없이 일관된 형태의 API를 사용하여 모델링이 가능
ML 모델개발과 테스트를 반복가능해줌
4개의 요소로 구성
dataframe : 데이터 저장 구조
Transformer : DF를 입력받아 변환된 DF 반환 객체
Estimator : DF를 입력받아 학습된 모델을 반환. 예를 들어 LogisiticRegression은 Estimator이고, LogisticRegression.fit()를 호출하면, 머신 러닝 모델(Transformer)을 만들어냄
Parameter : 2와 3의 설정값.
이제 PySpark에서 ML Pipeline을 사용할 때 중요한 구성 요소인 DataFrame, Transformer, Estimator, Parameter에 대해 설명하겠음. 위에서 작성한 코드를 이러한 구성 요소를 사용하여 다시 작성해보겠음.
아래는 위의 구성 요소를 사용하여 타이타닉 생존 예측 모델을 ML Pipeline으로 구현한 코드임.
# 필요한 라이브러리 설치
!pip install pyspark==3.3.1 py4j==0.10.9.5
# SparkSession 생성
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Titanic Binary Classification Pipeline") \
.getOrCreate()
# 데이터 다운로드 및 읽기
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/titanic.csv
data = spark.read.csv('./titanic.csv', header=True, inferSchema=True)
# 데이터 클린업
final_data = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
# 결측치 처리
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed'])
# 성별 정보 인코딩
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed')
# 피쳐 벡터 생성
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed'], outputCol='features')
# Logistic Regression 모델
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="Survived")
# Pipeline 생성
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[imputer, gender_indexer, assembler, lr])
# 데이터 분할
train, test = final_data.randomSplit([0.7, 0.3])
# 모델 학습
model = pipeline.fit(train)
# 예측 수행
predictions = model.transform(test)
# 모델 성능 평가
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')
auc = evaluator.evaluate(predictions)
print(f"Area Under ROC: {auc}")
# 예측 결과 출력
predictions.select(['Survived','prediction', 'probability']).show()
DataFrame:
data = spark.read.csv('./titanic.csv', header=True, inferSchema=True)
final_data = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
데이터를 로드하고 필요한 컬럼만 선택하여 DataFrame을 만듦.
Transformer:
Imputer:
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed'])
Age 컬럼의 결측치를 평균값으로 대체하는 Transformer.
StringIndexer:
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed')
Gender 컬럼을 숫자로 인코딩하는 Transformer.
VectorAssembler:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed'], outputCol='features')
여러 피처 컬럼을 하나의 벡터로 결합하는 Transformer.
Estimator:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="Survived")
Logistic Regression 모델을 정의하는 Estimator.
Pipeline:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[imputer, gender_indexer, assembler, lr])
Transformer와 Estimator를 연결하여 하나의 Pipeline을 만듦.
Parameter:
각 Transformer와 Estimator에는 설정값을 지정하는 Parameter가 있음. 예를 들어, Imputer의 strategy 파라미터, StringIndexer의 inputCol 및 outputCol 파라미터, VectorAssembler의 inputCols 및 outputCol 파라미터, LogisticRegression의 featuresCol 및 labelCol 파라미터 등이 있음.
이 코드는 PySpark를 사용하여 데이터 전처리, 피처 변환, 모델 학습 및 평가를 ML Pipeline을 통해 자동화한 예시임. DataFrame, Transformer, Estimator, Parameter를 사용하여 구성됨.







EMR 클러스터 summary 탭 선택
Security groups for master 링크 선택

Security Group 페이지에서 마스터 노드의 security gorup id를 클릭
edit inbound rules에서 포트번호22 오픈

spark-submit --master yarn python_file.py