맵리듀스(MapReduce)란?

맵리듀스 알고리즘

  • Map Function

    • (key1, value1) -> (key2, value2)
  • Reduce Function

    • (key2, List of value2) -> (key3, value3)

key와 value로 이뤄진 값을 넣으면 Key-value의 값이 나오는 형태이다

  • HDFS에 분산 저장되어 있는 데이터를 병렬로 처리하여 취합하는 역활
  • Map Function과 Reduce Fucntion으로 구성
  • java, c++, pyton 등 다양한 언어 지원
  • Job에 대한 구동 및 관리는 하둡이 알아서 한다, 개발자는 비지니스 로직 구현에 집중

맵리듀스 장단점

장점

  • 단순하고 사용이 편리하다
  • 높은 확장성
  • 특정 데이터 모델이나 스키마, 질의에 의존적이지 않은 유연성
  • 저장 구조의 독립성
  • 데이터 복제에 기반한 내구성과 재수행을 통한 내고장성 확보

단점

  • 고정된 단일 데이터 흐름
  • 기존 DBMS보다 불편한 스키마 질의
  • 단순한 스케쥴링
  • 작은 데이터를 저장/처리하기에 적합하지 않다

맵리듀스 구동 방식

  • Local

단일 JVM에서 전체 Job을 실행하는 방식으로 로컬상에서 테스트하는 환경말고는 거의 사용하지 않는다

  • Classic

Hadoop 버전 1.0대 까지 유지하던 MapReduce 분산 처리 방식으로 Jop Tracker와 Task Tracker를 사용하는 MapReduce 버전 1

  • YARN

Hadoop 버전 2.0이상에서 사용하는 MapReduce 분산 처리 방식으로 MapReduce 이외의 워크로드 수용이 가능한 MapReduce 버전 2

MapReduce 1

  • 클라이언트 (Client)

구현된 맵리듀스 Job을 제출하는 실행 주체

  • 잡트래커 (Job Tracker)

맵리듀스 Job이 수행되는 전체 과정을 조정하며, Job에 대해 수행하는 마스터 데몬 역활

  • 테스크트래커 (Task Tracker)

Job에 대한 분할된 Task를 수행하며, 실질적인 Data Processing의 주체이다
n개의 서버로 구성되있는 slave에 떠있는 job을 수행하는 데몬

  • 하둡 분산파일시스템 (HDFS)

각 단계들 간의 Data와 처리과정에서 발생하는 중간 파일들을 공유하기 위해 사용
HDFS에 저장되어있는 파일을 처리하기 위해 사용

InputSplits

inputSplit은 물리적 Block들을 논리적으로 그룹핑 한 개념이다

하둡은 저장할 파일을 128mb 블록들로 쪼개서 관리하게 되는데 inputSplits가 쪼개진 물리적 블록들을 논리적으로 그룹핑하는 것이다

inputSplit은 Mapper의 입력으로 들어오는 데이터를 분할하는 방식을 제공하기 위해, 데이터의 위치와 읽어 들이는 길이를 정의한다

맵리듀스 구동 절차

자바 프로그램으로 애플리케이션을 만들었다고 가정시

클라이언트가 하둡 Job Tracker한테 앱의 실행할 자료파일을 제출한다. Job Tracker는 전달 받은 자료파일을 HDFS에 저장을 한다. 이렇게 되면 모든 DataNode에서는 HDFS에 접근이 가능하므로 해당 파일을 가져오고 모든 Task Tracker가 해당파일을 참조를 하고 child JVM이 fork되고 Map Task 또는 Reduce Task가 실행이 된다

  1. Job 실행
  2. 신규 Job ID할당(JobTracker) 및 수신
  3. Job Resource 공유
  4. Job 제출
  5. Job 초기화
  6. InputSplits 정보 검색
  7. TaskTracker에 Task를 할당
  8. TaskTracker가 공유되어 있는 Job Resource를 Local로 복사
  9. TaskTracker가 child JVM 실행
  10. MapTasker or ReduceTaks 실행

제출 과정

job의 submit 메소드

  • jobTracker에 새로운 job id를 요청
  • job에 대한 입력 스플릿(inputSplits)를 계산 -> Data locality 고려
  • job 수행에 필요한 자원(resource)을 JobTracker의 파일 시스템 (HDFS)상의 해당 job ID를 이름으로 하는 디렉토리에 복사 (jar 설정파일, inputSplits 정보)
    • -> 모든 TaskTracker가 리소스를 로컬로 복사 (Localization 과정)
  • job Tracker가 job을 시작할 준비가 되었음을 알림

