MapReduce provides a Programmed Model and an associated implementation for processing and generating large data sets.
→ Model : 프로그램을 짜는게 아니라 짜여있는 형태.
대체로 여러 머신에서 데이터 처리를 하려면 여러 가지 문제를 고려해야 한다. 여러 머신 중 한 머신이 동작하지 않는 문제 , 동시 처리를 위해 각 프로세스 스케줄링 등 .
-> HDFS 는 이 모든 것을 고려한 기술 , 사용자는 이 과정을 단순한 Map 과 Reduce 과정으로 처리할 수 있다.
대량의 데이터를 배치 형식으로 단계를 나누어 처리한다.
데이터를 작은 단위로 나누어 여러 작업자(worker)가 병렬로 처리할 수 있게 만드는 단계
데이터를 쪼개서 key-value로 변환
-> 저장 단위 (블록) 과 연산 단위 (Input Split)을 구분해야 함
-> 저장 계층 Master : NamerNode / Slave : DataNode
-> 연산 계층 Master : Jobtracker / Slave : Taskworker
RecordReader라는 구성 요소를 실행.RecordReader: Map Task 내부에 위치한 구성 요소
RecordReader는 할당받은 Input Split의 바이너리 데이터를 읽으면서, 정의된 InputFormat (예: TextInputFormat은 줄 바꿈 문자를 기준으로 분할)에 따라 논리적인 레코드 (예: 텍스트 파일의 한 줄 Key : 줄번호 , Value : 한줄의 내) 단위로 분할.
각 분할은 <Key, Value> 쌍으로 변환되어 Worker Node map () 함수에 전달
map ()함수는 데이터를 가공하거나 필요한 정보를 추출모든 Map 이 끝나면 Reduce 실행
정렬된 데이터-> (key별로 value 리스트가 모인 Data Collection) 를 받아 Mapper 간 같은 키의 값들을 병합(merge)
Reduce 함수가 최종 집계나 요약 결과를 생성
Map -(Shuffle) -> Reduce 구조
Map 결과를 모아서 같은 key를 가진 value들을 합치거나 요약하는 단계입니다.
주로 집계(aggregation), 합(sum), 평균(mean), 정렬(sort) 등에 사용
MapReduce 과정 중에 생성하는 중간 파일의 중간 키 값이 같은 레코드들은 같은 리듀스 태스크를 실행하는 노드에 입력해야 한다. 이를 위해서 파티셔닝 함수를 정의할 수 있는데, 일반적으로 Hash mod R 연산을 거친다.
이때 파티셔닝 함수를 어떻게 작성하는가에 따라서 네트워크 트래픽 오버헤드를 조절 가능하다.
맵 태스크와 리듀스 태스크가 전부 다른 노드 혹은 다른 노드에서 실행한다고 가정하면 네트워크 트래픽이 증가하고 , 성능이 저하된다.
| 계층 | 구성요소 | 역할 | 구분 |
|---|---|---|---|
| Master Node | 🧠 NameNode | HDFS의 메타데이터 관리 (파일명 ↔ 블록 위치, 파일시스템의 “두뇌”) | 저장 계층 |
| ⚙️ JobTracker | MapReduce 작업(Job) 전체 관리 (Task 스케줄링, 실패 감시, 재실행 등) | 연산 계층 | |
| Slave Node | 💾 DataNode | 실제 데이터 블록 저장 | 저장 계층 |
| 🧩 TaskTracker | JobTracker의 명령을 받아 Map / Reduce Task 실행 | 연산 계층 |
Hadoop 에서 Mapper , Reducer 는 클래스로 선언된다
Mapper는 입력 데이터를 가공해서 (중간 key, 중간 value) 쌍을 만드는 단계입니다.
public class MyMapper extends Mapper<K1, V1, K2, V2> {
protected void setup(Context context) throws IOException, InterruptedException {
// ① 맵 태스크 시작 시 한 번만 실행
}
protected void map(K1 key, V1 value, Context context)
throws IOException, InterruptedException {
// ② InputSplit의 각 (key, value) 쌍마다 실행
// context.write(K2, V2)로 결과 emit
}
protected void cleanup(Context context)
throws IOException, InterruptedException {
// ③ 맵 태스크 종료 시 한 번 실행
}
}
| 메서드 | 호출 시점 | 역할 |
|---|---|---|
setup() | Mapper Task 시작 시 1회 호출 | 초기화 작업 (예: 파일 열기, 변수 준비, DB 연결 등) |
map() | 입력 데이터의 각 (key, value) 쌍마다 호출 | 핵심 로직 수행. 데이터 필터링, 파싱, 변환, context.write()로 결과 출력 |
cleanup() | Mapper Task 종료 시 1회 호출 | 마무리 작업 (예: 열린 리소스 닫기, 누적 데이터 flush 등) |
Reducer는 Mapper들이 만든 중간 결과를 Key별로 모아
최종 (key, value) 결과를 생성합니다.
public class MyReducer extends Reducer<K2, V2, K3, V3> {
protected void setup(Context context)
throws IOException, InterruptedException {
// ① Reduce 태스크 시작 시 1회 실행
}
protected void reduce(K2 key, Iterable<V2> values, Context context)
throws IOException, InterruptedException {
// ② 동일한 key에 대해 한 번 호출
// values는 같은 key를 가진 value들의 리스트
}
protected void cleanup(Context context)
throws IOException, InterruptedException {
// ③ Reduce 태스크 종료 시 1회 실행
}
}
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) sum += val.get();
context.write(key, new IntWritable(sum));
}
Combiner는 Reducer와 거의 같은 형태이지만,
Mapper의 로컬 단계에서 미리 부분 집계를 수행한다.
즉, 네트워크로 Shuffle하기 전에 Key별로 일부 결과를 합쳐서
데이터 전송량을 줄이는 역할을 합니다.
### Partitioner<K, V>
public class MyPartitioner extends Partitioner<K, V> {
@Override
public int getPartition(K key, V value, int numPartitions) {
// key별로 특정 Reducer를 선택 (0 ~ numPartitions-1)
return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}