[ZB] SQL분석 - chap.7 대규모 주류 판매 데이터

porii·2024년 11월 19일

[edu] zerobase

목록 보기
22/28

<대규모 주류 판매 데이터 분석>

  • Role : 주류 산업 데이터 컨설턴트
  • 데이터 처리 ~ 리포팅
    데이터셋 : kaggle
  • Iowa Liquor Sales
  • 데이터 소개
  • Raw 데이터를 다룰 줄 아는 능력 중요
  • Spark - 데이터 처리,데이터 분석 + colab
    • SQL, 스트리밍, 머신러닝, 그래프 처리를 위한 기본 제공 모듈이 있는 대규모 데이터 처리용 통합 분석 엔진
  • MySQL - 데이터 분석 + workbench
    • 관계형 데이터베이스 관리
  • Tableau - 리포팅
    • 데이터 분석 및 시각화 BI 도구

Spark 와 PySpark

  1. Spark

    : 분산 클러스터 컴퓨팅 오픈소스 프레임워크

    • 분산 클러스터란 ? 시스템 전반적 성능 향상을 위해 계산 부하량을 여러 노드에서 분담하여 병렬처리 하도록 구성
    1. Cluster manager
      • 사용 가능한 자원 파악
      • 클러스터 데이터 처리 작업 관리 및 조율
      • 사용자는 클러스터 관리자에게 스파크 어플리케이션 제출
    2. Driver Process
      • 클러스터 노드 중 하나
      • 명령을 Executor에서 실행하도록 분석, 배포, 스케줄링
      • 각 클러스터 당 1개의 노드에만 존재
      1. Spark Session
        • 스파크 응용 프로그램 통합 진입점
        • 스파크 기능과 구조가 상호방식하는 방식 제공
      2. User Code
    3. Executors
      • 드라이버 프로세스가 할당한 작업 수행
      • 진행 상광을 다시 드라이버 노드에 보고
    • 데이터 처리 방식
      1. Partition
        모든 executor가 병렬로 작업 수행할 수 있도록 청크 단위로 데이터를 분할
        파티션 : 클러스터의 물리적 머신에 존재하는 row의 집합
        • Spark의 병렬성은 파티션과 익스큐터의 개수로 결정됨
      2. Transformation
        변경 원할 때 변경 방법을 Spark에게 알려줌
        → 논리적 실행 계획을 세움. 실제 연산이 일어나는 것은 X
      3. Lazy Evaluation & Action
        1. Lazy Evaluation
          특정 연산 명령이 내려진 즉시 데이터 수정하지 않고, 원시 데이터에 적용할 트랜드포메이션만 실행
          액션 전까지 전체 데이터 흐름을 계산해 최적화 = 강점
        2. Action
          실제 연산 수행 위한 사용자 명령
          트랜드포메이션으로부터 결과 계산하도록 지시
    • 카탈리스트 : 트랜스포메이션 적용 시 스파크 SQL은 논리 계획이 담긴 트리 그래프 생성 해당 optimizer에 의해 최적의 논리를 통해 데이터 반환 → 성능 좋음
      • 논리 계획 4단계
        1. Analysis
        2. LogicalOptimization
        3. Physical Planning
        4. Code Generation
      • 장점
        • 데이터 연산에 앞서 데이터를 계획적으로 자동 최적화 함 (데이터 Shuffle하고 Partition 하여 나뉜 데이터에서 연산 처리 시, 네트워크 연산(node들 간의 통신) 발생.
          but 연산 속도의 경우 “인메모리 >>디스크 I/O >> 네트워크”)
  2. PySpark

    Python 환경에서 Spark를 사용할 수 있는 인터페이스 ( Spark용 API )

    • 기능 및 라이브러리
      • Spark SQL and Dataframes 대용량 정형데이터 처리를 위해 SQL 인터페이스 지원 SQL쿼리 사용 가능 데이터표현 형식 = Dataframe
      • Pandas API on Spark Pandas API 지원 Pandas와 같은 문법 사용
      • Structured Streaming Spark SQL엔진에 구축된 스트림 처리 엔진 정적 데이터에 배치계산을 하는 것과 같은 방식 스트리밍 계산 표현 가능
      • Machine Learning Spark의 머신러닝 라이브러리 데이터 병렬처리 방법론을 활용한 모델링 Classification, Regression, Clustering Demension Reduction, Optimization 등

