/**
 * Mapper of the second job: keeps only the K highest-rated records seen by this map task
 * and emits them once, from {@link #cleanup}, instead of emitting every input record.
 *
 * <p>Each input line is split on a tab; column 0 is used as the movie title and column 1 is
 * parsed as the rating. Output records are (rating, title), written in ascending rating order.
 *
 * <p>Titles are grouped per rating so that movies sharing the same rating are all retained;
 * a plain rating-to-title map would silently overwrite the earlier title on a tie.
 */
public static class TopKMapper extends Mapper<Object, Text, Text, Text> {
    // Titles grouped by rating, sorted by ascending rating (TreeMap key order).
    // Fully qualified names are used so the block does not depend on extra imports.
    private final TreeMap<Double, java.util.List<Text>> topKMap = new TreeMap<>();
    // Total number of titles currently held across all rating groups.
    private int retained = 0;

    /**
     * Records one (title, rating) line and evicts the lowest-rated title when more than K are held.
     *
     * @param key     input key; not used
     * @param value   one input line, expected as {@code title<TAB>rating}
     * @param context task context; not used here because output is deferred to cleanup
     * @throws ArrayIndexOutOfBoundsException if the line has no tab-separated second column
     * @throws NumberFormatException          if the second column is not a parsable double
     */
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] columns = value.toString().split("\t");
        Double rating = Double.valueOf(columns[1]);

        java.util.List<Text> titles = topKMap.get(rating);
        if (titles == null) {
            titles = new java.util.ArrayList<>();
            topKMap.put(rating, titles);
        }
        titles.add(new Text(columns[0]));
        retained++;

        if (retained > K) {
            // Drop exactly one title from the lowest rating group. Among tied titles the most
            // recently added one is dropped, so earlier-seen titles win the tie.
            java.util.List<Text> lowest = topKMap.firstEntry().getValue();
            lowest.remove(lowest.size() - 1);
            if (lowest.isEmpty()) {
                topKMap.pollFirstEntry();
            }
            retained--;
        }
    }

    /**
     * Emits the retained records as (rating, title), lowest rating first; titles sharing a
     * rating are written in the order they were read.
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Double rating : topKMap.keySet()) {
            for (Text title : topKMap.get(rating)) {
                context.write(new Text(rating.toString()), title);
            }
        }
    }
}
/**
 * Reducer of the second job: merges the (rating, title) records produced by the mappers,
 * keeps the K highest-rated titles, and writes them from {@link #cleanup} as (title, rating)
 * in descending rating order.
 *
 * <p>The result is a global top-K only when all records reach the same reducer instance,
 * i.e. when the job is configured with a single reduce task.
 *
 * <p>Titles are grouped per rating so that movies sharing the same rating are all retained;
 * a plain rating-to-title map would keep only the last title of each rating.
 */
public static class TopKReducer extends Reducer<Text, Text, Text, Text> {
    // Titles grouped by rating, sorted by ascending rating (TreeMap key order).
    // Fully qualified names are used so the block does not depend on extra imports.
    private final TreeMap<Double, java.util.List<Text>> topKMap = new TreeMap<>();
    // Total number of titles currently held across all rating groups.
    private int retained = 0;

