주류 산업의 데이터 분석 컨설턴트가 된 본인. 데이터 처리부터, 대시보드로 비즈니스 인사이트를 전달하기까지의 과정을 직접 처리해 보자!
캐글에서 가져온 데이터를 활용한다
https://www.kaggle.com/datasets/wethanielaw/iowa-liquor-sales-20230401




즉, 데이터 분석가는 raw 데이터 처리, 데이터 분석, 리포팅까지의 모든 업무를 할 수 있어야 한다!
이번 프로젝트에서는 위에서 언급했던 것과 같이 주류 산업의 데이터 분석 컨설턴트가 된 본인이 아래와 같은 업무를 맡았다고 가정한다.
- 해당 지역(여기서는, 데이터가 수집된 미국 아이오와 주를 의미함)의 주류 판매 시장의 동향은 어떤지 알려달라.
- 어느 지역에 어떤 상품을 주력으로 팔아야 할 지, 관련 인사이트를 제공해 달라.
이제, 분석 컨설턴트로서의 업무를 시작해 보자.
Spark, 중에서도 Pyspark를 활용할 것이다.
우선은 Spark의 구조와 데이터 처리 방식에 대해 알아보자!
스파크? 뭔가요??
분산 클러스터란건 뭔가요?

Cluster Manager
Driver Process
Spark Session
Executor Process
Spark의 데이터 처리 방식(1) - Partition

Spark의 데이터 처리 방식(2) - Transformation
Spark의 데이터 처리 방식(3) - Lazy Evaluation & Action
1) Lazy Evaluation
특정 연산 명령이 내려진 경우, raw 데이터에 적용할 transformation만 실행한다.
=> Action 전까지의 transformation을 모두 실행해서, 전 과정을 최적화하게 된다.
=> 데이터 흐름을 최적화하는 과정.
2) Action
실제 연산을 수행하는 과정.
ex) 카운트, 데이터 출력, 데이터 저장 등.
- 카탈리스트란?
: transformation을 적용할 때 스파크 SQL은 논리 계획이 담긴 tree 그래프를 생성하는데, 이를 카탈리스트라 한다.
=> 카탈리스트에 의해 action을 위한 최적의 논리로 데이터가 반환되기 때문에 성능이 좋다.

Python 환경에서 Apache Spark를 사용할 수 있는 인터페이스. 즉, Spark용 파이썬 API
Pyspark의 기능과 라이브러리엔 다음과 같은 것들이 있다.
1) Spark SQL and DataFrames
2) Pandas API on Spark
3) Structured Streaming
4) Machine Learning(MLib)
이제 PySpark를 직접 사용해보자! 구글 코랩에서 진행한다.
아래와 같이 코드를 실행해주자. 최신 버전은 아니지만, pyspark를 직접 사용하는 것이 더 중요하므로 넘어가도록 하자.

pyspark 객체 생성
생성된 pyspark객체의 설정 알아보기
PySpark로 데이터 처리를 하려면? 당연히 데이터가 있어야 한다.
직접 다운 받던 외부에서 가져오던 어쩌던 상관 없으나 보통은 다운로드를 받지.
하지만 그 데이터가 용량이 굉장히 크다면?
이런 경우엔 직접 다운받는 것이 아니라, API를 사용해서 데이터를 불러오곤 한다.
캐글에서 우리가 사용할 아이오와 주 주류 판매 데이터를 API를 이용해 가져와 볼 것이다.
우선, 캐글에 접속해서 우측 상단 계정 아이콘 - Settings - Account 항목의 API를 확인해보자.
Create New Token을 누르면, 해당 계정의 kaggle api token이 json 파일로 다운로드 된다.
이전에 발급받은 토큰이 있고 잃어버리지 않았다면 굳이 create 할 필요는 없으나, 잃어버렸다면 Expire Token 진행 후 Create 하도록 하자.

이제, 다운받은 토큰을 구글 코랩에 올려주자.
아래 코드 실행 후 파일 선택을 눌러서 토큰을 업로드하면 된다.

이후, 아래 코드를 실행시키면 데이터를 받아올 수 있다.
파일 크기가 6GB에 달한다.

이제 데이터를 dataframe 형태로 불러와보자. 크기가 커서 꽤나 시간이 걸린다
(Pandas의 dataframe 아님 주의)

데이터가 잘 불러와졌다!
컬럼도 확인해볼 수 있다.(df.columns 도 가능)
빅데이터를 이용한 파일 포맷엔 여러가지가 있다.

