📖 학습 주제
- 빅데이터
- Hadoop
✏️ 주요 메모 사항 소개
빅데이터

빅데이터의 정의는 "서버 한대로 처리할 수 없는 규모의 데이터", "기존의 소프트웨어로는 처리할 수 없는 규모의 데이터"와 같은 것들이 있다.
빅데이터 처리의 특징
- 큰 데이터를 손실없이 보관할 방법이 필요
- 큰 데이터 저장이 가능한 분산 파일 시스템이 필요
- 처리 시간이 오래 걸림
- 병렬 처리가 가능한 분산 컴퓨팅 시스템이 필요
- 비구조화된 데이터일 가능성이 높음
- 결국 다수의 컴퓨터로 구성된 프레임워크가 필요하다 (대용량 분산 시스템).
Hadoop

Hadoop은 대규모 데이터를 저장하고 처리하는 오픈 소스 분산 컴퓨팅 프레임워크이다. 주로 HDFS(분산 파일 시스템)와 MapReduce(분산 처리 모델)를 사용해서 데이터를 분산 저장하고 병렬 처리한다.
현재의 하둡2.0은 YARN이란 이름의 분산처리 시스템위에서 동작하는 애플리케이션이 되었다. 추가적으로 Spark는 YARN위에서 애플리케이션 레이어로 실행된다.

HDSF - 분산 파일 시스템
- 데이터를 블록단위로 나눠 저장 (블록의 크기는 128 MB (default))
- 블록 복제 방식 (Replication)
- 각 블록은 3 군데에 중복 저장됨
- Fault tolerance를 보장할 수 있는 방식으로 이 블록들은 저장됨
- 하둡 2.0 내임노드 이중화 지원
- Active & Standby 둘 사이에 share edit log가 존재)
- Secondary 내임노드는 여전히 존재

MapReduce - 분산 컴퓨팅 시스템
- 하나의 잡 트래커와 다수의 태스크 트래커로 구성
- 잡 트래커가 일을 나눠서 다수의 태스크 트래커에게 분배
- 태스크 트래커에서 병렬 처리

YARN
하둡2.0에서는 분산 컴퓨팅 시스템으로 YARN을 이용하는데, 이는 세부 리소스 관리가 가능한 범용 컴퓨팅 프레임워크이다.
- 리소스 매니저 : Job Scheduler, Application Manager
- 노드 매니저
- 컨테이너 : 앱 마스터, 태스크
Spark가 이 위에서 구현된다.

Yarn의 동작

- 실행하려는 코드와 환경 정보를 RM(Resource Manager)에게 넘김
- 실행에 필요한 파일들은 application ID에 해당하는 HDFS 폴더에 미리 복사됨
- RM은 NM(Node Manager)으로부터 컨테이너를 받아 AM(Application Master) 실행
- AM은 프로그램 마다 하나씩 할당되는 프로그램 마스터에 해당
- AM은 입력 데이터 처리에 필요한 리소스를 RM에게 요구
- RM은 data locality를 고려해서 리소스(컨테이너)를 할당
- AM은 할당받은 리소스를 NM을 통해 컨테이너로 론치하고 그 안에서 코드를 실행
- 이 때 실행에 필요한 파일들이 HDFS에서 Container가 있는 서버로 먼저 복사
- 각 태스크는 상황을 주기적으로 AM에게 보고 (heartbeat)
- 태스크가 실패하거나 보고가 오랜 시간 없으면 태스크를 다른 컨테이너로 재실행
MapReduce 프로그래밍
MapReduce 프로그래밍은 대용량의 데이터를 처리하는데 목적을 두고 있다. 다음과 같은 특징이 있다.
- 데이터 셋은 Key, Value의 집합이며 변경이 불가하다.
- 데이터 조작은 map과 reduce 두 개의 오퍼레이션만 가능하다.
- 이 두 오퍼레이션은 항상 하나의 쌍으로 연속으로 실행
- 이 두 오퍼레이션의 코드를 개발자가 채워야함
- MapReduce 시스템이 Map의 결과를 Reduce단으로 모아줌
- 이 단계를 보통 셔플링이라고 부르며, 네트워크단을 통한 데이터 이동이 생긴다.

