Hadoop - MapReduce

jaewonnow_·2025년 11월 3일

DataEngineering

목록 보기
2/19

MapReduce

MapReduce provides a Programmed Model and an associated implementation for processing and generating large data sets.

→ Model : 프로그램을 짜는게 아니라 짜여있는 형태.

  • HDFS, YARN 위에서 동작하는 데이터 분산 처리 소프트웨어
  • 데이터는 각 단계가 끝날 때마다 디스크와 CPU & 메모리 사이를 오가기 때문에 시간이 오래 걸린다.
  • 시간이 걸리는 배치 처리엔 적합하나, 짧은 시간이 걸리는 몇 번의 애드훅 처리엔 불리하다.

대체로 여러 머신에서 데이터 처리를 하려면 여러 가지 문제를 고려해야 한다. 여러 머신 중 한 머신이 동작하지 않는 문제 , 동시 처리를 위해 각 프로세스 스케줄링 등 .

-> HDFS 는 이 모든 것을 고려한 기술 , 사용자는 이 과정을 단순한 Map 과 Reduce 과정으로 처리할 수 있다.

대량의 데이터를 배치 형식으로 단계를 나누어 처리한다.

Map

데이터를 작은 단위로 나누어 여러 작업자(worker)가 병렬로 처리할 수 있게 만드는 단계

데이터를 쪼개서 key-value로 변환

0 . Split ( Map 단계 추가설명)

  • 입력 분할 (Input Splits): MapReduce 프레임워크는 이 HDFS 블록들을 참조하여 입력 분할(Input Split)이라는 블록 정보 기반으로 논리적인 작업 단위를 추가적으로 만든다.

-> 저장 단위 (블록) 과 연산 단위 (Input Split)을 구분해야 함

-> 저장 계층 Master : NamerNode / Slave : DataNode

-> 연산 계층 Master : Jobtracker / Slave : Taskworker

1. Map Task

  • Input Split(HDFS 블록을 기반으로 하는 논리적 단위)당 하나의 Map 태스크를 YARN(또는 리소스 매니저가)이 생성하고 DataNode에 할당한다, 하나의 Map 태스크는 하나의 Input Split을 처리합니다.
  • Map 태스크가 시작되면, 내부적으로 RecordReader라는 구성 요소를 실행.

1-1. Map Task - RecordReader

RecordReader: Map Task 내부에 위치한 구성 요소

  • RecordReader는 할당받은 Input Split의 바이너리 데이터를 읽으면서, 정의된 InputFormat (예: TextInputFormat은 줄 바꿈 문자를 기준으로 분할)에 따라 논리적인 레코드 (예: 텍스트 파일의 한 줄 Key : 줄번호 , Value : 한줄의 내) 단위로 분할.

  • 각 분할은 <Key, Value> 쌍으로 변환되어 Worker Node map () 함수에 전달

1-2. Mapper

  • 사용자가 작성한 map ()함수는 데이터를 가공하거나 필요한 정보를 추출
  • Map ()의 결과물은 로컬에 저장됨(기본적으로 클라우드에 저장 X , 리턴 X)
  • Worker 노드가 죽으면? Map과정 다시실행
  • 프레임워크 내부 버퍼(buffer)에 “emit(쓰기)” 형태로 전달

1-3. Shuffle & Sort 단계

  • Map 단계에서 나온 중간 결과를 키(Key) 기준으로 정렬
  • 이 과정은 Hadoop 내부에서 자동 처리됨
  • Map Task 의 결과값 -> Reduce Task의 입력값 단계로 넘김
  • Shuffle & Sort 단계의 “1차 처리(정렬·스필)”까지 수행한 후,
    최종적으로는 네트워크를 통해 Reduce Task가 있는 Node로 전송

2. Reduce Task

  • 모든 Map 이 끝나면 Reduce 실행

  • 정렬된 데이터-> (key별로 value 리스트가 모인 Data Collection) 를 받아 Mapper 간 같은 키의 값들을 병합(merge)

  • Reduce 함수가 최종 집계나 요약 결과를 생성

Map -(Shuffle) -> Reduce 구조

Reduce

Map 결과를 모아서 같은 key를 가진 value들을 합치거나 요약하는 단계입니다.