PySpark - Google Colab

환경설정

# 리눅스 기반 코드
!apt-get install openjdk-8-jdk-headless                                                  # jdk 설치
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz # spark 파일
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz                                                   # 스파크 압축 풀기
!pip install -q findspark                                                                # 스파크 찾기
!pip install kaggle --upgrade                                                   # 캐글 데이터 다운받기 위한 kaggle library 설치
  • apt-get : 시스템에서 사용가능한 패키지에 대한 설치, 패키지 검색, 업데이트 및 기타 여러 작업
  • wget : 웹 상의 파일을 다운로드
  • tar : 여러 개의 파일을 하나의 파일로 묶거나 풀 때
  • pip : 파이썬에서 외부 라이브러리를 설치, 업그레이드, 제거, 검색 등
import os         # 운영체제와의 상호작용 돕는 다양한 기능 제공
import findspark

# 환경변수에 path 지정
os.environ["JAVA_HOME"] = ""
os.environ["SPARK_HOME"] = ""

findspark.init()   # spark경우 잘 찾지 모샇는 경우가 있어 findspark 이용

Spark Session 개체 생성

from pyspark.sql import SparkSession

spark = (
		SparkSession
		.builder
		.appName("Pyspark_test")
		.master("local[*]")           # local에서 사용하며, 모든 쓰레드를 사용
		.getOrCreate()
		)
# 만들어진 spark 객체 설정 알아보기
spark.sparkContext.getConf().getAll()

Kaggle 연결 및 api 다운로드

kaggle - account - API - Create New Token - json파일 다운 -

토큰 잃어버리면 다시 찾을 수 X - expire한 후 새 토큰 재발급

from google.colab import files

files.upload()                 # json 파일 업로드
!mkdir -p /root/.kaggle                        # kaggle 폴더 생성
!cp kaggle.json /root/.kaggle/                     # json 파일 복사
!chmod 600 /root/.kaggle/kaggle.json               # file 접근 권한 할당
!kaggle datatsets download -d wethanielaw/iowa-liquor-sales-20230401           # data download
!unzip iowa-liquor-sales-20230401.zip          # 압축 풀기
  • mkdir : 디렉토리(폴더) 생성 명령어
  • cp : 파일 복사 / 이동
  • chmod : 기존파일 or 디렉토리에 대한 접근권한 변경
    • 600 : 나에게만 읽기, 쓰기
  • unzip : zip 파일 압축 풀기
# 파일 크기 알기
import os

def convert_size(size_bytes):
    import math
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    i = int(math.floor(math.log(size_bytes, 1024)))
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return "%s %s" % (s, size_name[i])

file_size = os.path.getsize('./파일명.csv')
print('File Size:', convert_size(file_size), 'bytes')

Spark dataframe으로 읽어오기

df = spark.read.csv(
			path = "파일명.csv", header = True, inferSchema = True
			)
  • header : 열 이름이 데이터 내에 포함되어 있으면 True
  • inferschema : 스키마 자동 설정
import pandas as pd
temp = pd.read_csv("Iowa_Liquor_Sales.csv") # 데이터 용량이 너무 크면 오류
  • Pyspark가 지원하는 파일 형식
    1. csv

      • 단순한 값 목록
      • 쉼표, 다른 구분문자로 나타냄
      • 첫번째 헤더 필드 행 선택
    2. json

      {"데이터 이름" : "값"}

      • 사람이 읽을 수 있는 텍스트 사용하여 데이터 저장 및 전송
      • 데이터 소유 위한 개방형 표준 파일 형식
    3. JDBC
      - 자바에서 데이터베이스에 접속할 수 있도록 하는 자바 api
      - DB 불러와 DF로 처리

      <빅데이터 위한 파일 포맷>

    • 기계가 읽을 수 있는 바이너리 포맷
    • 여러개의 디스크로 나뉠 수 있음 - 확장성, 동시처리 O
    1. Avro
      • row 기반 저장
      • 모든 필드에 접근해야 할 때 유용
      • platform - Kafka, Druid
    2. Parquet
      • column 기반 저장
      • 특정 필드에 접근해야할 때 유용
      • platform - Impala, Spark
    3. ORC
      • column 기반 저장
      • platform - Hive, Presto
  • 스키마?
    • 데이터베이스에서 데이터가 구성되는 방식, 서로 다른 엔티티 간의 관계를 설명