<MapReduce 프로그램 동작 예시 : Word Count>

Map
- Map: (k, v) -> [(k', v')*]
- 입력은 시스템에 의해 주어지며 입력으로 지정된 HDFS 파일에서 넘어옴
- 키,밸류 페어를 새로운 키,밸류 페어 리스트로 변환 (transformation)
- 출력: 입력과 동일한 키, 밸류 페어를 그대로 출력해도 되고 출력이 없어도 됨
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

Reduce
- Reduce: (k’, [v1’, v2’, v3’, v4’, …]) -> (k’’, v'')
- 입력은 시스템에 의해 주어짐
- 맵의 출력 중 같은 키를 갖는 키/밸류 페어를 시스템이 묶어서 입력으로 넣어줌
- 키와 밸류 리스트를 새로운 키,밸류 페어로 변환
- SQL의 GROUP BY와 흡사
- 출력이 HDFS에 저장됨
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

Shuffling and Sorting
- Shuffling
- Mapper의 출력을 Reducer로 보내주는 프로세스를 말함
- 전송되는 데이터의 크기가 크면 네트웍 병목을 초래하고 시간이 오래 걸됨
- Sorting
- 모든 Mapper의 출력을 Reducer가 받으면 이를 키별로 소팅

Data Skew
각 태스크가 처리하는 데이터 크기에 불균형이 존재한다면 병렬처리는 큰 의미가 없다. 가장 느린 태스크가 전체 처리 속도를 결정하기 때문. 특히 Reducer로 오는 나눠지는 데이터의 크기는 큰 차이가 있을 수 있다.
Group By나 Join등이 이에 해당할 수 있고, 처리 방식에 따라 Reducer의 수에 따라 메모리 에러등이 일어날 수 있다.

문제점
MapReduce 프로그래밍의 문제점은 낮은 생산성과 배치작업 중심이다.
- 낮은 생산성
- 프로그래밍 모델이 가진 융통성 부족 (2가지 오퍼레이션만 지원)
- 튜닝/최적화가 쉽지 않음 (데이터 분포가 균등하지 않은 경우)
- 배치작업 중심
- 기본적으로 Low Latency가 아니라 Throughput에 초점이 맞춰짐
이에 따라서 MapReduce의 대안들이 등장하기 시작했는데, 대표적으로 YARN과 Spark와 같은 더 범용적인 대용량 데이터 처리 프레임워크들이 등장했다.
SQL을 사용하는 Hive, Presto와 같은 툴도 등장했다.
- Hive
- MapReduce위에서 구현됨. Throughput에 초점. 대용량 ETL에 적합
- Presto
- Low latency에서 초점. 메모리를 주로 사용. Adhoc 쿼리에 적합
- AWS Athena가 Presto 기반
Spark

Apache Spark는 빅데이터 처리를 위한 빠르고 범용적인 오픈 소스 분산 처리 시스템이다. 메모리 내에서 데이터 처리를 최적화하여 빠르게 데이터를 처리할 수 있다. 다양한 데이터 소스와 통합되며, 실시간 스트리밍, 머신러닝, 그래프 처리 등을 지원한다.
구성
- Spark Core
- Spark SQL
- Spark ML
- Spark Streaming
- Spark GraphX

