aggregate를 통해 자주 쓰이는 집계 방식을 직접 구현해봄으로써 RDD 연산의 원리를 이해하려 하였다.fold() 함수 fold(zeroValue, func)는 모든 파티션마다 zeroValue가 하나씩 적용되어 누적 계산된다. sc.parallelize([2,3,4], 4)처럼 파티션 수가 데이터 수보다 많다면 빈 파티션에도 zeroValue가 쓰이고 결과가 예상보다 커질 수 있다.aggregate()aggregate(zeroValue, seqOp, combOp)는 zeroValue에 대해 파티션 내외 두 가지 함수를 적용함으로써 평균 계산, 문자열 누적 등 복합 연산이 가능하다.(합계, 개수)를 튜플로 전달하여 전체 평균을 계산하는 방식은 fold()보다 명확하게 병렬 환경에서 상태를 유지할 수 있다.hello, world, spark)aggregate를 사용하면 문자열을 순차적으로 이어붙이면서 줄번호까지 함께 조작할 수 있다.seqOp과 combOp의 역할을 분리해 생각하는 것이 중요했고 combOp에서 index가 기대와 다르게 누적되는 이유는 각 파티션의 index가 별도로 관리되며 단순히 누적되어 최종 index가 커지는 구조이기 때문이다..glom().collect()을 통해 파티션 구성을 확인함으로써 연산 결과가 어떻게 나오는지를 납득할 수 있었다.[1, 5, 3, 9], [2, 8, 4, 7, 6] 처럼 2개 파티션으로 구성되면 각 파티션마다 seqOp → combOp 순서로 집계됨을 직접 확인하였다.groupBy()의 작동 방식groupBy(func)는 값 전체를 그룹핑하여 이터레이터로 반환! 후처리 (sorted, list(y))가 필요하다.groupBy는 단순 그룹핑, reduceByKey는 그룹 후 연산 포함이라는 점도 이해하였다.df.filter(df.col1 == 'A' or df.col2 == 'B') → Python에서는 작동하지만 Spark에서는 오류 발생|, &)를 써야 하며 각 조건은 괄호로 묶어야 오류 없이 작동한다는 걸 알게 되었다.=> df.filter((df.col1 == 'A') | (df.col2 == 'B'))StringIndexer는 학습할 때(.fit()) 문자열 값을 숫자로 바꾸는 규칙을 미리 만들어두고OneHotEncoder는 그 숫자를 벡터로 바꿀 수 있도록 틀을 준비해두고StandardScaler는 숫자들을 정규화하기 위해 평균과 표준편차 같은 기준값을 미리 계산해두는 것VectorAssembler는 단일 컬럼도 리스트로 받아야 하고 정규화를 위해 반드시 벡터형태로 먼저 만들어야 한다.inputCol과 outputCol을 꼼꼼히 맞춰야 전체 파이프라인이 오류 없이 작동한다는 점!.fit() 한 번이면 test 데이터에도 동일한 기준으로 처리할 수 있음aggregate 결과에서 index 값이 왜 예상보다 커지는지 이해하는 데 시간이 걸렸다. (→ 사실은 각 파티션이 독립적으로 index를 누적해서였다….)
PySpark 연산자에서 Python 스타일로 조건을 썼다가 연산자 우선순위 문제 + Py4JError 발생
오늘 뭔가 집중력이 평소보다 흐려진 느낌이 있었고 열심히 이해하려고 따라가긴 했지만 내가 이걸 진짜 알고 있나? 하는 찝찝함이 남기도 했다.
처음에 Logical/Optimized/Physical Plan을 해석하는 과정이 너무 낯설고 어렵게 느껴졌고 튜닝과 관련해서도 개념은 이해했지만 감이 안 잡혔다.
서비스존별 승차/하차 집계를 할 때 중복 조인, groupBy 위치 문제로 인한 이상 결과가 출력되는 상황을 디버깅하는 데 시간이 오래 걸렸다.
join → groupBy와 groupBy → join의 순서 차이에 따라 결과가 완전히 달라진다는 걸 실습하면서 딱 문제로 만나 로직 설계 시 좀 더 미리 고민이 필요하다는 점을 느꼈다.