‘MapReduce: Simplified Data Processing on Large Clusters’ 논문은 2004년에 Jeffrey Dean이라는 구글의 전설적인 개발자가 작성한 논문입니다. 제프 딘으로 불리며 그에 대한 밈이 생겨날 정도로 영향력이 대단합니다. 해당 논문은 2024년 12월 기준으로 24,000 번 이상의 인용이 되었으며 Apache Hadoop 오픈소스의 기반이 되었고 대용량의 분산 데이터 처리 기술의 표준이라고 할 수 있습니다.
’맵리듀스(MapReduce)’는 대용량 데이터 처리를 분산 병렬 컴퓨터에서 처리하기 위한 목적으로 발표한 소프트웨어 프레임워크입니다. 여기서 말하는 대용량의 의미는 페타바이트 수준의 용량을 처리하는 수준입니다. 개발자는 키/값 쌍 집합을 생성하는 Map 함수와 동일한 중간 키와 연결된 모든 중간 값을 병합하는 Reduce 함수를 개발합니다. 시스템은 이 함수를 자동으로 병렬화하여 대규모 상용 시스템 클러스터(Parallelized and executed on a large cluster of commodity machines)에서 실행시킵니다. 따라서 병렬 및 분산 시스템에 대한 경험이 없는 개발자도 대규모 분산 시스템의 리소스를 쉽게 활용할 수 있습니다. MapReduce도 역시 단점이 존재하며 이를 해결하기 위해 Hive, Pig, Apache Spark, Apache Beam 등의 기술이 등장했습니다.
MapReduce는 개발자에게는 친숙한 ‘Map’ 함수와 ‘Reduce’ 함수를 응용한 기술입니다. map-reduce는 함수형 프로그래밍 언어인 LISP에서 기원하였습니다.
가령 특정 문자열의 각 단어 수를 카운팅하는 연산을 하고 싶다고 했을 때, 아래와 같이 표현합니다.
fun wordCount(input: String): Map<String, Int> {
return input
.split("\\s+".toRegex()) // 공백 또는 줄바꿈 기준으로 단어 분리
.map { it to 1 } // 각 단어를 Pair(단어, 1)로 변환
.groupBy({ it.first }, { it.second }) // 단어별로 그룹화하고 값 리스트로 정리
.mapValues { (_, values) -> values.sum() } // 값 리스트를 합산
}
fun main() {
val inputFiles = "Deer Bear River \n Car Car River \n Deer Car Bear"
val result = wordCount(inputFiles)
result.forEach { (word, count) ->
println("$word,$count")
}
}
입력된 각 논리적 ’레코드’에 맵핑(Map) 작업을 적용한 다음 동일한 키를 공유하는 모든 값에 연산(reduce) 작업을 적용하여 추출된 데이터를 적절하게 결합합니다. 개발자는 이 과정에서 map 함수와 reduce 함수를 적절하게 구현하였습니다. MapReduce는 이러한 map, reduce 함수를 개발자가 작성하면 대규모 클러스터에서 분산 병렬 처리를 도와주는 기술이라고 할 수 있습니다.
User Program은 Input Files를 M개의 조각으로 split 합니다. 일반적으로 개별 16MB or 64MB 크기로 나누는데 이는 개발자가 파라미터로 제어할 수 있습니다. 그 이후에 프로그램의 복사본을 fork해서 여러 인스턴스에서 실행시키게 됩니다. User Program는 파라미터와 Map, Reduce 함수가 정의되어있습니다.
Master는 각 작업을 유휴 Worker를 선정하여 각 Map / Reduce 작업을 할당합니다.
Map 워커 중 하나는 분할된 입력 파일을 읽습니다. 입력 데이터에서 key-value 값을 파싱하고 각 쌍을 사용자 정의 Map 함수를 적용합니다. Map 함수에 의해 생성된 key-value 값은 메모리에 버퍼링됩니다.
key-value 쌍은 각 워커의 로컬 디스크에 기록되고 R개의 분할된 지역으로 나누게 됩니다. Master는 버퍼링된 쌍의 로컬 디스크 위치를 기록하며 이를 Reduce 워커에게 전달합니다.
Reduce 워커는 이 위치에 대한 알림을 받고 원격 프로시저 호출을 통해 버퍼링 데이터를 읽습니다. Reduce 워커는 모든 중간 데이터를 읽으면 중간 key에 따라 정렬하여 동일한 key와 결과값을 그룹화합니다. Reduce 워커가 단일한 키만 처리하지 않기 때문에 정렬을 사용해야하며 메모리 문제 등이 있으면 외부 정렬을 사용합니다.
Reduce 워커는 Intermediate 데이터를 정렬하면서 key와 관련된 reduce 함수를 실행합니다. 이를 통해 reduce 파티션에 대해 최종 출력 파일을 전달합니다.
모든 Map 작업과 Reduce 작업이 완료되면 Master는 User Program에게 종료를 알립니다. MapReduce 실행된 결과는 R개(Reduce Worker 당 하나)의 출력 파일로 저장되며 R개의 출력 파일은 하나의 파일로 결합할 필요는 없고 그대로 사용할 수 있습니다.
같은 개념을 다시 Hadoop의 예시로 설명하겠습니다.
https://www.junaideffendi.com/p/everything-you-need-to-know-about
(Distributed File System, No Data Movement)
(병렬성, 내결함성) → input file split, chunk, retru, backup tasks
MapReduce를 custom해서 개선할 수 있는 내용입니다.
위 과정에서 우리가 잘 이해가 안되는 부분이 있습니다. Shuffling인데요. Intermediate key value(중간 데이터) 값을 reduce worker 에게 전달하기 위해서 진행되는 과정입니다.
중복된 키를 찾아주고 적절하게 셔플링을 해줘야 해당하는 reduce worker가 작업을 해줄 수 있습니다.
map을 통해 변환된 key-value는 메모리 버퍼에 저장되고 파티셔닝 작업을 진행합니다. 파티셔닝은 해당 키가 처리되어야하는 reduce task를 기준으로 데이터를 분리합니다. 리듀서의 개수에 따라 적절하게 해시를 해주고 그 값을 sorting해서 local disk에 적재를 하게 되는 과정이 진행됩니다.
또한 이 과정에서 이전에 설명한 Combining 절차가 수행되며 map worker local disk 공간에 적재합니다. master 노드는 이제서야 이 데이터 위치를 취합하여 redouce worker에게 위치를 알려줍니다.
셔플링 과정을 정리하면 다음과 같습니다. [참조](https://blog.firstpenguine.school/31)
0. Mapping: map의 결과물로 intermediate key value가 생성됨
1. Partitioning: 각각의 키 값을 기준으로 어느 리듀서에서 처리되어야 하는지 파티션을 결정함
2. Sorting: 같은 파티션 내의 값들은 키 값을 기준으로 소팅
3. Combining: 파티션 내의 값들에 대해 미리 reduce 작업을 진행하여 데이터의 양을 줄임
4. Spill to locak disk: 처리된 값들을 Mapper 노드의 디스크 공간에 저장
5. Notify Master: 매핑 과정을 완료하였다는 알림과 파티셔닝 정보를 마스터 노드에게 알림
6. Notify Reducers: 마스터 노드는 리듀서 노드들에게 이 정보를 전달
7. Remote Read: 리듀서 노드들은 자신에게 할당된 파티션들만 매퍼들에게서 읽어와서 리듀스 작업을 준비
하지만 혹자는 이 shuffling 과정이 복잡하고 속도가 느리다고 꼬집습니다. MapReduce를 small Map, large Shuffle, small Reduce라고 불러야 한다는 농담이 있다고도 합니다.
HDFS(Hadoop distributed File System): 대용량 데이터를 여러 노드에 분산하여 저장하는 시스템. 데이터의 안정성 및 가용성을 보장하고 사용한다.
YARN 이라는 reousrce 매니저를 추가해서 다른 분산처리 알고리즘을 사용할 수도 있도록 변경
GFS → HDFS로 사용
개발자가 구현할 인터페이스
Batch Processing이 아니라 Stream Processing의 중요성이 높아지면서 Flink를 사용하는 선택지가 생김