첫 번째 MapReduce - Join
MovieAverageRateTopK
package com.fastcampus.hadoop;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class MovieAverageRateTopK extends Configured implements Tool {
public static class MovieMapper extends Mapper<Object, Text, Text, Text> {
private Text movieId = new Text();
private Text outValue = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] columns = value.toString().split(",");
if (columns[0].equals("movieId")) {
return;
}
movieId.set(columns[0]);
outValue.set("M" + columns[1]);
context.write(movieId, outValue);
}
}
public static class RatingMapper extends Mapper<Object, Text, Text, Text> {
private Text movieId = new Text();
private Text outValue = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] colums = value.toString().split(",");
if (colums[0].equals("userId")) {
return;
}
movieId.set(colums[1]);
outValue.set("R" + colums[2]);
context.write(movieId, outValue);
}
}
public static class MovieRatingJoinReducer extends Reducer<Text, Text, Text, Text> {
private List<String> ratingList = new ArrayList<>();
private Text movieName = new Text();
private Text outValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
ratingList.clear();
for (Text value : values) {
if (value.charAt(0) == 'M') {
movieName.set(value.toString().substring(1));
} else if (value.charAt(0) == 'R') {
ratingList.add(value.toString().substring(1));
}
}
double average = ratingList.stream().mapToDouble(Double::parseDouble).average().orElse(0.0);
outValue.set(String.valueOf(average));
context.write(movieName, outValue);
}
}
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), "MovieAverageRateTopK First");
job.setJarByClass(MovieAverageRateTopK.class);
job.setReducerClass(MovieRatingJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MovieMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, RatingMapper.class);
return 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MovieAverageRateTopK(), args);
System.exit(exitCode);
}
}
첫 번째 MapReduce - Unit Test
MovieAverageRateTopKTest
package com.fastcampus.hadoop;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
/**
 * MRUnit tests for the two mappers and the join reducer nested in
 * {@link MovieAverageRateTopK}.
 */
public class MovieAverageRateTopKTest {

    /** The header line is skipped and each movie line yields (movieId, "M" + title). */
    @Test
    public void movieMapTest() throws IOException {
        // Parameterized rather than raw, so the chained calls stay type-checked.
        MapDriver<Object, Text, Text, Text> mapDriver = new MapDriver<Object, Text, Text, Text>()
                .withMapper(new MovieAverageRateTopK.MovieMapper())
                .withInput(new LongWritable(0), new Text("movieId,title,genres"))
                .withInput(new LongWritable(1), new Text("1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy"))
                .withInput(new LongWritable(2), new Text("2,Jumanji (1995),Adventure|Children|Fantasy"));
        mapDriver.withOutput(new Text("1"), new Text("MToy Story (1995)"))
                .withOutput(new Text("2"), new Text("MJumanji (1995)"))
                .runTest();
    }

    /** The header line is skipped and each rating line yields (movieId, "R" + rating). */
    @Test
    public void ratingMapTest() throws IOException {
        new MapDriver<Object, Text, Text, Text>()
                .withMapper(new MovieAverageRateTopK.RatingMapper())
                .withInput(new LongWritable(0), new Text("userId,movieId,rating,timestamp"))
                .withInput(new LongWritable(1), new Text("1,1,4.0,964982703"))
                .withInput(new LongWritable(2), new Text("7,1,4.5,1106635946"))
                .withInput(new LongWritable(3), new Text("8,2,4.0,839463806"))
                .withInput(new LongWritable(4), new Text("18,2,3.0,1455617462"))
                .withOutput(new Text("1"), new Text("R4.0"))
                .withOutput(new Text("1"), new Text("R4.5"))
                .withOutput(new Text("2"), new Text("R4.0"))
                .withOutput(new Text("2"), new Text("R3.0"))
                .runTest();
    }

    /** Tagged values for one movie id are joined into (title, average rating). */
    @Test
    public void movieRatingJoinReduceTest() throws IOException {
        new ReduceDriver<Text, Text, Text, Text>()
                .withReducer(new MovieAverageRateTopK.MovieRatingJoinReducer())
                .withInput(new Text("1"), Arrays.asList(new Text("MToy Story (1995)"), new Text("R4.0"), new Text("R4.5")))
                .withInput(new Text("2"), Arrays.asList(new Text("MJumanji (1995)"), new Text("R4.0"), new Text("R3.0")))
                .withOutput(new Text("Toy Story (1995)"), new Text("4.25"))
                .withOutput(new Text("Jumanji (1995)"), new Text("3.5"))
                .runTest();
    }
}