2004년 구글에서 발표한 Lage Cluster에서 Data Processing을 하기 위한 알고리즘
Hadoop MapReduce는 구글 알고리즘 논문을 소프트웨어 프레임워크로 구현한 구현체
Key-Value구조가 알고리즘의 핵심
모든 문제를 해결하기에 적합하지 않을 수 있다 (데이터의 분산 처리가 가능한 연산에 적합하다)
Map Function
Reduce Function
key와 value로 이뤄진 값을 넣으면 Key-value의 값이 나오는 형태이다
장점
단점
단일 JVM에서 전체 Job을 실행하는 방식으로 로컬상에서 테스트하는 환경말고는 거의 사용하지 않는다
Hadoop 버전 1.0대 까지 유지하던 MapReduce 분산 처리 방식으로 Jop Tracker와 Task Tracker를 사용하는 MapReduce 버전 1
Hadoop 버전 2.0이상에서 사용하는 MapReduce 분산 처리 방식으로 MapReduce 이외의 워크로드 수용이 가능한 MapReduce 버전 2
구현된 맵리듀스 Job을 제출하는 실행 주체
맵리듀스 Job이 수행되는 전체 과정을 조정하며, Job에 대해 수행하는 마스터 데몬 역활
Job에 대한 분할된 Task를 수행하며, 실질적인 Data Processing의 주체이다
n개의 서버로 구성되있는 slave에 떠있는 job을 수행하는 데몬
각 단계들 간의 Data와 처리과정에서 발생하는 중간 파일들을 공유하기 위해 사용
HDFS에 저장되어있는 파일을 처리하기 위해 사용
inputSplit은 물리적 Block들을 논리적으로 그룹핑 한 개념이다
하둡은 저장할 파일을 128mb 블록들로 쪼개서 관리하게 되는데 inputSplits가 쪼개진 물리적 블록들을 논리적으로 그룹핑하는 것이다
inputSplit은 Mapper의 입력으로 들어오는 데이터를 분할하는 방식을 제공하기 위해, 데이터의 위치와 읽어 들이는 길이를 정의한다
자바 프로그램으로 애플리케이션을 만들었다고 가정시
클라이언트가 하둡 Job Tracker한테 앱의 실행할 자료파일을 제출한다. Job Tracker는 전달 받은 자료파일을 HDFS에 저장을 한다. 이렇게 되면 모든 DataNode에서는 HDFS에 접근이 가능하므로 해당 파일을 가져오고 모든 Task Tracker가 해당파일을 참조를 하고 child JVM이 fork되고 Map Task 또는 Reduce Task가 실행이 된다
제출 과정
job의 submit 메소드
job Initialization (초기화)
Job Tracker
가 submitJob 메소드
의 호출을 받으면 Job을 내부 Queue에 저장
Job Scheduler는 Queue에서 Job을 읽어서 초기화 과정을 진행
초기화 과정
입력 스플릿 정보
를 가져온다mapred.reduce.tasks
속성 값에 따라 이 수 만큼 Reduce Task를 생성Task 할당
TaskTracker는 하트비트를 보내는 단순한 루프를 수행
하트비트는 Live 상태 체크 및 메시지 채널 용도로 사용된다
TaskTracker는 map 및 Reduce Task를 위한 많은 수의 혼합된 슬롯을 갖는다
JobTracker의 TaskTracker 선택기준
Task 실행
로컬 디스크에서 가장 처음 뜨는 Task가 Map Task이다
split되어있는 자료들을 Map Task에서 읽어서 맵리듀스 알고리즘 상의 알고리즘으로 처리를 해서 output을 주고, 반환된 output결과를 hash partitioning을 하고, 파티션 기준으로 같은 key를 갖고 있는 데이터들 끼리 모아서 merge 작업을 해주고 Reduce연산을 해준 후 최종 output을 내놓는다
위 내용을 그림상으로 표현해보면
이와 같은 형태로 진행이 되는 것이다
만약 맨 위의 text파일이 100기가 크기의 파일이라 하면 해당 파일은 128mb의 크기로 수많은 블록단위로 쪼개져서 분산 저장하게 된다, 이때 MapTask는 각 블록마다 하나씩 뜨게 된다
쪼개진 블록단위 텍스트 파일에 대해서 맵리듀스 애플리케이션을 적용하려 하는데 해당 앱은 전체 텍스트에서 각 단어가 몇번 나왔는지 세는 앱이라 가정
그러면 해당 맵리듀스 앱은 각 블록에서 MapTask를 key-value의 쌍으로 구현을 하고 각 블록에서 먼저 값들을 정리를 하고
정리된 블록에 대해서 key를 기준으로 같은 key를 가지고 있는 것들끼리 서버에 모으게 된다. 이때 MapTask를 수행한 서버와 Reduce를 수행한 서버끼리 트래픽이 발생하게 된다
필수로 구현이 필요한 부분은 Mapper 부분이다
Input
Mapper
combiner
partitioner
Reducer
output
Mapper에서 Reducer로 보내는 과정속에
shuffle이라하는데, 같은 키를 같고있는 애들끼리 모으고 sorting 후에 reducer의 입력값으로 들어가게 된다
이때, shuffle과정에서 트래픽이 발생할 수 밖에 없다
이 트래픽을 줄이는데 중요한 역활을 하는것이 combiner와 partitioner이다
Mapper의 output결과를 보통 sequenceFile로 쓴다
SequenceFile 이란?
Mapper 다음에 추가되어 있는 모습이다
데이터 전송량을 줄이기 위해서 미리 mapper내부에서 정보를 취합을 해서 Reducer로 전송할 트래픽양을 줄여준다
서로 다른 Mapper에서 생성된 중간 결과 key-value 쌍들을, key 중심으로 같은 키를 갖는 데이터는 물리적으로 동일한 Reducer로 데이터를 보내는 용도로 사용한다
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
numReduceTasks : reduceTask 개수
Mapper의 Immediate 결과 파일이 Reducer로 전달되는 과정이 Shuffling (이때, 트래픽이 발생)
서로 다른 Mapper로 부터 받은 데이터를 key중심으로 sorting 수행
-> 같은 키에 해당하는 리스트를 Reducer로 전달
위의 그림과같은 상황이라면 ReduceTask개수가 2개로 2로 모듈러 연산을 하여 파티셔닝을 한 것
Reducer는 Mapper의 출력 결과를 입력으로 받아서 데이터를 처리한다
처리된 데이터를 OutPutFormat의 형태에 맞게 결과로 출력
맵리듀스 애플리케이션에서 Reducer의 구현은 선택적 옵션이다
TextOutPutFormat
sequenceFileOutPutFormat
MultipleOutputsFormat
LazyOutputFormat
DBOutputFormat
Task 진행율
Task는 보고 플래그가 설정되 있다면 TaskTracker에게 진행 상황을 3초마다 보고
TaskTracker는 JobTracker에게 하트비트를 보낼 때 진행중인 모든 Task의 상태를 포함하여 전송
Client Job은 매초마다 JobTracker를 풀링하여 최신 정보를 갱신한다
Task 수행시 장애 발생시?
만약 job을 수행하는데 장애가 발생한다면 JobTracker가 장애가 발생한 해당 서버가 처리해야할 데이터를 가지고 있는 다른 TaskTracker에게 Job을 다시 실행하도록 명령을 던진다
이러한 job에 대한 장애발생도 하둡 내부적으로 알아서 처리가 된다
JobTracker는 하나의 Job에 대한 마지막 Task가 완료되었을 경우 상태를 성공
으로 변경한다
클라이언트는 상태를 검사하고 사용자에게 알려주기 위한 메시지를 출력한다
callBack으로 HTTP Job통지를 받고자하면 job.end.notification.url
을 설정하면 된다
번외
하둡의 데이터 유실 상황
하둡이 매우 busy한 상황인데 카프카와 같은 써드파티앱에서 하둡에 뭔가를 쓸려고 한다면 하둡은 거절을 하고 그동안 카프카에 적재된 데이터들에 대해서 보관 싸이클이 1시간이라 할 때, 이 시간 이후에도 하둡이 busy하다면 해당 파일들이 적재되지 못하고 유실되는 상황이 나올 수 있다