job Initialization (초기화)

  • Job TrackersubmitJob 메소드의 호출을 받으면 Job을 내부 Queue에 저장

  • Job Scheduler는 Queue에서 Job을 읽어서 초기화 과정을 진행

    • 실행할 job을 표현하기 위해 하나의 객체를 생성
    • 이 객체는 job에 대한 task뿐만 아니라 상태 및 진행 과정을 유지하기 위한 부가 정보를 캡슐화

초기화 과정

  • 공유파일시스템 (HDFS)으로부터 계산된 입력 스플릿 정보를 가져온다
  • 각 InputSplit에 대해 하나의 MapTask를 생성
  • mapred.reduce.tasks 속성 값에 따라 이 수 만큼 Reduce Task를 생성

Task 할당

  • TaskTracker는 하트비트를 보내는 단순한 루프를 수행

  • 하트비트는 Live 상태 체크 및 메시지 채널 용도로 사용된다

    • 하트비트를 보낼 때 정보의 일부로 task를 수행할 준비가 되었는지 여부를 전달
    • jobTracker가 하트비트의 응답으로 할당된 task를 전달
  • TaskTracker는 map 및 Reduce Task를 위한 많은 수의 혼합된 슬롯을 갖는다

  • JobTracker의 TaskTracker 선택기준

    • Reduce Task는 순차적으로 선택된다
    • TaskTracker는 지역성을 계산하여서, 가까운 거리에 있는 InputSplit을 갖도록 선택한다

Task 실행

  • Job의 구현체 jar파일을 공유된 HDFS로 부터 로컬파일 시스템으로 복사
  • 분산 캐시로부터 필요한 모든 파일을 로컬 파일 시스템으로 복사
  • Task 실행을 위한 Local Job Directory를 생성하고 jar 압축 해체
  • TaskRunner Instance를 생성하여 Task를 수행

맵리듀스 데이터 처리 흐름

로컬 디스크에서 가장 처음 뜨는 Task가 Map Task이다

split되어있는 자료들을 Map Task에서 읽어서 맵리듀스 알고리즘 상의 알고리즘으로 처리를 해서 output을 주고, 반환된 output결과를 hash partitioning을 하고, 파티션 기준으로 같은 key를 갖고 있는 데이터들 끼리 모아서 merge 작업을 해주고 Reduce연산을 해준 후 최종 output을 내놓는다

위 내용을 그림상으로 표현해보면

이와 같은 형태로 진행이 되는 것이다

만약 맨 위의 text파일이 100기가 크기의 파일이라 하면 해당 파일은 128mb의 크기로 수많은 블록단위로 쪼개져서 분산 저장하게 된다, 이때 MapTask는 각 블록마다 하나씩 뜨게 된다

쪼개진 블록단위 텍스트 파일에 대해서 맵리듀스 애플리케이션을 적용하려 하는데 해당 앱은 전체 텍스트에서 각 단어가 몇번 나왔는지 세는 앱이라 가정

그러면 해당 맵리듀스 앱은 각 블록에서 MapTask를 key-value의 쌍으로 구현을 하고 각 블록에서 먼저 값들을 정리를 하고

정리된 블록에 대해서 key를 기준으로 같은 key를 가지고 있는 것들끼리 서버에 모으게 된다. 이때 MapTask를 수행한 서버와 Reduce를 수행한 서버끼리 트래픽이 발생하게 된다

맵리듀스 구현을 위한 Interface

필수로 구현이 필요한 부분은 Mapper 부분이다

Input

  • 데이터를 읽을 때 어떠한 형태의 데이터를 읽을지
  • 텍스트, 이미지 등
  • 읽는 데이터 형식에 맞는 inputFormater들이 기본적으로 만들어져서 제공되거나, 직접 구현해서 사용할 수 있다

Mapper

  • 실제 첫번째 비지니스 로직을 key-value형태로 구현

combiner

  • 추후 설명

partitioner

  • mapper에 나온 키들을 파티셔닝
  • 키를 기준으로 서로 다른 노드에서 나온 키들을 같은 키끼리 묶을수 있도록 같은 서버로 전송을 해주고

Reducer

  • 모든 서버들의 내용을 취합해서 원하는 출력을 만들고

output

  • 출력을 해준다

Mapper에서 Reducer로 보내는 과정속에
shuffle이라하는데, 같은 키를 같고있는 애들끼리 모으고 sorting 후에 reducer의 입력값으로 들어가게 된다
이때, shuffle과정에서 트래픽이 발생할 수 밖에 없다

이 트래픽을 줄이는데 중요한 역활을 하는것이 combinerpartitioner이다

InputFormat 종류

Mapper의 output결과를 보통 sequenceFile로 쓴다

