AWS EC2 t2.xlarge
OS : Red Hat 9.1
Python : 3.9
Spark : 3.3.1
Scala : 2.12.15
Java : OpenJDK 64-Bit Server VM, 1.8.0_352
# create data list
stockSchema = ["name", "ticker", "country", "price", "currency"]
stocks = [
('Google', 'GOOGL', 'USA', 2984, 'USD'),
('Netflix', 'NFLX', 'USA', 645, 'USD'),
('Amazon', 'AMZN', 'USA', 3518, 'USD'),
('Tesla', 'TSLA', 'USA', 1222, 'USD'),
('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
('Toyota', '7203', 'Japan', 2006, 'JPY'),
('Samsung', '005930', 'Korea', 70600, 'KRW'),
('Kakao', '035720', 'Korea', 125000, 'KRW'),
]
# create DataFrame (list to dataframe)
df = spark.createDataFrame(data=stocks, schema=stockSchema)
# create DatFrame (read csv file)
filename = "/my/dir/filename.csv"
# 파일 여러개 인 경우
filename = "/my/dir/*.csv"
df = spark.read.csv(f"file:///{filename}", inferSchema=True, header=True)
# show data type
df.dtypes
"""
[('name', 'string'),
('ticker', 'string'),
('country', 'string'),
('price', 'bigint'),
('currency', 'string')]
"""
# describe() : 기본 통계 값 출력
df.describe().show()
df.select("total_amount").describe().show()
"""
+-------+------------------+
|summary| total_amount|
+-------+------------------+
| count| 9344926|
| mean|18.217332152376397|
| stddev|184.27259172356767|
| min| -647.8|
| max| 398469.2|
+-------+------------------+
"""
# print DataFrame
df.show()
"""
+-------+------+---------+------+--------+
| name|ticker| country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL| USA| 2984| USD|
|Netflix| NFLX| USA| 645| USD|
| Amazon| AMZN| USA| 3518| USD|
| Tesla| TSLA| USA| 1222| USD|
|Tencent| 0700|Hong Kong| 483| HKD|
| Toyota| 7203| Japan| 2006| JPY|
|Samsung|005930| Korea| 70600| KRW|
| Kakao|035720| Korea|125000| KRW|
+-------+------+---------+------+--------+
"""
# "stocks"라는 Spark Temporary View 생성.
df.createOrReplaceTempView("stocks")
# SQL 사용
spark.sql("select name from stocks")
"""
DataFrame[name: string]
"""
spark.sql("select price from stocks")
"""
DataFrame[price: bigint]
"""
# spark.sql.("SQL").show() : show(n) n rows를 출력한다. default 20
spark.sql("select name from stocks").show()
"""
+-------+
| name|
+-------+
| Google|
|Netflix|
| Amazon|
| Tesla|
|Tencent|
| Toyota|
|Samsung|
| Kakao|
+-------+
"""
spark.sql("select name, country from stocks where name like 'S%'").show()
"""
+-------+-------+
| name|country|
+-------+-------+
|Samsung| Korea|
+-------+-------+
"""
# JOIN
spark.sql("select A.name, (A.price/B.eps) from A join B on A.name = B.name ").show()
# explain(True)
spark.sql("select A.name, (A.price/B.eps) from A join B on A.name = B.name ").explain()
# Datetime Format
# EEE : 요일 3글자 (ex. Wed)
# EEEE : 요일 (ex. Wednesday)
query = """
SELECT
d.datetime,
DATE_FORMAT(d.datetime, 'EEEE') AS day_of_week,
COUNT(*) AS cnt
FROM
df as d
GROUP BY
d.datetime,
day_of_week
"""
# DataFrame to pandas DataFrame
# pd_df 는 그냥 판다스 사용하는 것 처럼 seaborn, matplotlib 등등에 사용하면 된다.
pd_df = spark.sql(query).toPandas()
pyspark.sql.SparkSession
Dataset, DatFrame API 로 Spark 프로그래밍하기 위한 진입점
SparkSession은 DataFrame 생성, DataFrame을 table로 등록, parquet 파일 읽기에 사용된다.
SparkSession을 생성하기위해서는 builder 패턴을 사용해야한다.
spark = SparkSession.builder \
.master("local") \
.appName("Word Count") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
DATAFRAME.createOrReplaceTempView("VIEW_NAME")
데이터프레임(DATAFRAME)으로 로컬 임시 뷰(VIEW_NAME) 생성/대체.
임시 테이프블의 수명은 이 데이터프레임을 생성하는데 사용된 SparkSession에 달려잇다. 세션이 종료되면 View Table은 Drop된다.
: date에서 fmt 다음 단위를 00 으로 자른 값 반환.
date_trunc(date, fmt)
: fmt 모델 형식에 지정된 단위로 잘린 timestamp를 반환한다. fmt는 ["YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "DAY", "DD", "HOUR", "MINUTE", "SECOND", "WEEK", "QUARTER"] 중에 하나여야 한다.SELECT date_trunc('2015-03-05T09:32:05.359', 'YEAR'); # -> 2015-01-01T00:00:00 SELECT date_trunc('2015-03-05T09:32:05.359', 'MM'); # -> 2015-03-01T00:00:00 SELECT date_trunc('2015-03-05T09:32:05.359', 'DD'); # -> 2015-03-05T00:00:00 SELECT date_trunc('2015-03-05T09:32:05.359', 'HOUR'); # -> 2015-03-05T09:00:00
filename = "*.csv"
df = spark.read.csv(f"file:///{filename}", inferSchema=True, header=True)
스키마가 동일하면 상관없는데
스키마가 다른 파일이(A,B라고 가정) 같이 있다면
df.printSchema()
# -> A의 스키마만 출력
trips_df.select("B_COLUMN").show()
# -> Column 'B_COLUMN' does not exist.
bigint
Spark Docs : pyspark.sql.SparkSession
Spark Docs : pyspark.sql.DataFrame.createOrReplaceTempView
Spark Docs : Spark functions
Spark Docs : Datetime Pattern
PySparkn Datetime Format