

데이터 병렬처리가 가능하려면?
spark.sql.files.maxPartitionBytes에 있는 파일을 읽어올 때만 적용

spark.sql.shuffle.partitions이 결정


이 사진은 Apache Spark의 SQL 엔진 아키텍처를 설명하는 다이어그램입니다. 각 구성 요소와 그 기능을 다음과 같이 설명할 수 있습니다:
상단 레이어:
중간 레이어:
하단 레이어:
이 다이어그램은 Spark SQL 엔진이 쿼리와 연산을 어떻게 처리하고 최적화하는지를 시각적으로 설명함. 각 단계는 데이터 처리의 효율성을 높이기 위해 중요한 역할을 함.
2016년에 데이터프레임과 Data set은 하나의 API로 통합
모두 파티션으로 나뉘어 Spark에서 처리됨

로우레벨 데이터로 클러스터내의 서버에 분산된 데이터를 지칭
레코드별로 존재하지만 스키마가 존재하지 않음
변경이 불가능한 분산 저장된 데이터
일반 파이썬 데이터는 parallelize 함수로 RDD 변환

#pyspark는 Spark SQL Engine이 중심으로 동작
from pyspark.sql import SparkSession
# SparkSession은 싱글턴
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Tutorial')\
.getOrCreate()
…
spark.stop()

❖ pyspark.sql.SparkSession
❖ pyspark.sql.DataFrame
❖ pyspark.sql.Column
❖ pyspark.sql.Row
❖ pyspark.sql.functions
❖ pyspark.sql.types
❖ pyspark.sql.Window
spark.executor.memory (기본값: 1g)spark.executor.cores (YARN에서는 기본값 1)spark.driver.memory (기본값: 1g)spark.sql.shuffle.partitions (기본값: 최대 200)$SPARK_HOME/conf/spark_defaults.conf 파일을 사용함spark-submit 명령의 커맨드라인 파라미터를 사용함 (나중에 따로 설명함)SparkSession 만들 때 지정함 (SparkConf 객체 사용)SparkSession 생성 시 일일히 지정함from pyspark.sql import SparkSession
# SparkSession은 싱글턴임
spark = SparkSession.builder \
.master("local[*]") \
.appName('PySpark Tutorial') \
.config("spark.some.config.option1", "some-value") \
.config("spark.some.config.option2", "some-value") \
.getOrCreate()
spark_defaults.conf, spark-submit로 들어온 환경 설정이 우선순위를 고려한 상태로 정리된 상태임SparkConf 객체에 환경 설정하고 SparkSession에 지정함from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark Tutorial")
conf.set("spark.master", "local[*]")
# SparkSession은 싱글턴임
spark = SparkSession.builder \
.config(conf=conf) \
.getOrCreate()

spark.read(DataFrameReader)를 사용하여 데이터프레임으로 로드함DataFrame.write(DataFrameWriter)를 사용하여 데이터프레임을 저장함
!pip install pyspark==3.3.1 py4j==0.10.9.5
pyspark, py4j 사용from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Tutorial')\
.getOrCreate()
spark 호출시의 결과

!lscpu의 결과

!grep MemTotal /proc/meminfo의 결과

#python list 생성
name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
for n in name_list_json:
print(n)
# 파이썬 리스트를 RDD로 변환
# RDD로 변환되는 순간 Spark 클러스터의 서버들에 데이터가 나눠 저장됨 (파티션)
rdd = spark.sparkContext.parallelize(name_list_json)
rdd
rdd.count()
import json
parsed_rdd = rdd.map(lambda el: json.loads(el))
parsed_rdd.collect()
parsed_name_rdd = rdd.map(lambda el:json.loads(el)["name"])
parsed_name_rdd.collect() 을 통해 name 필드 값들이 포함된 리스트 반환

이 코드는 파이썬 리스트를 PySpark 데이터프레임으로 변환하고, 해당 데이터프레임의 정보와 내용을 확인하는 과정을 보여줌. 각 코드 셀을 단계별로 설명함.
from pyspark.sql.types import StringType
df = spark.createDataFrame(name_list_json, StringType())
name_list_json이라는 파이썬 리스트를 PySpark 데이터프레임으로 변환함.StringType: 데이터프레임의 컬럼 타입을 문자열로 지정함.createDataFrame: PySpark 데이터프레임을 생성하는 메소드임.df.count()
df.printSchema()
value가 있으며, 타입은 string이고 nullable(널 값을 허용)함을 확인함.df.select('*').collect()
*)을 선택하고, collect() 메소드를 사용하여 모든 데이터를 드라이버 프로그램으로 수집하여 출력함.[Row(value='{"name": "keeyong"}'),
Row(value='{"name": "benjamin"}'),
Row(value='{"name": "claire"}')]df_parsed_rdd = parsed_rdd.toDF()
toDF메서드는 RDD를 DF로 변환
결과로 df_parsed_red라는 DF 생성
df_parsed_rdd.printSchema()
스키마(컬럼 데이터 타입) 확인

