2026.04.24(Fri)

오유찬·2026년 4월 24일

DE

목록 보기
16/16

MLlib #pyspark


Rating Class


PySpark(MLlib)에서 추천 시스템 모델(ALS) 만들 때 아주 표준적으로 사용하는 데이터 틀.

  • User
  • Product
  • Rating
from pyspark.mllib.recommendation import Rating

# Rating(user, product, rating)
my_rating = Rating(1, 101, 5.0)

Rating 객체는 내부적으로 namedtuple과 비슷하게 동작한다. my_rating.user처럼 속성값에 쉽게 접근할 수 있다.

namedtuple이란?
Python의 collections 모듈에 있는 가벼운 튜플 기반 자료형
값은 바꿀 필요 없지만, 각 칸에 이름을 붙여 읽기 쉽게 쓰고 싶을 때 사용한다. 불변한 값이고 리스트처럼 수정못 하지만, field name으로 접근할 수 있어 가독성이 좋다.

Rating class → Transformation


Transformation Process
1. Parsing : map → 문자열 줄을 Rating 객체로 변환
2. Filtering : filter → 특정 조건만 남도록 필터링
3. Extraction : map → 특정 정보만 추출

raw_data = sc.parallelize(["1, 101, 5,0", "1, 102, 3.0", "2, 101, 4.0"])

ratings_rdd = raw_data.map(lambda line: line.split(",")) \
					  .map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
					  
high_ratings = ratings_rdd.filter(lambda r: r.rating >= 4.0)

RDD를 다룰 때는 Lazy Evaluation(지연 연산) 특징을 기억해야 한다. map, filter를 쓴다고 바로 계산이 일어나지 않고 나중에 collect, count와 같은 Action 명령할 때까지 계획만 세워둔다.

Lazy Evaluation으로 얻을 수 있는 이점

  1. Query Optimization
    Spark는 명령어를 받으면 바로 실행하지 않고 DAG라는 설계도를 그린다.
    ex : 100만 개의 데이터 중에서 1번 유저만 필터링한 후에 5개만 가져와라.
    → Spark는 100만 개를 다 뒤지는 개 아니라 5개를 찾는 즉시 작업을 멈추는 최적의 경로를 찾아낸다.
  2. 메모리 효율성
    데이터 즉시 변형하지 않기에, 불필요한 중간 데이터 결과를 메모리에 일일이 저장하지 않는다.
  3. 장애 복구 (Fault Tolerance)
    계산 도중에 서버 한 대가 고장 나도, spark 그동안 그려온 설계도(Lineage)가 있기 때문에, 고장 난 부분만 다시 계산해서 복구할 수 있다.

ALS(Alternating Least Squares, 교차 최소 제곱법)


추천 시스템에서 가장 유명한 행렬 분해 알고리즘

데이터 : 사용자와 아이템으로 이루어진 거대한 행렬
→ 사용자의 취향 행렬(U)과 아이템의 특성 행렬(I)로 행렬을 분해한다.
이 두 값을 곱하면 예측 평점이 나온다.

Alternating?
두 행렬을 동시에 맞추는 것는 난이도가 높다.
1. 사용자 취향을 고정하고, 거기에 맞는 아이템 특성을 계산한다.
2. 아이템 특성을 고정하고, 거기에 맞는 사용자 취향을 다시 계산한다.
3. 만족스러운 과정이 나올 때까지 이 두 과정을 번갈아가면서(Alternating) 반복한다.

  • 확장성(Sacalability) : 데이터가 많아도 사용자/아이템별로 계산을 쪼갤 수 있어 Spark 같은 분산 처리 시스템에 적합하다.
  • Cold start 완호 : 사용자의 평점이 몇 개 없어도 비슷한 취향의 데이터를 통해 예측이 가능하다.

ALS의 핵심 파라미터
1. Rank(계수)
- 사용자나 아이템의 특징을 몇 개의 숫자로 표현할 것인가?
- 10~200 사이에서 결정
2. Iterations(반복 횟수)
- 사용자 행렬 ←→ 아이템 행렬을 몇 번 번갈아가며 업데이트할 것인가?
- 10~20
3. Lambda(정규화 매개변수)
- 모델이 너무 복잡해지지 않도록 벌금을 주는 수치
- 0.01, 0.1, 1.0 같은 값들로 테스트한다.