이 중에 Parquet는 PySpark에 특화된 파일 포맷이다.
해서, 불러온 데이터를 Parquet 파일로 저장해 볼 것이다.
parquet에 사용할 수 없는 문자들이 있다.
공백, 괄호 등이 있다. 컬럼명에 해당 문자들이 있다면 저장할 수 없으므로, 바꿔 주어야 한다.

바꿔주고 나서, df 데이터를 parquet 포맷으로 바꿔주자. 시간이 꽤 걸린다.

이제 직접 PySpark를 사용해보자. 우선 데이터 파악부터.



PySpark sql function을 사용할 것이다.

아래 함수를 실행해보자. 컬럼 별 null값을 얻을 수 있다.
isNull(), isNotNull함수는 대/소문자가 구분됨에 유의하자.
즉 isnull(), isnotnull()로 하면 실행 안된다는 말.

데이터 분석 컨설턴트인 본인의 이번 업무는 해당 지역(여기서는, 데이터가 수집된 미국 아이오와 주를 의미함)의 주류 판매 시장의 동향은 어떤지, 어느 지역에 어떤 상품을 주력으로 팔아야 할 지, 관련 인사이트를 제공하는 것이다.
즉 구체적인 위도나 경도는 필요 없다는 말이므로,가게의 위/경도를 나타내는 StoreLocation 컬럼과 CountyNumber 컬럼은 필요 없을 것 같다. 없애주자.
지역 정보("Address", "City", "ZipCode", "County") 중 가장 null값이 적은 정보는?
County 빼고는 비슷비슷 하지만, City가 제일 적다
=> City가 null값인 행은 다 빼주자.
=> 결측치가 많은 Address, Zipcode, County 컬럼도 빼버리자

다른 컬럼의 null값들도 더 뜯어보자.
동일 카테고리에 카테고리 네임이 다른 경우도 있을까?
있었다!

그러한 카테고리 네임을, 가장 최신 버전으로 덮어씌워 하나의 카테고리엔 하나의 카테고리 네임만 남도록 할 것이다.
결측치를 마저 처리해 보자. category와 categoryname의 결측치.


전부 다 시각화를 해 볼 필요까지는 없고(하고싶다면 해 보도록 하자!), 특정 city 데이터만 시각화 해 보자.
우선 시각화할 수 있게 pandas dataframe으로 바꿔주자.
plotly 설치 해 주고...
시각화 할 수 있도록 groupby로 데이터 전처리
이제 간단한 시계열 그래프를 그려 보자.
plotly의 장점은 interactive한 그래프를 그릴 수 있다는 것이다.

그런데, matpltlib이나 seaborn에 비해 코드가 복잡하다. 더 쉽게 쓸 수는 없을까?
=> 있다! plotly의 express 패키지를 사용하면 된다.
커스터마이징은 좀 덜 가능할지 모르지만, 훨씬 간단하게 그래프를 그릴 수 있다.
- 전체적인 성장세를 알아보고자 한다. 성장세는 무엇으로 판단할 수 있는가?
- 매출이 증가하고 있는가?
- 점포 수가 증가하고 있는가?
우선은 보고자 하는 정보에 맞춰 데이터를 전처리한 후 pandas로 바꾸자
그리고 나서 plotly의 express 패키지를 이용해서 매출과 상점 수의 변화를 선 그래프로 그려보자.
꾸준히 성장하고 있는 것으로 보인다.
매출의 성장률을 보고 싶다.

위의 결과들로 미루어보아 주류 시장 전체의 매출과 점포 수는 꾸준히 증가하고 있다. 허나 매출의 증가율(성장률)이 최근 들어 낮아졌음을 알 수 있다.
- 영업 이익?
=> 말 그대로, 이것저것 다 하고 남은 순 이익.
=> 매출 - 매출 원가 - 기타 등등 비용.
PySpark udf를 사용해서 계산해볼 것이다.
udf는 별 게 아니고, user define function의 약자로, 그냥 직접 만든 함수를 pyspark에서 사용하겠다는 것이다. 직접 작성한 함수를 pyspark에서 사용하기 위해서는, udf 객체로 만들어주어야 한다.
우선 아래 2가지를 import해야 한다.
그리고, 사용하고 싶은 함수를 정의한 후 udf로 만든다.

이제, 해당 udf를 이용해서 영업이익을 계산해보자.
그래프도 그려볼 수 있다.
