1) Create two more EC2 servers
2) Configure the EC2 servers so they can connect to each other
1) Install net-tools
sudo apt update
sudo apt install net-tools
2) Check the IP addresses
ifconfig
3) Edit the hosts file
sudo vi /etc/hosts
---
server1_private_ip server1
server2_private_ip server2
server3_private_ip server3
---
4) Generate SSH keys
# Run on all three servers
## Just press Enter three times to accept the defaults.
ssh-keygen -t rsa
5) Register the public keys
# Paste each server's ~/.ssh/id_rsa.pub into authorized_keys on every server
sudo vi ~/.ssh/authorized_keys
6) Verify (e.g. ssh server2 should now work without a password)
7) Set up the new servers
# Java
sudo apt install openjdk-17-jre-headless
vim .bashrc
# Add at the very bottom
export JAVA_HOME=/usr/lib/jvm/java-1.17.0-openjdk-amd64
# Python virtual environment (pyenv)
sudo apt-get update; sudo apt-get install make build-essential libssl-dev zlib1g-dev \
libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm \
libncursesw5-dev xz-utils tk-dev libxml2-dev libxmlsec1-dev libffi-dev liblzma-dev
curl https://pyenv.run | bash
vim .bashrc
# Add at the very bottom
export PYENV_ROOT="$HOME/.pyenv"
[[ -d $PYENV_ROOT/bin ]] && export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"
source .bashrc
exec $SHELL
# Create the virtual environment
pyenv install 3.11.9
pyenv virtualenv 3.11.9 py3_11_9
pyenv versions
# Create the working directories
mkdir app work
cd app
mkdir spark
# Download Spark
cd ~/app/spark
wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
tar -zxvf spark-3.5.1-bin-hadoop3.tgz
# Configure Spark
vim ~/.bashrc
export SPARK_HOME=/home/ubuntu/app/spark/spark-3.5.1-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_PYTHON=/home/ubuntu/.pyenv/versions/py3_11_9/bin/python
# Verify (reload the shell config first)
source ~/.bashrc
pyspark
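If the paths are set correctly, the PySpark shell starts and already provides a spark session; a quick optional sanity check inside the shell:
spark.version
spark.range(5).count()   # should return 5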
1) Master configuration
cd /home/ubuntu/app/spark/spark-3.5.1-bin-hadoop3/conf
cp spark-env.sh.template spark-env.sh
vi spark-env.sh
---
SPARK_MASTER_HOST='<server1 private IP>'
---
cp workers.template workers
vi workers
---
#localhost
<server1 private IP>
<server2 private IP>
---
2) Worker configuration
cd app/spark/spark-3.5.1-bin-hadoop3/conf
cp spark-env.sh.template spark-env.sh
vi spark-env.sh
---
SPARK_MASTER_HOST='<server1 private IP>'
---
3) Start the cluster (run on server1; the master web UI on port 8080 should then list the workers)
cd /home/ubuntu/app/spark/spark-3.5.1-bin-hadoop3/sbin
./start-all.sh
4) Check the Jupyter kernels (assumes JupyterLab and ipykernel are already installed in py3_11_9)
pyenv activate py3_11_9
jupyter kernelspec list
pyenv deactivate
# Create the Jupyter kernel directory
cd /home/ubuntu/.pyenv/versions/3.11.9/envs/py3_11_9/share/jupyter/kernels/
ls
mkdir py3spark
cd py3spark
vi kernel.json
---
{
  "argv": [
    "python",
    "-m",
    "ipykernel_launcher",
    "-f",
    "{connection_file}"
  ],
  "display_name": "py3spark",
  "language": "python",
  "env": {
    "SPARK_HOME": "/home/ubuntu/app/spark/spark-3.5.1-bin-hadoop3",
    "PYSPARK_PYTHON": "/home/ubuntu/.pyenv/versions/py3_11_9/bin/python",
    "PYTHONPATH": "/home/ubuntu/app/spark/spark-3.5.1-bin-hadoop3/python:/home/ubuntu/app/spark/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip"
  }
}
---
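Once the kernel is registered, a minimal sketch for checking, from a notebook running the py3spark kernel, that the environment variables and the pyspark import are picked up:
import os
import pyspark
print(os.environ["SPARK_HOME"])   # should be the spark-3.5.1-bin-hadoop3 path
print(pyspark.__version__)        # should print 3.5.1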
# Launch Jupyter (use an SSH tunnel or --ip=0.0.0.0 to reach it from your browser)
pyenv activate py3_11_9
jupyter lab
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()
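Note that without .master(...) this session typically runs Spark in local mode inside the notebook. To actually use the standalone cluster started above, point the builder at the master; a sketch, where server1 stands for the master's /etc/hosts entry or private IP and 7077 is the default standalone master port:
spark = (
    SparkSession.builder
    .master("spark://server1:7077")   # standalone master started by start-all.sh
    .appName('test')
    .getOrCreate()
)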
# Load the data
df = spark.read.parquet("/home/ubuntu/work/spark01/data/iris.parquet")
# Select only the columns you want
df.select("sepal_length").show(5)
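select can also take several columns at once, e.g.:
df.select("sepal_length", "sepal_width").show(5)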
# Filter by condition (two ways)
df.filter(df['sepal_length'] > 6).show()
df.where(df["class"] == 2).show()
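Conditions can also be combined with & and | (each condition in its own parentheses), for example:
df.filter((df['sepal_length'] > 6) & (df['class'] == 2)).show()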
# Add a derived column with withColumn
from pyspark.sql.functions import col
df = df.withColumn("sepal_length_cm", col("sepal_length") * 10)
df.show(5)
# Map the numeric class codes to strings with when()
from pyspark.sql.functions import when
df_col = df.withColumn(
    "class_string",
    when(df['class'] == 0, "Setosa")
    .when(df['class'] == 1, "Versicolour")
    .when(df['class'] == 2, "Virginica")
)
df_col.show(10)
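Rows whose class matched none of the when branches would end up with null in class_string; an .otherwise(...) supplies a default for such rows. A sketch (df_col2 is just an illustrative name):
df_col2 = df.withColumn(
    "class_string",
    when(df['class'] == 0, "Setosa")
    .when(df['class'] == 1, "Versicolour")
    .when(df['class'] == 2, "Virginica")
    .otherwise("Unknown")
)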
# Keep only the first three characters of class_string
from pyspark.sql.functions import substring
df_sub = df_col.withColumn("class_string3", substring(df_col['class_string'], 1, 3))
df_sub.show(5)
# Average sepal_length and sepal_width per class
df.groupby('class').agg({'sepal_length': 'avg', 'sepal_width': 'avg'}).show()
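The dict form of agg produces auto-generated column names such as avg(sepal_length); the same aggregation with explicit aliases, as a sketch:
from pyspark.sql.functions import avg
df.groupBy('class').agg(
    avg('sepal_length').alias('avg_sepal_length'),
    avg('sepal_width').alias('avg_sepal_width')
).show()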
# Convert to pandas and plot histograms
import matplotlib.pyplot as plt
pandas_df = df_sub.toPandas()
pandas_df.hist(bins=50, figsize=(20, 15))
plt.show()
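toPandas() pulls the whole DataFrame into the driver's memory, which is fine for iris but not for large data; one option is to sample first (the fraction below is only illustrative):
pandas_sample = df_sub.sample(fraction=0.1, seed=42).toPandas()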
# Working with RDDs
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()
test_sql = spark.range(10).rdd
test_sql.collect()
Convert to a DataFrame
test_df = spark.range(10).rdd.toDF()
test_df.show()
(screenshots: RDD collect() output vs. DataFrame show() output)
Parquet to RDD
df = spark.read.parquet("/home/ubuntu/work/spark01/data/iris.parquet")
df.show(10)
df_rdd = df.rdd
df_rdd.take(10)
class_rdd = df_rdd.map(lambda row: row['class'])
class_rdd.distinct()            # transformation only; nothing runs yet
class_rdd.distinct().collect()  # action: the distinct class values
class_rdd.distinct().count()    # action: how many distinct classes
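As one more action on the same RDD, countByValue() returns the number of rows per class as a dict on the driver:
class_counts = class_rdd.countByValue()
print(class_counts)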