df_parsed_rdd.select('name').collect()
의 결과
[Row(name='keeyong'), Row(name='benjamin'), Row(name='claire')]
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv
wget 명령어를 사용하여 name_gender.csv 파일을 다운로드함.name_gender.csv 파일이 현재 디렉토리에 저장됨.df = spark.read.csv("name_gender.csv")
df.printSchema()
spark.read.csv 메소드를 사용하여 name_gender.csv 파일을 데이터프레임으로 로드함._c0, _c1 등의 이름으로 컬럼이 지정됨.df = spark.read.option("header", True).csv("name_gender.csv")
df.printSchema()
option("header", True)를 추가하여 헤더가 있는 CSV 파일로 로드함.name, gender 컬럼이 지정됨.df.show()
df.head(5)
Row 객체로 반환됨.[Row(name='Adaleigh', gender='F'),
Row(name='Amryn', gender='Unisex'),
Row(name='Apurva', gender='Unisex'),
Row(name='Aryion', gender='M'),
Row(name='Alixia', gender='F')]df.groupby(["gender"]).count().collect()
gender 컬럼을 기준으로 그룹화하고 각 그룹의 개수를 세어 수집함.Row 객체로 반환됨.[Row(gender='F', count=65),
Row(gender='M', count=28),
Row(gender='Unisex', count=7)]df.rdd.getNumPartitions()
df.createOrReplaceTempView("namegender")
namegender라는 이름의 임시 테이블이 생성됨.namegender_group_df = spark.sql("SELECT gender, count(1) FROM namegender GROUP BY 1")
namegender_group_df.collect()
gender 컬럼을 기준으로 그룹화하고 각 그룹의 개수를 셈.Row 객체로 반환됨.[Row(gender='F', count=65),
Row(gender='M', count=28),
Row(gender='Unisex', count=7)]spark.catalog.listTables()
namegender 임시 테이블이 목록에 포함되어 있음을 확인함.[Table(name='namegender', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]two_namegender_group_df = namegender_group_df.repartition(2)
namegender_group_df 데이터프레임을 2개의 파티션으로 나눔.two_namegender_group_df 데이터프레임이 2개의 파티션으로 나뉨.two_namegender_group_df.rdd.getNumPartitions()
two_namegender_group_df 데이터프레임의 파티션 수를 확인함.Dataframe의 컬럼 지칭
from pyspark.sql.functions import col, column
stationTemps = minTemps.select(
"stationID",
col("stationID"),
column("stationID"),
minTemps.stationID
)
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv
ls -tl

!head -5 1800.csv

import pandas as pd
pd_df = pd.read_csv(
"1800.csv",
names=["stationID", "date", "measure_type", "temperature"],
usecols=[0, 1, 2, 3]
)
pd_df.head()

# Filter out all but TMIN entries
pd_minTemps = pd_df[pd_df['measure_type'] == "TMIN"]
pd_minTemps.head()
# Select only stationID and temperature
pd_stationTemps = pd_minTemps[["stationID", "temperature"]]
# Aggregate to find minimum temperature for every station
pd_minTempsByStation = pd_stationTemps.groupby(["stationID"]).min("temperature")
pd_minTempsByStation.head()
pd_df: Pandas DataFrame으로 로드된 CSV 파일pd_minTemps: measure_type이 TMIN인 행만 필터링pd_stationTemps: stationID와 temperature 열만 선택pd_minTempsByStation: 각 stationID별로 최소 온도를 계산from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #1")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
SparkConf를 사용하여 Spark 애플리케이션 설정SparkSession을 생성하여 Spark 클러스터와 통신df = spark.read.format("csv").option("inferSchema", "true").load("1800.csv")\
.toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
# 혹은 명시적으로 스키마를 지정
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
schema = StructType([
StructField("stationID", StringType(), True),
StructField("date", IntegerType(), True),
StructField("measure_type", StringType(), True),
StructField("temperature", FloatType(), True)
])
df = spark.read.schema(schema).csv("1800.csv")
df.printSchema()
inferSchema 옵션을 사용하여 자동으로 스키마를 추론하거나, 명시적으로 스키마를 지정# Filter out all but TMIN entries
minTemps = df.filter(df.measure_type == "TMIN")
# Aggregate to find minimum temperature for every station
minTempsByStation = minTemps.groupBy("stationID").min("temperature")
minTempsByStation.show()
measure_type이 TMIN인 행만 필터링stationID별로 최소 온도를 계산# Collect, format, and print the results
results = minTempsByStation.collect()
for result in results:
print(result[0] + "\t{:.2f}F".format(result[1]))
df.createOrReplaceTempView("station1800")
results = spark.sql("""
SELECT stationID, MIN(temperature)
FROM station1800
WHERE measure_type = 'TMIN'
GROUP BY stationID
""").collect()
for r in results:
print(r)
stationID별 최소 온도를 계산하고 출력

이 실습에서는 Pandas와 유사한 방식으로 PySpark를 사용하여 CSV 파일을 처리하는 방법을 다룸. 주요 작업은 CSV 파일에서 고객별로 지출 금액의 합계, 최대값, 평균값을 계산하는 것임.
!pip install pyspark==3.3.1 py4j==0.10.9.5
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark DataFrame #2')\
.getOrCreate()
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/customer-orders.csv
!ls -tl
!head -5 customer-orders.csv
customer-orders.csv 파일을 다운로드하고, 첫 5개의 행을 확인함.from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType
schema = StructType([
StructField("cust_id", StringType(), True),
StructField("item_id", StringType(), True),
StructField("amount_spent", FloatType(), True)
])
df = spark.read.schema(schema).csv("customer-orders.csv")
df.printSchema()
df_ca = df.groupBy("cust_id").sum("amount_spent")
df_ca.show()
amount_spent의 합계를 계산하고 출력함.df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")
df_ca.show(10)
import pyspark.sql.functions as f
df_ca = df.groupBy("cust_id") \
.agg(f.sum('amount_spent').alias('sum'), f.max('amount_spent').alias('max'), f.avg('amount_spent').alias('avg'))
df_ca.show(5)
sum(amount_spent) 컬럼의 이름을 sum으로 변경하고, 고객별로 지출 금액의 합계(sum), 최대값(max), 평균값(avg)을 계산함.df.createOrReplaceTempView("customer_orders")
results = spark.sql("""
SELECT cust_id, SUM(amount_spent) sum, MAX(amount_spent) max, AVG(amount_spent) avg
FROM customer_orders
GROUP BY cust_id
""").head(5)
print(results)
spark.catalog.listTables()
이 실습을 통해 PySpark를 사용하여 대용량 데이터셋을 효율적으로 처리하고 집계하는 방법을 배울 수 있음.



이 실습에서는 PySpark를 사용하여 텍스트 파일을 읽고, 데이터를 파싱하여 구조화된 데이터로 변환한 후 CSV 및 JSON 형식으로 저장하는 방법을 다룸.
!pip install pyspark==3.3.1 py4j==0.10.9.5
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt
!ls -tl
!head -5 transfer_cost.txt
transfer_cost.txt 파일을 다운로드하고, 첫 5개의 행을 확인함.import pyspark.sql.functions as F
from pyspark.sql.types import *
schema = StructType([StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")
transfer_cost_df.show(truncate=False)
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'
df_with_new_columns = transfer_cost_df\
.withColumn('week', regexp_extract('text', regex_str, 1))\
.withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
.withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
.withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
.withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
df_with_new_columns.printSchema()
final_df = df_with_new_columns.drop("text")
final_df.write.csv("extracted.csv")
!ls -tl
!ls -tl extracted.csv/
!head -5 extracted.csv/part-00000-a909db0a-d743-4a0c-96fc-60c1a8eef076-c000.csv
text 컬럼을 제거하고, 데이터를 CSV 형식으로 저장함.final_df.write.format("json").save("extracted.json")
!ls -tl extracted.json/
!head -1 extracted.json/part-00000-104f95b9-f2c6-4f77-a170-583c78106e11-c000.json
이 실습을 통해 PySpark를 사용하여 텍스트 데이터를 파싱하고, 구조화된 데이터로 변환한 후 다양한 형식으로 저장하는 방법을 배울 수 있음.


이 실습에서는 PySpark를 사용하여 Stackoverflow 설문조사 데이터를 분석하고, 가장 많이 사용되는 언어와 가장 배우고 싶은 언어를 찾는 방법을 다룸.
!pip install pyspark==3.3.1 py4j==0.10.9.5
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark DataFrame #4')\
.getOrCreate()
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/survey_results_public.csv
!ls -tl
survey_results_public.csv 파일을 다운로드하고, 파일이 제대로 저장되었는지 확인함.df = spark.read.csv("survey_results_public.csv", header=True, inferSchema=True)
df.printSchema()
df_language_have = df.select(
df.ResponseId,
F.explode(F.split(df.LanguageHaveWorkedWith, ";")).alias("language_have")
)
df_language_have.show(5)
df_language50_have = df_language_have.groupby("language_have").count().orderBy('count', ascending=False).limit(50)
df_language50_have.write.mode('overwrite').csv("language50_have")
LanguageHaveWorkedWith 열을 기반으로 데이터를 분리하여 각 언어를 추출함.df_language_want = df.select(
df.ResponseId,
F.explode(F.split(df.LanguageWantToWorkWith, ";")).alias("language_want")
)
df_language_want.show(5)
df_language50_want = df_language_want.groupby("language_want").count().orderBy('count', ascending=False).limit(50)
df_language50_want.write.mode('overwrite').csv("language50_want")
LanguageWantToWorkWith 열을 기반으로 데이터를 분리하여 각 언어를 추출함.!ls -tl language50_have/
!cat language50_have/part-00000-d133bb73-6f57-4c38-a963-0c9ddf10dabf-c000.csv
!ls -tl language50_want/
!cat language50_want/part-00000-c59aa8f9-e4cc-4425-8ff0-368152be6934-c000.csv
LanguageHaveWorkedWith 열을 분리하여 각 언어를 추출하고, 상위 50개 언어를 계산하여 CSV 파일로 저장함.LanguageWantToWorkWith 열을 분리하여 각 언어를 추출하고, 상위 50개 언어를 계산하여 CSV 파일로 저장함.이 실습을 통해 PySpark를 사용하여 대규모 설문조사 데이터를 분석하고, 인기 있는 프로그래밍 언어를 파악하는 방법을 배울 수 있음.


이 실습에서는 PySpark를 사용하여 Amazon Redshift 데이터베이스에 연결하고, 데이터를 로드한 후 분석하는 방법을 다룸.
!pip install pyspark==3.3.1 py4j==0.10.9.5
!cd /usr/local/lib/python3.8/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("PySpark DataFrame #5") \
.getOrCreate()
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234") \
.option("dbtable", "raw_data.user_session_channel") \
.load()
df_session_timestamp = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234") \
.option("dbtable", "raw_data.session_timestamp") \
.load()
raw_data.user_session_channel 및 raw_data.session_timestamp 테이블을 로드함.df_user_session_channel.printSchema()
df_user_session_channel.rdd.getNumPartitions()
df_session_timestamp.printSchema()
df_session_timestamp.rdd.getNumPartitions()
join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")
session_df.printSchema()
session_df.show(5)
sessionid를 기준으로 조인하고, 조인 결과의 스키마와 일부 데이터를 확인함.session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
"userid", df_user_session_channel.sessionid, "channel", "ts"
)
channel_count_df = session_df.groupby("channel").count().orderBy("count", ascending=False)
channel_count_df.show()
channel 별로 데이터 수를 집계하여 출력함.from pyspark.sql.functions import date_format, asc, countDistinct
session_df.withColumn('month', date_format('ts', 'yyyy-MM')).groupby('month').\
agg(countDistinct("userid").alias("mau")).sort(asc('month')).show()
ts 컬럼을 기반으로 월을 추출하여, 월별로 활성 사용자 수(MAU)를 계산함.df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
channel_count_df = spark.sql("""
SELECT channel, count(distinct userId) uniqueUsers
FROM session_timestamp st
JOIN user_session_channel usc ON st.sessionID = usc.sessionID
GROUP BY 1
ORDER BY 1
""")
channel_count_df.show()
mau_df = spark.sql("""
SELECT
LEFT(A.ts, 7) AS month,
COUNT(DISTINCT B.userid) AS mau
FROM session_timestamp A
JOIN user_session_channel B ON A.sessionid = B.sessionid
GROUP BY 1
ORDER BY 1 DESC""")
mau_df.collect()
channel 별로 고유 사용자 수를 계산함.channel 별로 데이터 수를 집계하여 출력함.ts 컬럼을 기반으로 월을 추출하여, 월별로 활성 사용자 수(MAU)를 계산함.channel 별 고유 사용자 수와 월별 활성 사용자 수(MAU)를 계산함.이 실습을 통해 PySpark를 사용하여 Redshift 데이터베이스와 연동하고, 데이터를 분석하는 방법을 배울 수 있음.