특징
- 하둡(YARN)이외에도 다른 분산 컴퓨팅 환경 지원 (K8s, Mesos)
- 판다스 데이터프레임과 개념적으로 동일한 데이터 구조 지원
- 다양한 방식의 컴퓨팅을 지원
- 배치 데이터 처리, 스트림 데이터 처리, SQL, 머신 러닝, 그래프 분석
Spark 프로그래밍 API
- RDD (Resilient Distributed Dataset)
- 로우레벨 프로그래밍 API로 세밀한 제어가 가능
- 하지만 코딩 복잡도 증가
- DataFrame & Dataset (판다스의 데이터프레임과 흡사)
- 하이레벨 프로그래밍 API로 점점 많이 사용되는 추세
- 구조화 데이터 조작이라면 보통 Spark SQL을 사용
- DataFrame/Dataset이 꼭 필요한 경우는?
- ML 피쳐 엔지니어링을 하거나 Spark ML을 쓰는 경우
- SQL만으로 할 수 없는 일의 경우
- Spark SQL
- 구조화된 데이터 처리를 SQL로 처리
- 데이터프레임을 테이블처럼 sql로 처리 가능
- Spark ML
- 머신러닝 관련 다양한 알고리즘, 유틸리티로 구성된 라이브러리
- Classification, Regression, Clustering, Collaborative Filtering, …
- 전체 리스트는 링크 참고. 딥러닝 지원은 미약
- RDD 기반과 데이터프레임 기반의 두 버전이 존재
- spark.mllib vs. spark.ml
- spark.mllib가 RDD 기반이고 spark.ml은 데이터프레임 기반
- spark.mllib는 RDD위에서 동작하는 이전 라이브러리로 더 이상 업데이트가 안됨
- 항상 spark.ml을 사용할 것!
- import pyspark.ml (import pyspark.mllib)
Spark ML의 장점은 원스톱 ML 프레임워크라는 것이다. 대용량 데이터도 처리 가능하다.
- 데이터프레임과 SparkSQL등을 이용해 전처리
- Spark ML을 이용해 모델 빌딩
- ML Pipeline을 통해 모델 빌딩 자동화
- MLflow로 모델 관리하고 서빙 (MLOps)
사용 예들
기본적으로 대용량 데이터 배치 처리, 스트림 처리, 모델 빌딩과 같은 예들이 있다.
- 예1) 대용량 비구조화된 데이터 처리하기 (ETL or ELT)

- 예2) ML 모델에 사용되는 대용량 피쳐 처리 (배치/스트림)

- 예3) Spark ML을 이용한 대용량 훈련 데이터 모델 학습
실행 환경
- 개발/테스트/학습 환경 (Interactive Clients)
- 노트북 (주피터, 제플린)
- Spark Shell
- 프로덕션 환경 (Submit Job)
- spark-submit (command-line utility) : 가장 많이 사용
- 데이터브릭스 노트북
- 노트북 코드를 주기적으로 실행해주는 것이 가능
- REST API
- Spark Standalone 모드에서만 가능
- API를 통해 Spark Job을 실행
- 실행 코드는 미리 HDFS등의 파일 시스템에 적재되어 있어야함
구조(YARN)

- Driver : 실행되는 코드의 마스터 역할 수행 (YARN의 Application Master)
- 사용자 코드를 실행하며 실행 모드(client, cluster)에 따라 실행되는 곳이 달라짐
- 코드를 실행하는데 필요한 리소스를 지정함
- --num-executors, --executor-cores, --executor-memory
- SparkSession을 만들어 Spark 클러스터와 통신 수행
- Cluster Manager (YARN의 경우 Resource Manager)
- Executor (YARN의 경우 Container)
- 사용자 코드를 실제 Spark 태스크로 변환해 Spark 클러스터에서 실행
- Executor : 실제 태스크를 실행해주는 역할 수행 (YARN의 컨테이너)
- 실제 태스크를 실행해주는 역할 수행 (JVM): Transformations, Actions
- YARN에서는 Container가 됨
-
local[n]
- 개발/테스트용, 하나의 JVM이 클러스터로 동작.
- Driver와 하나의 Executor 실행
- n은 코어의 수

-
YARN
- 두 개의 실행 모드가 존재: Client vs. Cluster
- Client 모드: Driver가 Spark 클러스터 밖에서 동작
- YARN 기반 Spark 클러스터를 바탕으로 개발/테스트 등을 할 때 사용
- Cluster 모드: Driver가 Spark 클러스터 안에서 동작
- 하나의 Container 슬롯을 차지
- 실제 프로덕션 운영에 사용되는 모드

-
Kubernetes
-
Mesos
-
Standalone