<대규모 주류 판매 데이터 분석>
- Role : 주류 산업 데이터 컨설턴트
- 데이터 처리 ~ 리포팅
데이터셋 : kaggle- Iowa Liquor Sales
- 데이터 소개
Spark
: 분산 클러스터 컴퓨팅 오픈소스 프레임워크
PySpark
Python 환경에서 Spark를 사용할 수 있는 인터페이스 ( Spark용 API )
# 리눅스 기반 코드
!apt-get install openjdk-8-jdk-headless # jdk 설치
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz # spark 파일
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz # 스파크 압축 풀기
!pip install -q findspark # 스파크 찾기
!pip install kaggle --upgrade # 캐글 데이터 다운받기 위한 kaggle library 설치
apt-get : 시스템에서 사용가능한 패키지에 대한 설치, 패키지 검색, 업데이트 및 기타 여러 작업wget : 웹 상의 파일을 다운로드tar : 여러 개의 파일을 하나의 파일로 묶거나 풀 때pip : 파이썬에서 외부 라이브러리를 설치, 업그레이드, 제거, 검색 등import os # 운영체제와의 상호작용 돕는 다양한 기능 제공
import findspark
# 환경변수에 path 지정
os.environ["JAVA_HOME"] = ""
os.environ["SPARK_HOME"] = ""
findspark.init() # spark경우 잘 찾지 모샇는 경우가 있어 findspark 이용
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName("Pyspark_test")
.master("local[*]") # local에서 사용하며, 모든 쓰레드를 사용
.getOrCreate()
)
# 만들어진 spark 객체 설정 알아보기
spark.sparkContext.getConf().getAll()
kaggle - account - API - Create New Token - json파일 다운 -
토큰 잃어버리면 다시 찾을 수 X - expire한 후 새 토큰 재발급
from google.colab import files
files.upload() # json 파일 업로드
!mkdir -p /root/.kaggle # kaggle 폴더 생성
!cp kaggle.json /root/.kaggle/ # json 파일 복사
!chmod 600 /root/.kaggle/kaggle.json # file 접근 권한 할당
!kaggle datatsets download -d wethanielaw/iowa-liquor-sales-20230401 # data download
!unzip iowa-liquor-sales-20230401.zip # 압축 풀기
mkdir : 디렉토리(폴더) 생성 명령어cp : 파일 복사 / 이동chmod : 기존파일 or 디렉토리에 대한 접근권한 변경unzip : zip 파일 압축 풀기# 파일 크기 알기
import os
def convert_size(size_bytes):
import math
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return "%s %s" % (s, size_name[i])
file_size = os.path.getsize('./파일명.csv')
print('File Size:', convert_size(file_size), 'bytes')
df = spark.read.csv(
path = "파일명.csv", header = True, inferSchema = True
)
import pandas as pd
temp = pd.read_csv("Iowa_Liquor_Sales.csv") # 데이터 용량이 너무 크면 오류
csv
json
{"데이터 이름" : "값"}
JDBC
- 자바에서 데이터베이스에 접속할 수 있도록 하는 자바 api
- DB 불러와 DF로 처리
<빅데이터 위한 파일 포맷>
df.show() - 기본적으로 20개 행 보여줌. 숫자 기입 가능
df.printSchema()
df.columns
# 공백 및 괄호 제외
def replace_word(col_name):
replace_words = {
" ": "",
"(": "_",
")": ""
}
for word in replace_words:
col_name = col_name.replace(word, replace_words[word])
return col_name
for col in df.columns:
df = df.withColumnRenamed(col, replace_word(col))
df.write.format("parquet").save(
path = "data_parquet",
header = True
)
# google colab에서 파일 다운로드
from google.colab import files
download_list = os.listdir("./data_parquet")
for file_name in download_list:
if file_name[-3:] != 'crc':
files.download("./data_parquet/" + file_name)
데이터 불러오는 데에 : csv >>> parquet
transformation : 비슷
action (결과값 반환) :
%%time # 시간체크해주는 것
spark.read.csv("Iowa_Liquor_Sales.csv",header=True, inferSchema=True)
# dataframe -> 뷰 등록
df_parquet.createOrreplaceTempView("sql_df")
테이블명.filter(조건)
SparkSQL과 DataFrame api중 쿼리 수행 시 성능상 효율 - 비슷
SparkSQL
DataFrameAPI
select 사용하지 않으면 모든 column 반환
output = 또다른 dataframe 반환
| 함수 | 기능 | SQL | 사용/비고 |
|---|---|---|---|
| df.show() | 데이터프레임 보여줌 | ||
| df.count() | 행 수- 항상 dataframe형태로 나오는 것은 아님 | count | |
| df.printSchema() | schema 보여줌(컬럼명 - datatype - nulltype) | ||
| 결측치 처리 | |||
| df.select() | 특정 컬럼 선택 | select | |
| when | if문 같은 조건문 만들어 | ||
| case when | F.when(조건, True일 경우 반환값).otherwise(아닐 경우 반환값) | ||
| alias | as. | ||
| 새롭게 연산된 컬럼의 컬럼명 | (as) | ||
| isnull, isnotnull | 컬럼의 값이 null인지 아닌지 | isnull | |
| filter | 구문의 true인 값 반환 = where 함수 | where | df.filter(F.col(””).isnull()) |
| drop | 열 제거 | - | |
| withcolumn withcolumnrename | 기존 컬럼 업데이터, 타입변경, 신규 컬럼값 추가 | - | withcolumn(신규컬럼명, 신규컬럼값) 기존 컬럼명과 동일할 경우 기존컬럼에 덮어쓰기 |
| size | 컬럼 array 길이 반환 list 형태의 len과 유사 기능 | ||
| collect_set collect_list | object 고유 집합 반환(set만)( = distinct) | ||
| 요소를 set(list)로 묶음 | - | ||
| over | 컬럼 window 정의 (=window over) | ||
| * window : 일정 기준으로 묶인 row | |||
| window | window 정의 | ||
| partitionBy | W) window 중 그룹 (=groupby) | ||
| first, last | 첫번째, 마지막 값 반환 | ignorenulls=True: null인 경우 제외하고서 | |
| orderby | 정렬(default: 오름차순, desc(): 내림차순) | ||
| 데이터 형 변환 | |||
| to_date to_timestamp | string → date 형태 | ||
| string → timestamp | to_date | F.to_date(변형 컬럼, format=”현재 string의 형태 명시”) 형태 - ( MM/dd/yyyy or MM/dd/yyyy HH:MM:ss.SSS) | |
| groupby | grouped dataframe 반환 이에 대해 grgregation function 적용 가능 | ||
| sum | 합계 | ||
| countDistinct | 고유 개수 계산 | ||
| cast | 형태 변환 | cast(”long”) cast(”string”) | |
| lag lead | 이전 row 값 반환 이후 row 값 반환 | lag lead | |
| nlargest | 상위값 추출 | nlargest(n=10, columns=’’) | |
# pyspark sql function 사용
from pyspark.sql import functions as F
(df.select("City")).show(5)
(df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns])).show(5) # df.columns: 리스트
df.filter(F.col(”CountyNumber”).isnull()).show(5)
# 행 제거
df= df.filter(F.col("City").isnotnull())
# 설치
%pip install plotly==5.11.0
# graph_objects : customizing 할 것 많아
import plotly.graph_objects as go
fig = go.Figure(
data = [go.Line(x=dataframe[""], y=dataframe[""])],
layout = go.Layout(
title = go.layout.Title(text="Title")
)
fig.show()
# express: graph_objects 보편화 버전
import plotly.express as px
fig = px.line(x=dataframe[""], y=dataframe[""], title="Title")
fig.show()
# 수평선 그리기
fig.add_hline(y=값, line_color='')
| 함수 | 기능 |
|---|---|
| toPandas | pyspark dataframe → pandas dataframe * 이유 - 시각화 위해서 모든 데이터 한번에 로드 |
# Pandas 기본 시각화로 plotly 사용
import pandas as pd
pd.options.plotting.backend = "plotly"
Hover Label
fig.update_layout(
hoverlabel_bgcolor="",
hoverlabel_font_size=,
hoverlabel_font_color='',
hoverlabel_font_family="")
# 간단한 html 사용하여 레이블 형식
fig.update_traces(hovertemplate='A:%{text} <br>'
'B:%{text} <br>' +
'C:%{text}')
Slider
fig.update_layout(xaxis=dic(rangeslider_visible=True) # x축 기준
Drop Down
import plotly.graph_objects as go
fig = go.Figure()
fig.add_trace(go.Line(
name="A",
x=dataframe[""],
y=dataframe[""]
))
fig.add_trace(go.Line(
name="B",
x=dataframe[""],
y=dataframe[""]
))
fig.update_layout(
updatemenus=[
dict(
type="dropdown",
direction="down",
buttons=list([
dict(label="Both",
method="update",
args=[{"visible": [True, True]},
{"title": "Title"}]),
dict(label="A",
method="update",
args=[{"visible": [True, False]},
{"title": "Title"}]),
dict(label="B",
method="update",
args=[{"visible": [False, True]},
{"title": "Title"}])
)]
)
udf : User Defined Functions
- 사용자 정의 함수
- 사용자가 정의한 python 함수가 column 형태를 받아서 column return 하도록 함
→ but pyspark function보다 성능 떨어지므로 기본 pyspark function사용 권장
from pyspark.sql.functions import udf
from pyspark.sql import types as T
def 함수(파라미터):
내용
함수명_udf = udf(
udf로 만들 함수,
T.return type
)