SequenceFile 이란?

  • sequence file은 하둡 자체적으로 구현된 binary파일 포맷으로 key-value 쌍으로 구성되어있다

Combiner의 역활

Mapper 다음에 추가되어 있는 모습이다

데이터 전송량을 줄이기 위해서 미리 mapper내부에서 정보를 취합을 해서 Reducer로 전송할 트래픽양을 줄여준다

Partitioner의 역활

서로 다른 Mapper에서 생성된 중간 결과 key-value 쌍들을, key 중심으로 같은 키를 갖는 데이터는 물리적으로 동일한 Reducer로 데이터를 보내는 용도로 사용한다

  • 기본 디폴트 파티셔너는 데이터의 Key값을 해싱 처리하고 Reducer의 개수 만큼 모듈러 연산을 한다. 그래야 reducer에 맞게 보낼수 있으므로 (3개면 0, 1, 2번)
public class HashPartitioner<K, V> extends Partitioner<K, V> {
	public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

numReduceTasks : reduceTask 개수

Shuffling & Sort 과정

Mapper의 Immediate 결과 파일이 Reducer로 전달되는 과정이 Shuffling (이때, 트래픽이 발생)

서로 다른 Mapper로 부터 받은 데이터를 key중심으로 sorting 수행
-> 같은 키에 해당하는 리스트를 Reducer로 전달

위의 그림과같은 상황이라면 ReduceTask개수가 2개로 2로 모듈러 연산을 하여 파티셔닝을 한 것

Reducer의 역활

Reducer는 Mapper의 출력 결과를 입력으로 받아서 데이터를 처리한다

처리된 데이터를 OutPutFormat의 형태에 맞게 결과로 출력

맵리듀스 애플리케이션에서 Reducer의 구현은 선택적 옵션이다

OutPutFormat의 역활 및 종류

TextOutPutFormat

  • 디폴트 아웃풋 포맷으로, 텍스트 파일의 하나의 라인에 key-value 쌍을 출력한다

sequenceFileOutPutFormat

  • 주로 mapper의 output을 reducer로 보내기 전, key-value 쌍 구조를 압축하도록 출력한다

MultipleOutputsFormat

  • 출력 파일의 이름을 Key-Value 등에서 추출된 문자열로 구성하고, 해당 파일에 데이터를 쓸 수 있음
    (여러 개의 파일로 쓰기 가능함)

LazyOutputFormat

  • 결과로 출력할 데이터가 있는 경우에만, 파일을 생성하는 OutputFormat 의 Wrapper 로 사용

DBOutputFormat

  • 관계형 데이터베이스나 Hbase 로 데이터를 쓰기 위한 OutputFormat

MapReduce 진행 상황과 상태 갱신

  • Task 진행율

    • Map Task: 제출된 Map개수에 대한 처리 비율
    • Reduce Task : 총 진행을 3단계로 나누어서 계산 (shuffle 포함)
  • Task는 보고 플래그가 설정되 있다면 TaskTracker에게 진행 상황을 3초마다 보고

  • TaskTracker는 JobTracker에게 하트비트를 보낼 때 진행중인 모든 Task의 상태를 포함하여 전송

  • Client Job은 매초마다 JobTracker를 풀링하여 최신 정보를 갱신한다

Task 수행시 장애 발생시?

만약 job을 수행하는데 장애가 발생한다면 JobTracker가 장애가 발생한 해당 서버가 처리해야할 데이터를 가지고 있는 다른 TaskTracker에게 Job을 다시 실행하도록 명령을 던진다

이러한 job에 대한 장애발생도 하둡 내부적으로 알아서 처리가 된다

Job Completion

JobTracker는 하나의 Job에 대한 마지막 Task가 완료되었을 경우 상태를 성공으로 변경한다

클라이언트는 상태를 검사하고 사용자에게 알려주기 위한 메시지를 출력한다

  • waitForCompletion 메소드가 종료되고, job 통계와 카운터가 콘솔로 출력

callBack으로 HTTP Job통지를 받고자하면 job.end.notification.url을 설정하면 된다

번외

하둡의 데이터 유실 상황

하둡이 매우 busy한 상황인데 카프카와 같은 써드파티앱에서 하둡에 뭔가를 쓸려고 한다면 하둡은 거절을 하고 그동안 카프카에 적재된 데이터들에 대해서 보관 싸이클이 1시간이라 할 때, 이 시간 이후에도 하둡이 busy하다면 해당 파일들이 적재되지 못하고 유실되는 상황이 나올 수 있다

profile
내꿈은 숲속의잠자는공주

0개의 댓글

Powered by GraphCDN, the GraphQL CDN