이 수치들을 제각기 조절해가는 것보다는 Grid Search라는 방법을 사용한다.
각 파라미터에 리스트를 설정해두면 모든 조합을 컴퓨터가 테스트한다.

Vector & LabeledPoint


~={cyan}Vector(데이터의 특징을 숫자로)=~
분류 모델이 이해할 수 있도록 데이터를 숫자로 바꾼 것이 벡터이다.
Spark에서는 두 가지 형태의 벡터를 지원한다.

  • Dense Vector (밀집 벡터) : 모든 데이터를 다 적는 방식
    - [1.0, 0.0, 3.5] (모든 위치의 값을 다 기록)
  • Sparse Vector (최소 벡터) : 0이 아주 많을 때, 0이 아닌 값이 어디에 있는지만 적어서 메모리를 아끼는 방식
    - 100개 데이터 중 5번 위치에 1.0, 10번 위치에 3.5가 있고 나머지는 다 0

~={blue}LabeledPoint(정답 + data)=~
벡터에 '이건 스팸이야(1), 이건 정상이야(0)'라는 정답(Label)을 붙여아 모델이 학습을 할 수 있다. 그 정답지 역할을 하는 게 LabeledPoint이다.

  • 구조 : LabeledPoint(label, features)
    - label : 우리가 맞히고 싶은 정답 ( 보통 Double 타입)
    - features : Vector (데이터의 특징)
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

lp = LabeledPoint(1.0, Vectors.dense([0.1, 0.5, 0.2]))

print(lp.label)     # 1.0
print(lp.features)  # [0.1, 0.5, 0.2]

MLlib의 많은 알고리즘은 입력값으로 반드시 LabeledPoint 형태의 RDD를 요구한다.

Map vs flatMap


map을 사용하는 상황

  • 데이터의 형식만 바꾸고 싶을 때 사용한다.
  • 전체 데이터의 개수(Line 수)가 변하지 않아야 할 때 적합하다.

flatMap을 사용하는 상황

  • 데이터를 잘게 쪼개거나 펼치고 싶을 때 사용한다.
  • 하나의 행을 여러 개의 데이터로 분리하고 싶을 때 적합하다.
  • 문장을 단어 단위로 쪼갤 때, 리스트 안에 리스트가 들어있는 구조를 하나로 합치고 싶을 때

flatMapmap + flatten의 합성어다.
map으로 먼저 데이터를 쪼갠 뒤(List의 형태로), 그 리스트의 껍데기를 까서 내용물만 밖으로 꺼내는(flatten) 작업을 동시에 해주는 것이다.

Clustering


라벨 없이 유사도가 높은 그룹들을 조직하는 비지도 학습 과제

  • K-means
  • 가우시안 혼합
  • 전력 반복 클러스터링(PIC)
  • 이분 K-means
  • steraming K-means

K-means clustering

  • 수치형 특성들로 이루어진 데이터
  • 목표 클러스터링 수 : 'K'

Project


columndata typedescriptioncleaning requirements
order_datetimestampDate and time when the order was madeModify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date
time_of_daystringPeriod of the day when the order was madeNew column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm
productstringName of a product orderedRemove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase
categorystringBroader category of a productEnsure all values are lowercase
purchase_statestringUS State of the purchase addressNew column containing: the State that the purchase was ordered from

order_date (5)

  • 12am ~ 5am 제거 (1)
  • timestamp → date

time_of_day (4)

  • 새 컬럼 생성
    - 5-12 am → morning
    - 12-6pm → afternoon
    - 6-12pm → evening
    - lower bound 포함, upper bound 미포함

product

  • 'TV'를 포함하는 행 제거 (1)
  • 모든 values → lowercase (2)

category

  • values → lowercase (2)

purchase_state (3)

  • 컬럼 추가
  • ordered_from에서 State 뽑아서 만들기

Data Processing in Shell #bash


curl(Client for URLs)


서버와 데이터를 주고받기 위한 Unix command line 도구
주로 HTTP 사이트나 FTP 서버에서 데이터를 다운로드할 때 사용한다.

기본 구조 : curl [option flag] [URL]

  • URL 입력은 필수

단일 파일 다운로드

  • 원래 파일 이름으로 저장 : -Ocurl -O [file URL]
  • 다른 이름으로 저장 : -ocurl -o [file URL]

