[Spark] SQL 연습하기

Denver·2022년 12월 18일
0

Hadoop

목록 보기
6/9
post-thumbnail

0. 실행 환경

AWS EC2 t2.xlarge
OS : Red Hat 9.1
Python : 3.9
Spark : 3.3.1
Scala : 2.12.15
Java : OpenJDK 64-Bit Server VM, 1.8.0_352

1. SQL 연습

# create data list
stockSchema = ["name", "ticker", "country", "price", "currency"]
stocks = [
    ('Google', 'GOOGL', 'USA', 2984, 'USD'), 
    ('Netflix', 'NFLX', 'USA', 645, 'USD'),
    ('Amazon', 'AMZN', 'USA', 3518, 'USD'),
    ('Tesla', 'TSLA', 'USA', 1222, 'USD'),
    ('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
    ('Toyota', '7203', 'Japan', 2006, 'JPY'),
    ('Samsung', '005930', 'Korea', 70600, 'KRW'),
    ('Kakao', '035720', 'Korea', 125000, 'KRW'),
]

# create DataFrame (list to dataframe)
df = spark.createDataFrame(data=stocks, schema=stockSchema)

# create DatFrame (read csv file)
filename = "/my/dir/filename.csv"
# 파일 여러개 인 경우
filename = "/my/dir/*.csv"
df = spark.read.csv(f"file:///{filename}", inferSchema=True, header=True)

# show data type
df.dtypes
"""
[('name', 'string'),
 ('ticker', 'string'),
 ('country', 'string'),
 ('price', 'bigint'),
 ('currency', 'string')]
"""

# describe() : 기본 통계 값 출력
df.describe().show()
df.select("total_amount").describe().show()
"""
+-------+------------------+
|summary|      total_amount|
+-------+------------------+
|  count|           9344926|
|   mean|18.217332152376397|
| stddev|184.27259172356767|
|    min|            -647.8|
|    max|          398469.2|
+-------+------------------+
"""

# print DataFrame
df.show()
"""                                                        
+-------+------+---------+------+--------+
|   name|ticker|  country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL|      USA|  2984|     USD|
|Netflix|  NFLX|      USA|   645|     USD|
| Amazon|  AMZN|      USA|  3518|     USD|
|  Tesla|  TSLA|      USA|  1222|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY|
|Samsung|005930|    Korea| 70600|     KRW|
|  Kakao|035720|    Korea|125000|     KRW|
+-------+------+---------+------+--------+
"""
                                                                                
# "stocks"라는 Spark Temporary View 생성.
df.createOrReplaceTempView("stocks")

# SQL 사용
spark.sql("select name from stocks")
"""
DataFrame[name: string]
"""
spark.sql("select price from stocks")
"""
DataFrame[price: bigint]
"""

# spark.sql.("SQL").show() : show(n) n rows를 출력한다. default 20
spark.sql("select name from stocks").show()
"""
+-------+
|   name|
+-------+
| Google|
|Netflix|
| Amazon|
|  Tesla|
|Tencent|
| Toyota|
|Samsung|
|  Kakao|
+-------+
"""

spark.sql("select name, country from stocks where name like 'S%'").show()
"""
+-------+-------+
|   name|country|
+-------+-------+
|Samsung|  Korea|
+-------+-------+
"""

# JOIN
spark.sql("select A.name, (A.price/B.eps) from A join B on A.name = B.name ").show()

# explain(True)
spark.sql("select A.name, (A.price/B.eps) from A join B on A.name = B.name ").explain()

# Datetime Format
# EEE : 요일 3글자 (ex. Wed)
# EEEE : 요일 (ex. Wednesday)
query = """
SELECT 
    d.datetime,
    DATE_FORMAT(d.datetime, 'EEEE') AS day_of_week,
    COUNT(*) AS cnt
FROM
    df as d
GROUP BY
    d.datetime,
    day_of_week
"""

# DataFrame to pandas DataFrame
# pd_df 는 그냥 판다스 사용하는 것 처럼 seaborn, matplotlib 등등에 사용하면 된다.
pd_df = spark.sql(query).toPandas()


### biging df.dtypes 를 실행하면 price가 bigint 라는 타입이라고 출력된다. bigint는 8 바이트 크기의 SQL 서버에서 가장 큰 정수 데이터 타입이다. (-9,223,372,036,854,775,808 ~ 9,223,372,036,854,775,807)

SparkSession

pyspark.sql.SparkSession
Dataset, DatFrame API 로 Spark 프로그래밍하기 위한 진입점
SparkSession은 DataFrame 생성, DataFrame을 table로 등록, parquet 파일 읽기에 사용된다.
SparkSession을 생성하기위해서는 builder 패턴을 사용해야한다.

spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

createOrReplaceTempView

DATAFRAME.createOrReplaceTempView("VIEW_NAME")
데이터프레임(DATAFRAME)으로 로컬 임시 뷰(VIEW_NAME) 생성/대체.
임시 테이프블의 수명은 이 데이터프레임을 생성하는데 사용된 SparkSession에 달려잇다. 세션이 종료되면 View Table은 Drop된다.

Spark Function

date_trunc(date, fmt)

: date에서 fmt 다음 단위를 00 으로 자른 값 반환.

date_trunc(date, fmt)
: fmt 모델 형식에 지정된 단위로 잘린 timestamp를 반환한다. fmt는 ["YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "DAY", "DD", "HOUR", "MINUTE", "SECOND", "WEEK", "QUARTER"] 중에 하나여야 한다.

SELECT date_trunc('2015-03-05T09:32:05.359', 'YEAR');
# -> 2015-01-01T00:00:00
SELECT date_trunc('2015-03-05T09:32:05.359', 'MM');
# -> 2015-03-01T00:00:00
SELECT date_trunc('2015-03-05T09:32:05.359', 'DD');
# -> 2015-03-05T00:00:00
SELECT date_trunc('2015-03-05T09:32:05.359', 'HOUR');
# -> 2015-03-05T09:00:00

Q

  • 파일 여러개인 경우, 아래와 같이 데이터프레임을 만든다.
filename = "*.csv"
df = spark.read.csv(f"file:///{filename}", inferSchema=True, header=True)

스키마가 동일하면 상관없는데
스키마가 다른 파일이(A,B라고 가정) 같이 있다면

df.printSchema()
# -> A의 스키마만 출력

trips_df.select("B_COLUMN").show() 
# -> Column 'B_COLUMN' does not exist.

- spark.sql("QUERY") VS df.select("").describe().show()



참고 자료

bigint
Spark Docs : pyspark.sql.SparkSession
Spark Docs : pyspark.sql.DataFrame.createOrReplaceTempView
Spark Docs : Spark functions
Spark Docs : Datetime Pattern
PySparkn Datetime Format

profile
까먹었을 미래의 나를 위해

0개의 댓글