    /**
     * Adds every title of one rating group and evicts the lowest-rated title whenever more
     * than K titles are held.
     *
     * @param key     the rating as text; parsed with {@link Double#valueOf(String)}
     * @param values  the titles that carry this rating
     * @param context task context; not used here because output is deferred to cleanup
     * @throws NumberFormatException if the key is not a parsable double
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Double rating = Double.valueOf(key.toString());
        for (Text value : values) {
            java.util.List<Text> titles = topKMap.get(rating);
            if (titles == null) {
                titles = new java.util.ArrayList<>();
                topKMap.put(rating, titles);
            }
            // Copy the value: the instance handed out by the iterable is not kept as-is.
            titles.add(new Text(value));
            retained++;

            if (retained > K) {
                // Drop exactly one title from the lowest rating group; among tied titles the
                // most recently added one is dropped.
                java.util.List<Text> lowest = topKMap.firstEntry().getValue();
                lowest.remove(lowest.size() - 1);
                if (lowest.isEmpty()) {
                    topKMap.pollFirstEntry();
                }
                retained--;
            }
        }
    }

    /**
     * Writes the retained records as (title, rating), highest rating first; titles sharing a
     * rating are written in the order they were received.
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Double rating : topKMap.descendingKeySet()) {
            for (Text title : topKMap.get(rating)) {
                context.write(title, new Text(rating.toString()));
            }
        }
    }
}
/**
 * Runs two chained MapReduce jobs: the first reads two inputs through {@code MovieMapper} and
 * {@code RatingMapper} and reduces them with {@code MovieRatingJoinReducer}; the second reads
 * the first job's output and selects the top K records with a single reduce task.
 *
 * @param args {@code args[0]}: input path for {@code MovieMapper};
 *             {@code args[1]}: input path for {@code RatingMapper};
 *             {@code args[2]}: output path of the first job, reused as input of the second;
 *             {@code args[3]}: output path of the second job
 * @return 0 when both jobs succeed, 1 when either job fails, 2 when fewer than four
 *         arguments are supplied
 * @throws Exception if job setup or submission fails
 */
@Override
public int run(String[] args) throws Exception {
    // Validate up front: otherwise a missing args[3] is only detected after the whole
    // first job has already run.
    if (args == null || args.length < 4) {
        System.err.println("Usage: MovieAverageRateTopK <movie input> <rating input> <first job output> <top-k output>");
        return 2;
    }

    Job job = Job.getInstance(getConf(), "MovieAverageRateTopK First");
    job.setJarByClass(MovieAverageRateTopK.class);
    job.setReducerClass(MovieRatingJoinReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // Each input path is bound to its own mapper class.
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MovieMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, RatingMapper.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    // Do not start the second job unless the first one completed successfully.
    if (!job.waitForCompletion(true)) {
        return 1;
    }

    Job job2 = Job.getInstance(getConf(), "MovieAverageRateTopK Second" );
    job2.setJarByClass(MovieAverageRateTopK.class);
    job2.setMapperClass(TopKMapper.class);
    job2.setReducerClass(TopKReducer.class);
    // A single reducer sees every mapper's candidates, so its top K is the overall top K.
    job2.setNumReduceTasks(1);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job2, new Path(args[2]));
    FileOutputFormat.setOutputPath(job2, new Path(args[3]));
    return job2.waitForCompletion(true) ? 0 : 1;
}
/** Second MapReduce job - unit test of the mapper. */
@Test
public void topKMapTest() throws IOException {
    MapDriver<Object, Text, Text, Text> driver = new MapDriver<>();
    driver.withMapper(new MovieAverageRateTopK.TopKMapper());

    // Input records: "title<TAB>rating".
    driver.withInput(new LongWritable(0), new Text("Toy Story (1995)\t4.25"));
    driver.withInput(new LongWritable(1), new Text("Jumanji (1995)\t3.5"));

    // The order of the expected outputs matters: the mapper emits from the lowest rating to
    // the highest, so listing them the other way round makes the test fail.
    driver.withOutput(new Text("3.5"), new Text("Jumanji (1995)"));
    driver.withOutput(new Text("4.25"), new Text("Toy Story (1995)"));

    driver.runTest();
}
/** Second MapReduce job - unit test of the reducer. */
@Test
public void topKReduceTest() throws IOException {
    ReduceDriver<Text, Text, Text, Text> driver = new ReduceDriver<>();
    driver.withReducer(new MovieAverageRateTopK.TopKReducer());

    // Input groups: rating as key, titles as values.
    driver.withInput(new Text("3.5"), Arrays.asList(new Text("Jumanji (1995)")));
    driver.withInput(new Text("4.25"), Arrays.asList(new Text("Toy Story (1995)")));

    // The order of the expected outputs matters here as well: the reducer emits from the
    // highest rating down to the lowest.
    driver.withOutput(new Text("Toy Story (1995)"), new Text("4.25"));
    driver.withOutput(new Text("Jumanji (1995)"), new Text("3.5"));

    driver.runTest();
}
잘 작동하는 것을 확인할 수 있다.