서버에 비슷한 이름의 파일 다중 다운로드
curl -O https://website.com/datafile*.txt

Globbing Parser 활용

  • 연속 다운로드 → [ ] 사용
    - curl -O https://~~~/datafilename[001-100].txt
  • 간격 두고 다운로드 : 10번째 파일마다 다운로드 하고 싶다면 콜론: 추가
    - curl -O https://~~~/datafilename[001-100:10].txt

선제적 트러블슈팅

  • -L : 300번대 에러 코드(리다이렉트) 발생 시 자동으로 해당 URL 따라간다.
  • -C : 다운로드 도중 타임아웃 발생시, 중단된 지점부터 이어서 받는다.

Wget(World Wide Web & Get)


curl과 마찬가지로 HTTP 및 FTP를 통해 파일을 다운로드할 수 있는 도구

curl보다 더 다목적이다.

  • 단일 폴더, 폴더 전체, 웹페이지 자체를 다운로드 가능
  • 여러 파일을 재귀적으로 다운로드 가능

option

  • -b : background에서 실행
  • -q : Wget의 실행 로그 출력을 끈다
  • -c : 이전에 중단된 다운로드를 이어서 받는다.(Wget이 아닌 다른 프로그램으로 받던 파일도 가능)

preview the log file → cat wget-log

Wget으로 여러 파일 다운로드
다운로드하려는 모든 URL이 url_list.txt에 있을 때, -i → Wget에게 로컬 파일에서 URL 읽어오도록 지시
wget -i url_list.txt

  • -i url_list.txt 사이에 어떤 옵션도 들어가서는 안된다. -i 앞에 옵션 위치시킬 것

대용량 파일을 위한 다운로드 제한 설정
파일 다운로드가 네트워드 대역폭 전체를 점유하여 다른 작업을 방해하지 않도록 제한을 걸어야 할 때가 있다.
--limit-rate option

  • 숫자 입력 시 기본적으로 초당 바이트로 계산된다.
  • wget --limit-rate=200k -i url_list.txt
    - 초당 다운로드 속도가 200KB를 넘지 않도록 제한한다.

소용량 파일을 위한 다운로드 제한 설정
작은 파일들을 여러 개 받을 때는 대역폭 제한보다 서버에 과부하를 주지 않는 것이 중요하다.
이때는 파일 다운로드 사이에 강제적인 대기 시간을 두는 --wait 옵션을 사용

  • 시간 단위는 '초'
  • wget --wait=2.5 -i url_list.txt : 파일 다운로드 시마다 2.5초의 휴식시간

csvkit


Bash 명령어에 부족한 데이터 핸들링 기능을 보완하기 위해 python 라이브러리에 의존한다. csvkit은 이러한 간극을 메워주기 위한 파이썬을 기반으로 개발한, 데이터 변환, 처리, 정제 기능을 모아놓은 suite이다.

in2csv : 파일을 CSV로 변환

  • in2csv SPotifyData.xlsx > Spotify.csv
  • > (리다이렉트 연산자)를 사용하지 않고 xlsx 파일만 입려하면 데이터가 터미널에 출력만 될 뿐, 파일로 저장되지 않는다.

엑셀 파일에 여러 시트 있을 때, 특정 시트 변환하기

  • in2csv -n Spotify.xlsx (또는 --names) : 모든 시트 목록 출력
  • 특정 시트 변환 in2csv --sheet "work-one_popularity" Spotify.xlsx > Spotify_population.csv
  • 시트 이름에 공백이나 특수문자 있다면 따옴표("")로 감싸줘야 한다.

in2csv는 실행 시 별도의 로그를 남기지 않는다. ls를 통해 파일 생성을 확인해라.

csvlook : 데이터 미리보기
cat, less를 쓰면 터미널에서 데이터 형식이 깨져 보이기 쉽다. csvlook을 사용하면 마크다운 호환 방식의 고정폭 테이블로 예쁘게 정렬하여 보여준다.
csvlook Sptify_population.csv

csvstat : 기술 통계 확인
describe()와 유사한 기능을 한다. 평균, 중앙값, 고유값 개수 등 주요 통계 수치를 요약해서 보여준다.
csvstat Spotify_population.csv

profile
열심히 하면 재밌다

0개의 댓글