AWS์์ Spark์ ์คํํ๊ธฐ์ํ ์ค๋น
EMR
AWS์ Hadoop Service(On-demand Hadoop)
->Hadoop(YARN), Spark, Hive, Notebook ๋ฑ๋ฑ์ด ์ค์น๋์ด ์ ๊ณต๋๋ ์๋น์ค์ด๋ค.
EC2 Server๋ค์ worker node๋ก ์ฌ์ฉํ๊ณ S3๋ฅผ HDFS๋ก ์ฌ์ฉํ๋ค.
AWS๋ด์ ๋ค๋ฅธ Service๋ค๊ณผ ์ฐ๋์ด ์ฝ๋ค.(Kinesis, DynamoDB, Redshift, ..)
Spark on EMR ์คํ ๋ฐ ์ฌ์ฉ ๊ณผ์
Spark Cluster Manager์ ์คํ Model ์์ฝ
| Cluster manager | Deployed mode | Program Run Method |
|---|---|---|
| loacl[n] | Client | Spark Shell, IDE, Notebook |
| YARN | Client | Spark Shell, Notebook |
| YARN | Client | spark-submit |
EMR Service Page๋ก ์ด๋ & AWS Console์์ EMR ์ ํ

Create Cluster ์ ํ

EMR Cluster ์์ฑ -> ์ด๋ฆ๊ณผ ๊ธฐ์ ์คํ ์ ํ

Software configuration
Zeppelin์ด๋
-> Notebook -> Spark, SQL, Python
m5.xlarge node 3๊ฐ ์ ํ -> ํ๋ฃจ $35 ๋น์ฉ ๋ฐ์
Create Cluster ์ ํ





EMR Cluster ์์ฑ๊น์ง ๋๊ธฐ
Cluster๊ฐ Waiting์ผ๋ก ๋ณํ ๋๊น์ง ๋๊ธฐ


๋ง์คํฐ ๋
ธ๋์ ํฌํธ๋ฒํธ 22๋ฒ์ผ๋ก ์ด๊ธฐ
-> ๋คํธ์ํฌ ๋ฐ ๋ณด์ -> EC2 ๋ณด์ ๊ทธ๋ฃน(๋ฐฉํ๋ฒฝ) -> ํ๋ผ์ด๋จธ๋ฆฌ ๋
ธ๋ -> ์ธ๋ฐ์ด๋ ๊ท์น -> ์ธ๋ฐ์ด๋ ๊ท์น ํธ์ง(SSH๊ฐ ์๋ค๋ฉด)
EMR Cluster Summary Tab ์ ํ
Security Groups for Master ๋งํฌ ์ ํ
Security Groups ํ์ด์ง์์ ๋ง์คํฐ ๋ ธ๋์ security group ID๋ฅผ ํด๋ฆญ
Edit inbound rules ๋ฒํผ ํด๋ฆญ ํ Add rule ๋ฒํผ ์ ํ
Port ๋ฒํธ๋ก 22๋ฅผ ์ ๋ ฅ
Anywhere IP v4 ์ ํ
Save rules ๋ฒํผ ์ ํ






spark-submit์ ์ด์ฉํ์ฌ ์คํํ๋ฉด์ ๋๋ฒ๊น ํ๋ค.
๋ ๊ฐ์ Job์ AWS EMR ์์์ ์คํํด ๋ณผ ์์ ์ด๋ค.
์ ๋ ฅ ๋ฐ์ดํฐ๋ฅผ S3๋ก ๋ก๋ฉ
Stackoverflow 2022๋
๊ฐ๋ฐ์ Survey CSV ํ์ผ์ S3 Bucket์ผ๋ก ์
๋ก๋
-> ์ต๋ช
ํ๋ 83,339๊ฐ์ Survey ์๋ต
s3://spark-tutorial-dataset/survey_results_public.csv
PySpark Job Code
์ ๋ ฅ CSV ํ์ผ์ ๋ถ์ํ์ฌ ๊ทธ ๊ฒฐ๊ณผ๋ฅผ S3์ ๋ค์ ์ ์ฅ(stackoverflow.py)
๋ฏธ๊ตญ ๊ฐ๋ฐ์๋ง ๋์์ผ๋ก ์ฝ๋ฉ์ ์ฒ์ ์ด๋ป๊ฒ ๋ฐฐ์ ๋์ง๋ฅผ ์นด์ดํธ ํ์ฌ S3์ ์ ์ฅ
๐ stackoverflow.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
S3_DATA_INPUT_PATH = 's3://spark-tutorial-dataset/survey_results_public.csv'
S3_DATA_OUTPUT_PATH = 's3://spark-tutorial-dataset/data-output'
spark = SparkSession.builder.appName('Tutorial').getOrCreate()
df = spark.read.csv(S3_DATA_INPUT_PATH, header=True)
print('# of records {}'.format(df.count()))
learnCodeUS = df.where((col('Country')=='United States of America')).groupby('LearnCode').count()
learnCodeUS.write.mode('overwrite').csv(S3_DATA_OUTPUT_PATH) # parquet
learnCodeUS.show()
print('Selected data is successfully saved to S3: {}'.format(S3_DATA_OUTPUT_PATH))
PySpark Job Run
Spark Master node์ SSH๋ก ๋ก๊ทธ์ธ ํ์ฌ spark-submit์ ํตํด ์คํ
์์ ๋ค์ด๋ก๋ ๋ฐ์ .ppk ํ์ผ์ ์ฌ์ฉํ์ฌ SSH ๋ก๊ทธ์ธ
Code Run
[hadoop@ip-172-31-47-191 ~]$ nano stackoverflow.py
[hadoop@ip-172-31-47-191 ~]$ spark-submit --master yarn stackoverflow.py