df.show() - 기본적으로 20개 행 보여줌. 숫자 기입 가능

df.printSchema()

df.columns

Parquet 파일로 저장

# 공백 및 괄호 제외
def replace_word(col_name):
    replace_words = {
        " ": "",
        "(": "_",
        ")": ""
    }
    for word in replace_words:
        col_name = col_name.replace(word, replace_words[word])
    return col_name

for col in df.columns:
    df = df.withColumnRenamed(col, replace_word(col))
df.write.format("parquet").save(
    path = "data_parquet",
    header = True
)
# google colab에서 파일 다운로드
from google.colab import files
download_list = os.listdir("./data_parquet")
for file_name in download_list:
	if file_name[-3:] != 'crc':
		files.download("./data_parquet/" + file_name)

Parquet파일 불러와서 csv로

  • 시간 체크
    • 데이터 불러오는 데에 : csv >>> parquet

    • transformation : 비슷

    • action (결과값 반환) :

      %%time              # 시간체크해주는 것
      spark.read.csv("Iowa_Liquor_Sales.csv",header=True, inferSchema=True)

PySpark SQL vs DataFrame

  • Spark SQL 사용 위해 dataframe을 테이블이나 뷰로 등록 필요
# dataframe -> 뷰 등록
df_parquet.createOrreplaceTempView("sql_df")

테이블명.filter(조건)
  • SparkSQL과 DataFrame api중 쿼리 수행 시 성능상 효율 - 비슷

  • SparkSQL

    • (+) : 문자열 형태 쿼리로 읽기 쉬움
    • (-) : 쿼리 유지보수 어려움
  • DataFrameAPI

    • (+) : code 모듈화 - 효율성 높음
      • 보통 대용량 데이터 처리 위해서 spark 사용
      • low level의 데이터 많음 → 로그 형태나 비즈니스 로직 따라 코드 변화 큼
    • (-) : 로직이 복잡하면 이해하기 어려움
  • select 사용하지 않으면 모든 column 반환

  • output = 또다른 dataframe 반환

PySpark로 데이터 다루기

함수기능SQL사용/비고
df.show()데이터프레임 보여줌
df.count()행 수- 항상 dataframe형태로 나오는 것은 아님count
df.printSchema()schema 보여줌(컬럼명 - datatype - nulltype)
결측치 처리
df.select()특정 컬럼 선택select
whenif문 같은 조건문 만들어
case whenF.when(조건, True일 경우 반환값).otherwise(아닐 경우 반환값)
aliasas.
새롭게 연산된 컬럼의 컬럼명(as)
isnull, isnotnull컬럼의 값이 null인지 아닌지isnull
filter구문의 true인 값 반환 = where 함수wheredf.filter(F.col(””).isnull())
drop열 제거-
withcolumn
withcolumnrename
기존 컬럼 업데이터, 타입변경, 신규 컬럼값 추가-withcolumn(신규컬럼명, 신규컬럼값)
기존 컬럼명과 동일할 경우 기존컬럼에 덮어쓰기
size컬럼 array 길이 반환
list 형태의 len과 유사 기능
collect_set
collect_list
object 고유 집합 반환(set만)( = distinct)
요소를 set(list)로 묶음-
over컬럼 window 정의 (=window over)
* window : 일정 기준으로 묶인 row
windowwindow 정의
partitionByW) window 중 그룹 (=groupby)
first, last첫번째, 마지막 값 반환ignorenulls=True: null인 경우 제외하고서
orderby정렬(default: 오름차순, desc(): 내림차순)
데이터 형 변환
to_date
to_timestamp
string → date 형태
string → timestampto_dateF.to_date(변형 컬럼, format=”현재 string의 형태 명시”)
형태 - ( MM/dd/yyyy or MM/dd/yyyy HH:MM:ss.SSS)
groupbygrouped dataframe 반환
이에 대해 grgregation function 적용 가능
sum합계
countDistinct고유 개수 계산
cast형태 변환cast(”long”)
cast(”string”)
lag
lead
이전 row 값 반환
이후 row 값 반환
lag
lead
nlargest상위값 추출nlargest(n=10, columns=’’)
# pyspark sql function 사용