주로 집계(aggregation), 합(sum), 평균(mean), 정렬(sort) 등에 사용

Partitioning

MapReduce 과정 중에 생성하는 중간 파일의 중간 키 값이 같은 레코드들은 같은 리듀스 태스크를 실행하는 노드에 입력해야 한다. 이를 위해서 파티셔닝 함수를 정의할 수 있는데, 일반적으로 Hash mod R 연산을 거친다.

이때 파티셔닝 함수를 어떻게 작성하는가에 따라서 네트워크 트래픽 오버헤드를 조절 가능하다.
맵 태스크와 리듀스 태스크가 전부 다른 노드 혹은 다른 노드에서 실행한다고 가정하면 네트워크 트래픽이 증가하고 , 성능이 저하된다.

Handling Worker Failure in MapReduce

  • Master 가 주기적으로 worker 에게 Ping 을 보냄 - (Health Check)
  • 주기적인 헬스체크 → Resilient (회복 탄력성) 있음
  • 추가적으로 노드 머신이 다운되도 계속해서 그냥 재실행 → 하둡의 핵심

1️⃣ Hadoop 1.x (고전 MapReduce) 구조

계층구성요소역할구분
Master Node🧠 NameNodeHDFS의 메타데이터 관리 (파일명 ↔ 블록 위치, 파일시스템의 “두뇌”)저장 계층
⚙️ JobTrackerMapReduce 작업(Job) 전체 관리 (Task 스케줄링, 실패 감시, 재실행 등)연산 계층
Slave Node💾 DataNode실제 데이터 블록 저장저장 계층
🧩 TaskTrackerJobTracker의 명령을 받아 Map / Reduce Task 실행연산 계층

MapReduce API

Hadoop 에서 Mapper , Reducer 는 클래스로 선언된다

Mapper 클래스 (Mapper<K1, V1, K2, V2>)

Mapper는 입력 데이터를 가공해서 (중간 key, 중간 value) 쌍을 만드는 단계입니다.

public class MyMapper extends Mapper<K1, V1, K2, V2> {

    protected void setup(Context context) throws IOException, InterruptedException {
        // ① 맵 태스크 시작 시 한 번만 실행
    }

    protected void map(K1 key, V1 value, Context context)
            throws IOException, InterruptedException {
        // ② InputSplit의 각 (key, value) 쌍마다 실행
        //    context.write(K2, V2)로 결과 emit
    }

    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // ③ 맵 태스크 종료 시 한 번 실행
    }
}
메서드호출 시점역할
setup()Mapper Task 시작 시 1회 호출초기화 작업 (예: 파일 열기, 변수 준비, DB 연결 등)
map()입력 데이터의 각 (key, value) 쌍마다 호출핵심 로직 수행. 데이터 필터링, 파싱, 변환, context.write()로 결과 출력
cleanup()Mapper Task 종료 시 1회 호출마무리 작업 (예: 열린 리소스 닫기, 누적 데이터 flush 등)

Reducer 클래스 (Reducer<K2, V2, K3, V3>)

Reducer는 Mapper들이 만든 중간 결과를 Key별로 모아
최종 (key, value) 결과를 생성합니다.

public class MyReducer extends Reducer<K2, V2, K3, V3> {

    protected void setup(Context context)
            throws IOException, InterruptedException {
        // ① Reduce 태스크 시작 시 1회 실행
    }

    protected void reduce(K2 key, Iterable<V2> values, Context context)
            throws IOException, InterruptedException {
        // ② 동일한 key에 대해 한 번 호출
        //    values는 같은 key를 가진 value들의 리스트
    }

    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // ③ Reduce 태스크 종료 시 1회 실행
    }
}
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) sum += val.get();
    context.write(key, new IntWritable(sum));
}

Combiner 클래스 (옵션)

Combiner는 Reducer와 거의 같은 형태이지만,
Mapper의 로컬 단계에서 미리 부분 집계를 수행한다.

즉, 네트워크로 Shuffle하기 전에 Key별로 일부 결과를 합쳐서
데이터 전송량을 줄이는 역할을 합니다.

### Partitioner<K, V>

public class MyPartitioner extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numPartitions) {
        // key별로 특정 Reducer를 선택 (0 ~ numPartitions-1)
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
profile
0 to 100 Data Engineer

0개의 댓글