앞선 예제에서 결과 데이터는 파일에 행으로 나누어져 있는데, 정렬이 되어있지 않다.
정렬되지 않은 이유는 맵리듀스에서 사용한 키를 연월을 단순히 붙인 텍스트로 인식했기 때문이다.
이것을 보조 정렬을 이용해서 월의 오름차순으로 정렬해보자.
보조정렬의 구현 순서
package de.example.hadoop.mapreduce.secondarysort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class DateKey implements WritableComparable<DateKey> {
private String year;
private Integer month;
public DateKey() {
}
public DateKey(String year, Integer date) {
this.year = year;
this.month = date;
}
public String getYear() {
return year;
}
public void setYear(String year) {
this.year = year;
}
public Integer getMonth() {
return month;
}
public void setMonth(Integer month) {
this.month = month;
}
@Override
public String toString() {
return (new StringBuilder()).append(year).append(",").append(month)
.toString();
}
@Override
public void readFields(DataInput in) throws IOException {
year = WritableUtils.readString(in);
month = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeString(out, year);
out.writeInt(month);
}
@Override
public int compareTo(DateKey key) {
int result = year.compareTo(key.year);
if (0 == result) {
result = month.compareTo(key.month);
}
return result;
}
}
Composite key 의 정렬 순서를 부여하기 위한 Comparator 구현.
package de.example.hadoop.mapreduce.secondarysort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class DateKeyComparator extends WritableComparator {
protected DateKeyComparator() {
super(DateKey.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
//복합키 클래스 캐스팅
DateKey k1 = (DateKey) w1;
DateKey k2 = (DateKey) w2;
//연도 비교
int cmp = k1.getYear().compareTo(k2.getYear());
if (cmp != 0) {
return cmp;
}
//월 비교
return k1.getMonth() == k2.getMonth() ? 0 : (k1.getMonth() < k2
.getMonth() ? -1 : 1);
}
}
파티셔너는 맵 태스크의 출력 데이터를 리듀스 태스크의 입력 데이터로 보낼지 결정한다.
이 파티셔닝된 데이터는 맵 태스크의 출력 데이터의 키 값에 따라 정렬된다.
package de.example.hadoop.mapreduce.secondarysort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class GroupKeyPartitioner extends Partitioner<DateKey, IntWritable> {
@Override
public int getPartition(DateKey key, IntWritable val, int numPartitions) {
int hash = key.getYear().hashCode();
int partition = hash % numPartitions;
return partition;
}
}
Reducer는 groupkey comparator를 통해 하나의 Reducer로 모으고 싶은 데이터를 정의할 수 있다.
package de.example.hadoop.mapreduce.secondarysort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class GroupKeyComparator extends WritableComparator {
protected GroupKeyComparator() {
super(DateKey.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
DateKey k1 = (DateKey) w1;
DateKey k2 = (DateKey) w2;
//연도값 비교
return k1.getYear().compareTo(k2.getYear());
}
}
package de.example.hadoop.mapreduce.secondarysort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import de.example.hadoop.mapreduce.common.AirlinePerformanceParser;
import de.example.hadoop.mapreduce.common.DelayCounters;
import java.io.IOException;
public class DelayCountMapperWithDateKey extends
Mapper<LongWritable, Text, DateKey, IntWritable> {
// map 출력값
private final static IntWritable outputValue = new IntWritable(1);
// map 출력키
private DateKey outputKey = new DateKey();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
// 출발 지연 데이터 출력
if (parser.isDepartureDelayAvailable()) {
if (parser.getDepartureDelayTime() > 0) {
// 출력키 설정
outputKey.setYear("D," + parser.getYear());
outputKey.setMonth(parser.getMonth());
// 출력 데이터 생성
context.write(outputKey, outputValue);
} else if (parser.getDepartureDelayTime() == 0) {
context.getCounter(DelayCounters.scheduled_departure).increment(1);
} else if (parser.getDepartureDelayTime() < 0) {
context.getCounter(DelayCounters.early_departure).increment(1);
}
} else {
context.getCounter(DelayCounters.not_available_departure).increment(1);
}
// 도착 지연 데이터 출력
if (parser.isArriveDelayAvailable()) {
if (parser.getArriveDelayTime() > 0) {
// 출력키 설정
outputKey.setYear("A," + parser.getYear());
outputKey.setMonth(parser.getMonth());
// 출력 데이터 생성
context.write(outputKey, outputValue);
} else if (parser.getArriveDelayTime() == 0) {
context.getCounter(
DelayCounters.scheduled_arrival).increment(1);
} else if (parser.getArriveDelayTime() < 0) {
context.getCounter(DelayCounters.early_arrival).increment(1);
}
} else {
context.getCounter(DelayCounters.not_available_arrival).increment(1);
}
}
}
Composite Key(DateKey)를 입력과 출력 데이터의 key 로 사용한다.
package de.example.hadoop.mapreduce.secondarysort;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class DelayCountReducerWithDateKey extends
Reducer<DateKey, IntWritable, DateKey, IntWritable> {
private MultipleOutputs<DateKey, IntWritable> mos;
// reduce 출력키
private DateKey outputKey = new DateKey();
// reduce 출력값
private IntWritable result = new IntWritable();
@Override
public void setup(Context context) throws IOException, InterruptedException {
mos = new MultipleOutputs<DateKey, IntWritable>(context);
}
public void reduce(DateKey key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// 콤마 구분자 분리
String[] colums = key.getYear().split(",");
int sum = 0;
Integer bMonth = key.getMonth();
//월을 구분해서 합산하는 로직
if (colums[0].equals("D")) {
for (IntWritable value : values) {
if (bMonth != key.getMonth()) {
result.set(sum);
outputKey.setYear(key.getYear().substring(2));
outputKey.setMonth(bMonth);
mos.write("departure", outputKey, result);
sum = 0;
}
sum += value.get();
bMonth = key.getMonth();
}
if (key.getMonth() == bMonth) {
outputKey.setYear(key.getYear().substring(2));
outputKey.setMonth(key.getMonth());
result.set(sum);
mos.write("departure", outputKey, result);
}
} else {
for (IntWritable value : values) {
if (bMonth != key.getMonth()) {
result.set(sum);
outputKey.setYear(key.getYear().substring(2));
outputKey.setMonth(bMonth);
mos.write("arrival", outputKey, result);
sum = 0;
}
sum += value.get();
bMonth = key.getMonth();
}
if (key.getMonth() == bMonth) {
outputKey.setYear(key.getYear().substring(2));
outputKey.setMonth(key.getMonth());
result.set(sum);
mos.write("arrival", outputKey, result);
}
}
}
@Override
public void cleanup(Context context) throws IOException,
InterruptedException {
mos.close();
}
}
package de.example.hadoop.mapreduce.secondarysort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DelayCountWithDateKeyApp extends Configured implements Tool {
public static void main(String[] args) throws Exception {
// Tool 인터페이스 실행
int res = ToolRunner.run(new Configuration(), new DelayCountWithDateKeyApp(), args);
System.out.println("MR-Job Result:" + res);
}
public int run(String[] args) throws Exception {
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
// 입출력 데이터 경로 확인
if (otherArgs.length != 2) {
System.err.println("Usage: DelayCountWithDateKey <in> <out>");
System.exit(2);
}
// Job 이름 설정
Job job = Job.getInstance(getConf(), "DelayCountWithDateKey");
// 입출력 데이터 경로 설정
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// Job 클래스 설정
job.setJarByClass(DelayCountWithDateKeyApp.class);
job.setPartitionerClass(GroupKeyPartitioner.class);
job.setGroupingComparatorClass(GroupKeyComparator.class);
job.setSortComparatorClass(DateKeyComparator.class);
// Mapper 클래스 설정
job.setMapperClass(DelayCountMapperWithDateKey.class);
// Reducer 클래스 설정
job.setReducerClass(DelayCountReducerWithDateKey.class);
// Map에 대한 출력키 및 출력값 유형 설정
job.setMapOutputKeyClass(DateKey.class);
job.setMapOutputValueClass(IntWritable.class);
// 입출력 데이터 포맷 설정
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 최종 output 출력키 및 출력값 유형 설정
job.setOutputKeyClass(DateKey.class);
job.setOutputValueClass(IntWritable.class);
// MultipleOutputs 설정
MultipleOutputs.addNamedOutput(job, "departure",
TextOutputFormat.class, DateKey.class, IntWritable.class);
MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class,
DateKey.class, IntWritable.class);
job.waitForCompletion(true);
return 0;
}
}
빌드
./gradlew build
이동
scp -i $your_key $your_jar_path hadoop@$your_emr_primary_ec2_public_dns:~/example/jars/.
실행
hadoop jar hadoop-mapreduce-app-1.0-SNAPSHOT.jar de.example.hadoop.mapreduce.secondarysort.DelayCountWithDateKeyApp /data/input /data/output/delay_count_secondary_sort
연, 월로 잘 정렬된 것을 확인할 수 있다.
1990,1 227276
1990,2 220047
1990,3 242945
1990,4 224690
1990,5 233642
1990,6 231164
1990,7 229526
1990,8 251039
1990,9 204097
1990,10 230413
1990,11 201826
1990,12 244405
1995,1 244982
1995,2 204094
1995,3 229787
1995,4 212546
1995,5 219347
1995,6 239005
1995,7 217536
1995,8 223555
1995,9 182936
1995,10 215418
1995,11 215924
1995,12 254740
2002,1 176118
2002,2 148683
2002,3 199497
2002,4 172803
2002,5 171711
2002,6 186988
2002,7 189060
2002,8 174227
2002,9 123310
2002,10 165998
2002,11 146106
2002,12 179807
2007,1 286334
2007,2 284152
2007,3 293360
2007,4 273055
2007,5 275332
2007,6 326446
2007,7 326559
2007,8 317197
2007,9 225751
2007,10 270098
2007,11 242722
2007,12 332449
부분정렬은 Mapper의 출력 데이터를 SequenceFile로 저장하고, SequenceFile을 MapFile로 변경해서 데이터를 검색하는 방법이다.
맵 태스크가 실행될 때 파티셔너는 매퍼의 출력 데이터가 어떤 리듀스 태스크로 전달될지 결정하고, 파티셔닝된 데이터는 key 에 따라서 정렬된다.
부분정렬을 하기 위해 파티셔닝된 출력 데이터를 맵파일로 변경한다.
특정 키에 대한 데이터를 검색할 경우, 해당 키에 대한 데이터가 저장된 맵파일에서 데이터를 조회한다.
실습은 항공 지연 통계 데이터를 항공 운항 거리 순서대로 정렬하는 부분 정렬을 한다.
mapreduce 프로그램
검색 프로그램
SequenceFile은 Mapper 이후에 파티셔닝된 파일이다.
따라서 Mapper 만 구현하고, reducer를 구현하지 않는다.
Output 은 SequenceFile 이어야한다.
package de.example.hadoop.mapreduce.partialsort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import de.example.hadoop.mapreduce.common.AirlinePerformanceParser;
public class SequenceFileCreator extends Configured implements Tool {
// output 에 있는 IntWritable 에 distance 값을 키로 놓을 것. 그 뒤 Text 에는 전체 데이터를 다 놓을 것.
static class DistanceMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
private IntWritable outputKey = new IntWritable();
public void map(LongWritable key, Text value,
Mapper<LongWritable, Text, IntWritable, Text>.Context mapper)
throws IOException, InterruptedException {
try {
AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
if (parser.isDistanceAvailable()) {
outputKey.set(parser.getDistance());
mapper.write(outputKey, value);
}
} catch (ArrayIndexOutOfBoundsException ae) {
outputKey.set(0);
mapper.write(outputKey, value);
ae.printStackTrace();
} catch (Exception e) {
outputKey.set(0);
mapper.write(outputKey, value);
e.printStackTrace();
}
}
}
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), "SequenceFileCreator");
job.setJarByClass(SequenceFileCreator.class);
job.setMapperClass(DistanceMapper.class);
job.setNumReduceTasks(0);
// 입출력 경로 설정
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 출력 포맷을 SequenceFile로 설정
// 항상 SequenceFileOutput이라고 해야 됨
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 출력 키를 항공 운항 거리(IntWritable)로 설정
job.setOutputKeyClass(IntWritable.class);
// 출력 값을 라인(Text)으로 설정
// mapper의 output key value랑 맞춰줘야 함
job.setOutputValueClass(Text.class);
// 시퀀스 파일 압축 포맷 설정
// 전체 데이터 1.5GB 정도 될 텐데 HDFS put 할 때 오랜 시간이 걸린다. 압축을 하면 압축하는 데 cpu 자원을 쓰긴 쓰지만 IO 작업이 빨라짐.
SequenceFileOutputFormat.setCompressOutput(job, true);
// Ggip은 압축 효율이 아주 좋은 편은 아니지만. 대신 압축 해제 시간이 길게 걸리지 않음
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SequenceFileCreator(), args);
System.out.println("MR-Job Result:" + res);
}
}
SequenceFileOutputFormat
로 한다.빌드하고, hadoop node에 배포한 뒤 다음 커맨드를 실행한다.
hadoop jar hadoop-mapreduce-app-1.0-SNAPSHOT.jar de.example.hadoop.mapreduce.partialsort.SequenceFileCreator /data/input/2008.csv /data/output/2008_sequencefile
hdfs dfs -ls /data/output/2008_sequencefile
gzip 으로 압축했으므로 -text 로 확인해야한다.
hdfs dfs -text /data/output/2008_sequencefile/part-m-00000 | head -10
운항거리가 같은 라인들이 모여있는 것을 확인할 수 있다. 정렬 자체는 입력데이터의 키를 사용하는데, 라인넘버를 기준으로 출력된 것이다.
맵파일은 키값을 검색할 수 있게 index와 함께 정렬된 sequence file 이다.
맵파일은 물리적으로 index 가 저장된 index 파일과 데이터 내용이 저장되어있는 data file 로 구성된다.
HDFS에 저장한 sequence 파일을 이용해 맵파일로 생성할 수 있다.
package de.example.hadoop.mapreduce.partialsort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MapFileCreator extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MapFileCreator(), args);
System.out.println("MR-Job Result:" + res);
}
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), "MapFileCreator");
job.setJarByClass(MapFileCreator.class);
// 입출력 경로 설정
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 입력 데이터를 SequenceFile로 설정
job.setInputFormatClass(SequenceFileInputFormat.class);
// 출력 데이터를 MapFile로 설정
job.setOutputFormatClass(MapFileOutputFormat.class);
// 출력 데이터의 키를 항공 운항 거리(IntWrtiable)로 설정
job.setOutputKeyClass(IntWritable.class);
// 시퀀스 파일 압축 포맷 설정
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
return job.waitForCompletion(true) ? 0 : 1;
}
}
org.apache.hadoop.mapreduce.Mapper
, org.apache.hadoop.mapreduce.Reducer
가 설정된다.빌드하고 jar 를 primary 노드로 옮긴뒤 실행한다.
hadoop jar hadoop-mapreduce-app-1.0-SNAPSHOT.jar de.example.hadoop.mapreduce.partialsort.MapFileCreator /data/output/2008_sequencefile /data/output/2008_mapfile
hdfs dfs -ls /data/output/2008_mapfile
hdfs dfs -ls /data/output/2008_mapfile/part-r-00000
hdfs dfs -text /data/output/2008_mapfile/part-r-00000/index | head -10
hdfs dfs -text /data/output/2008_mapfile/part-r-00000/data | head -10
MapFile에서 검색하는 프로그램이다.
맵리듀스 프로그램이 아니라 원하는 인덱스가 파티션 번호로 있으므로, 파티션 번호로 맵파일에 접근해서 데이터를 읽는다.
package de.example.hadoop.mapreduce.partialsort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile.Reader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SearchValueList extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SearchValueList(), args);
System.out.println("MR-Job Result:" + res);
}
public int run(String[] args) throws Exception {
// 읽을 데이터 경로
Path path = new Path(args[0]);
// MapFile 조회
Reader[] readers = MapFileOutputFormat.getReaders(path, getConf());
// 검색 키를 저장할 객체를 선언
IntWritable key = new IntWritable();
key.set(Integer.parseInt(args[1]));
// 검색 값을 저장할 객체를 선언
Text value = new Text();
// 파티셔너를 이용해 검색 키가 저장된 MapFile 조회
Partitioner<IntWritable, Text> partitioner = new HashPartitioner<>();
Reader reader = readers[partitioner.getPartition(key, value, readers.length)];
// 검색 결과 확인
Writable entry = reader.get(key, value);
if (entry == null) {
System.out.println("The requested key was not found.");
}
// MapFile을 순회하며 키와 값을 출력
IntWritable nextKey = new IntWritable();
do {
System.out.println(value);
} while (reader.next(nextKey, value) && key.equals(nextKey));
return 0;
}
}
// MapFile 조회
Reader[] readers = MapFileOutputFormat.getReaders(path, getConf());
// 파티셔너를 이용해 검색 키가 저장된 MapFile 조회
Partitioner<IntWritable, Text> partitioner = new HashPartitioner<>();
Reader reader = readers[partitioner.getPartition(key, value, readers.length)];
// 검색 결과 확인
Writable entry = reader.get(key, value);
if (entry == null) {
System.out.println("The requested key was not found.");
}
앞서 찾은 파티션의 Reader에서 실제 데이터를 검색하는 단계이다. 즉, "Partition 1에서 키 30을 가진 데이터를 가져와 줘"라고 요청하는 과정이다. 만약 해당 키가 존재하지 않으면 null이 반환되며, 그 경우 "The requested key was not found." 메시지를 출력한다.
빌드한 뒤, primary node 에 jar 를 옮기고 다음 명령을 수행한다.
hadoop jar hadoop-mapreduce-app-1.0-SNAPSHOT.jar de.example.hadoop.mapreduce.partialsort.SearchValueList /data/output/2008_mapfile 30
결과 - distance 가 30인 레코드가 빠르게 검색되었다.
2008,1,8,2,816,805,907,855,B6,9002,N236JB,51,50,19,12,11,JFK,HPN,30,5,27,0,,0,NA,NA,NA,NA,NA
2008,1,6,7,2226,2200,2301,2240,CO,348,N56859,35,40,11,21,26,SJC,SFO,30,7,17,0,,0,0,0,0,0,21
부분정렬을 위해서는 Reducer를 타지 않고 우선 Mapper로 데이터를 변환하고 변환된 파일을 맵 파일(인덱스를 가진 파일)로 만들면 조회하는 쪽에서 그 맵 파일 형식에 따라서 그 데이터가 있는 Partition을 찾고 그 partition에서 데이터를 찾는다.
모든 맵리듀스 Job은 입력 데이터의 키를 기준으로 정렬하기 떄문에 하나의 파티션으로 쉽게 데이터를 정렬할 수 있다.
하지만 데이터가 많은 경우에는 경우에는 문제가 발생한다.
리듀스 태스크를 실행하지 않는 노드는 놀고, 리듀스 태스크를 실행하는 노드에만 부하가 집중된다.
전체 정렬을 하려면 다음 순서로 진행한다.
하둡은 전체 정렬에 활용할 수 있는 org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner
로 파티션 개수와 파티션에 저장할 데이터 범위를 정할 수 있다.
하둡은 파티션에 키를 고르게 분배할 수 있도록 org.apache.hadoop.mapreduce.lib.partition.InputSampler
를 제공한다. 입력 데이터에서 특정 개수의 데이터를 추출해서 키와 데이터의 수를 샘플링 해서 데이터 분포를 작성한다. TotalOrderPartitioner
는 이 샘플링 결과로 파티션을 생성하고 맵리듀스 잡은 TotalOrderPartitioner
가 생성한 파티션에 출력 데이터를 넣는다.
package de.example.hadoop.mapreduce.totalsort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import de.example.hadoop.mapreduce.common.AirlinePerformanceParser;
public class TotalOrderSortByDistance extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new TotalOrderSortByDistance(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
//input and output paths passed from cli
Path inputPath = new Path(args[0]); // 원본 데이터 읽을 저장소
Path stageOutputPath = new Path(args[1]); // 첫 번째로 Map과정을 통해서 만들 Sequence file 저장될 위치
Path partitionerFile = new Path(args[2]); // Sampling결과 partition정보 저장될 위치
Path sortedOutputPath = new Path(args[3]); // 전체 정렬된 결과를 받을 네 번째 파라미터로 받도록 해놓기.
// 부분정렬 때 했던 Map 작업과 같다
int sequenceResult = run4makingSequenceFile(getConf(), inputPath, stageOutputPath);
System.out.println("Job for sequence file is completed");
if (sequenceResult != 0) {
System.err.println("Fail to make sequence file.");
return sequenceResult;
} else {
int sortResult = run4totalSort(getConf(), stageOutputPath, partitionerFile, sortedOutputPath);
System.out.println("Job for sort file is completed");
if (sortResult != 0) {
System.err.println("Fail to sort.");
}
return sortResult;
}
}
int run4makingSequenceFile(Configuration conf, Path input, Path output) throws Exception {
Job job = Job.getInstance(conf, "Extract sequence file by distance");
job.setJarByClass(TotalOrderSortByDistance.class);
//set mapper to extract value
//first job has no reducer
job.setMapperClass(DistanceMapper.class);
job.setNumReduceTasks(0);
//set key and value classes
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
// 입출력 경로, format 설정
FileInputFormat.addInputPath(job, input);
job.setInputFormatClass(TextInputFormat.class);
FileOutputFormat.setOutputPath(job, output);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 시퀀스 파일 압축 포맷 설정
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
int code = job.waitForCompletion(true) ? 0 : 1;
return code;
}
int run4totalSort(Configuration conf, Path input, Path partitioner, Path output) throws Exception {
Job job = Job.getInstance(conf, "Sort sequence file by distance");
job.setJarByClass(TotalOrderSortByDistance.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
SequenceFileInputFormat.setInputPaths(job, input);
SequenceFileOutputFormat.setOutputPath(job, output);
// 시퀀스 파일 압축 포맷 설정
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitioner); // partitioner 파티션 파일이 위치한 경로.
InputSampler.Sampler<IntWritable, Text> sampler = new RandomSampler<>(01., 1000, 10); // input split을 10개로, 0.1 %의 확률로 1000 건의 데이터를 sampling하겠다는 것.
InputSampler.writePartitionFile(job, sampler);
return job.waitForCompletion(true) ? 0 : 2;
}
static class DistanceMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
private IntWritable outputKey = new IntWritable();
public void map(LongWritable key, Text value,
Mapper<LongWritable, Text, IntWritable, Text>.Context mapper)
throws IOException, InterruptedException {
try {
AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
if (parser.isDistanceAvailable()) {
outputKey.set(parser.getDistance());
mapper.write(outputKey, value);
}
} catch (ArrayIndexOutOfBoundsException ae) {
outputKey.set(0);
mapper.write(outputKey, value);
ae.printStackTrace();
} catch (Exception e) {
outputKey.set(0);
mapper.write(outputKey, value);
e.printStackTrace();
}
}
}
}
run4makingSequenceFile
의 내용은 앞서 수행한 3.2.1 [SequenceFileCreator.java](http://SequenceFileCreator.java)
와 동일하다.빌드 후 jar 를 primary node 에 위치시키고, 다음 명령어로 실행한다.
hadoop jar hadoop-mapreduce-app-1.0-SNAPSHOT.jar de.example.hadoop.mapreduce.totalsort.TotalOrderSortByDistance /data/input/1987.csv /data/output/1987_totalsort/1987_sequence /data/output/1987_totalsort/1987_partition /data/output/1987_totalsort/1987_totalsort
hdfs dfs -ls /data/output/1987_totalsort/1987_sequence
hdfs dfs -ls /data/output/1987_totalsort/1987_parition
hdfs dfs -ls /data/output/1987_totalsort/1987_totalsort
SequenceFile 확인
hdfs dfs -text /data/output/1987_totalsort/1987_sequence/part-m-00000 | head -30
파티션 확인
hdfs dfs -text /data/output/1987_totalsort/1987_parition
전체 정렬 확인
hdfs dfs -text /data/output/1987_totalsort/1987_totalsort/part-r-00000 | head -10
hdfs dfs -text /data/output/1987_totalsort/1987_totalsort/part-r-00000 | tail -10
hdfs dfs -text /data/output/1987_totalsort/1987_totalsort/part-r-00001 | head -10
hdfs dfs -text /data/output/1987_totalsort/1987_totalsort/part-r-00001 | tail -10
hdfs dfs -text /data/output/1987_totalsort/1987_totalsort/part-r-00002 | head -10
hdfs dfs -text /data/output/1987_totalsort/1987_totalsort/part-r-00002 | tail -10
hdfs dfs -text /data/output/1987_totalsort/1987_totalsort/part-r-00003 | head -10
hdfs dfs -text /data/output/1987_totalsort/1987_totalsort/part-r-00003 | tail -10
hdfs dfs -text /data/output/1987_totalsort/1987_totalsort/part-r-00004 | head -10
hdfs dfs -text /data/output/1987_totalsort/1987_totalsort/part-r-00004 | tail -10