from pyspark.sql import functions as F

(df.select("City")).show(5)

(df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns])).show(5) # df.columns: 리스트

df.filter(F.col(”CountyNumber”).isnull()).show(5)

# 행 제거
df= df.filter(F.col("City").isnotnull())

PySpark - Plotly(시각화)

Plotly

  • Interactive한 그래프
  • Python에서는 dash 사용
# 설치
%pip install plotly==5.11.0
# graph_objects : customizing 할 것 많아

import plotly.graph_objects as go
fig = go.Figure(
			data = [go.Line(x=dataframe[""], y=dataframe[""])],
			layout = go.Layout(
								title = go.layout.Title(text="Title")
			)
fig.show()
# express: graph_objects 보편화 버전

import plotly.express as px
fig = px.line(x=dataframe[""], y=dataframe[""], title="Title")
fig.show()
  • graph_objects - express : 마치 matplotlib - seaborn
# 수평선 그리기

fig.add_hline(y=, line_color='')
함수기능
toPandaspyspark dataframe → pandas dataframe
* 이유 - 시각화 위해서 모든 데이터 한번에 로드
# Pandas 기본 시각화로 plotly 사용

import pandas as pd
pd.options.plotting.backend = "plotly"

Plotly - Interactive 기능

  1. Hover Label

    fig.update_layout(
    				hoverlabel_bgcolor="",
    				hoverlabel_font_size=,
    				hoverlabel_font_color='',
    				hoverlabel_font_family="")
    				
    # 간단한 html 사용하여 레이블 형식 
    fig.update_traces(hovertemplate='A:%{text} <br>'
    								'B:%{text} <br>' +
    								'C:%{text}')
  2. Slider

    • 범위 움직일 수 있도록
    fig.update_layout(xaxis=dic(rangeslider_visible=True) # x축 기준
  3. Drop Down

    • 각각의 그래프를 하나씩 생성
    import plotly.graph_objects as go
    
    fig = go.Figure()
    
    fig.add_trace(go.Line(
    		name="A",
    		x=dataframe[""],
    		y=dataframe[""]
    		))
    		
    fig.add_trace(go.Line(
    		name="B",
    		x=dataframe[""],
    		y=dataframe[""]
    		))
    		
    fig.update_layout(
    		updatemenus=[
    					dict(
                        	type="dropdown",
    						direction="down",
    						buttons=list([
    								dict(label="Both",
    									method="update",
    									args=[{"visible": [True, True]},
    										{"title": "Title"}]),
    								dict(label="A",
    									method="update",
    									args=[{"visible": [True, False]},
    										{"title": "Title"}]),
    								dict(label="B",
    									method="update",
    									args=[{"visible": [False, True]},
    										{"title": "Title"}])
    							)]
    		)

    PySpark - udf

    udf : User Defined Functions
    - 사용자 정의 함수
    - 사용자가 정의한 python 함수가 column 형태를 받아서 column return 하도록 함
    → but pyspark function보다 성능 떨어지므로 기본 pyspark function사용 권장

    from pyspark.sql.functions import udf
    from pyspark.sql import types as T
    
    def 함수(파라미터):
    	내용
    
    함수명_udf = udf(
    				udf로 만들 함수,
    				T.return type
    			)

0개의 댓글