→ Hadoop에서 MapReduce코드를 실행시키면 여기에 만들어주고 그걸 실행시킬 수 있음
→ src : 실제 코드를 넣는 곳
→ template : 과제를 넣는곳
→ datagen : data를 generate할 수 있는 프로그램 들어있음
→ data : data 저장
→ build.xml : Map reduce를 위한 것들이 들어있음
MapReduce코드를 작성 -> Template폴더의 템플릿을 가져가서 코딩 -> src파일에 넣어서 컴파일, 수행
Hadoop, MapReduce코드를 돌릴려면 HDFS에 데이터를 옮겨야함
src 디렉토리에 코드를 만들 때마다 pgd.addClass를 넣어야함
ant를 수행하면 컴파일됨
=⇒wordcount코드가 수행되고 ssafy.jar이라는 패키지를 만들어짐
=⇒ wordcout코드의 argument를 2개 작성 : 입력 파일이 들어있는 dir, 출력 파일이 들어갈 dir
$ hadoop jar [jar file][program name] <intput arguments ...>
ex) $ hadoop jar ssafy.jar wordcount wordcount_test wordcount_test_out
문서마다 map함수가 호출되어 단어마다 value를 만들어 출력
셔플링 페이즈 : map 함수가 출력한 value를 같은 key끼리 모아 list를 만들어서 출력
셔플링 페이즈가 끝난 후 각각의 Key마다 Reduce함수가 호출됨 :
ValueList에 있는 Value들을 합쳐서 (Key, Value)형태로 출
⇒ 출력 : map함수(Key, Value)→셔플링페이즈(Key, ValueList)→Reduce함수(Key, Value)
→ 지금 map함수가 호출 된 dataFile 이름을 return
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.fs.FileSystem;
public static class TokenizerMapper
extends Mapper<Object,Text,Text,Text> {
// variable declairations
private Text pos = new Text(); //value로 사용할 변수
private Text word = new Text(); //key로 사용할 변수
private String filename; //현재 읽는 파일의 이름을 넣을 변수
protected void setup(Context context) throws IOException, InterruptedException{
filename = ((FileSplit)context.getInputSplit()).getPath().getName(); //setup함수에서 filename에 현재사용하는 파일 이름을 뽑아서 저장
}
// map function (Context -> fixed parameter)
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException { //입력 파일을 한 줄 읽고
StringTokenizer itr = new StringTokenizer(value.toString(), " ", true); //단어 단위로 자르기
long p = ((LongWritable)key).get(); //현재 위치가 시작지점에서 몇 byte인지 표시
while ( itr.hasMoreTokens() ) { //다음 단어가 없을 때까지 반복
String token = itr.nextToken(); //다음 단어 가져오기
word.set(token.trim()); //word에 단어를 잘라서 넣음
if(! token.equals(" ")){ //단어가 존재하는 경우
pos.set(filename+":"+p); //value에 파일의 이름과 현재 위치를 저장
context.write(word,pos); //key와 value를 출력
}
p+=token.length(); //현재 위치에 단어 길이만큼 더해줌
}
}
}
public static class IntSumReducer
extends Reducer<Text,Text,Text,Text> {
private Text list = new Text();
// key : a disticnt word
// values : Iterable type (data list)
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String s = new String();
int comma = 0;
for (Text val : values ) { // list의 값들을 하나씩 수행
if(comma == 0){
comma = 1;
s += (":"+val.toString()); //처음에는 앞에 ,를 붙이지 않음
}
else
s += (", "+val.toString()); //두번째 부터 앞에 컴마를 넣어줌
}
list.set(s); //value로 사용할 list에 저장
context.write(key,list); //key와 list 저장
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if ( otherArgs.length != 2 ) { //'hadoop jar jarname.jar [프로그램명]'을 제외한 변수 저장
System.err.println("Usage: <in> <out>");
System.exit(2);
}
FileSystem hdfs = FileSystem.get(conf);
Path output = new Path(otherArgs[1]);
if(hdfs.exists(output)) hdfs.delete(output, true); //output디렉토리 자동 삭제
Job job = new Job(conf,"inverted count");
job.setJarByClass(InvertedIndex.class); //Class 명 설정
job.setMapperClass(TokenizerMapper.class); //Map Class 설정
job.setReducerClass(IntSumReducer.class); //Reduce Class 설정
job.setOutputKeyClass(Text.class); //output key type 설정
job.setOutputValueClass(Text.class); //output value type 설정
job.setNumReduceTasks(2); //동시에 수행되는 reduce개수 설정
FileInputFormat.addInputPath(job,new Path(otherArgs[0])); //입력파일 디렉토리 설정
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); //출력파일 디렉토리 설정
FileSystem.get(job.getConfiguration()).delete(new Path(otherArgs[1]),true);
System.exit(job.waitForCompletion(true) ? 0 : 1 ); //실행
}
순서
$ ant
//matrix data를 hdfs에 넣어야 하기때문에 matadd_test라는 input이 들어갈 dir생성
$ hdfs dfs -mkdir matadd_test
//.txt파일은 input data로써 matadd_test dir에 넣음
$ hdfs dfs -put data/matadd-data-2x2.txt matadd_test
//실행시키고 결과 확인
$ hadoop jar ssafy.jar matadd matadd_test matadd_test_out
$ hdfs dfs -cat matadd_test_out/part-r-00000 | more
$ hdfs dfs -cat matadd_test_out/part-r-00001 | more
public static class MAddMapper extends Mapper<Object, Text, Text , IntWritable>{
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String arr[] = value.toString().split("\t");
Text nextKey = new Text(arr[1]+"\t"+arr[2]);
IntWritable nextValue = new IntWritable(Integer.parseInt(arr[3]));
context.write(nextKey, nextValue);
}
}
public static class MAddReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value : values){
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: <in> <out>");
System.exit(2);
}
FileSystem hdfs = FileSystem.get(conf);
Path output = new Path(otherArgs[1]);
if (hdfs.exists(output))
hdfs.delete(output, true);
Job job = new Job(conf, "matrix addition");
job.setJarByClass(MatrixAdd.class);
job.setMapperClass(MAddMapper.class);
job.setReducerClass(MAddReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(2);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1)
}
장단점
public static class MMMapper extends Mapper<Object, Text, Text, Text>{
private Text keypair = new Text(); //
private Text valpair= new Text(); //
private String Matrix1name;
private String Matrix2name;
private int n; // 1번째 matrix의 row갯수
private int l; // 1번째 matrix의 column갯수
private int m; // 2번째 matrix의 column갯수
protected void setup(Context context) throws IOException, InterruptedException { //setup함수에서 값을 넣어줌
// ------------------------------------------------------
Configuration config = context.getConfiguration();
Matrix1name = config.get("Matrix1name","A"); //Matrix1name으로 된 String을 받아오는데 없으면 A를 가져옴
Matrix2name = config.get("Matrix2name","B");
n = config.getInt("n",10); //n으로 된 int를 받아오는데 없으면 1을 가져옴
l = config.getInt("l",10);
m = config.getInt("m",10);
// ------------------------------------------------------
}
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
// TODO
// ------------------------------------------------------
StringTokenizer token = new StringTokenizer(value.toString());
String mat = token.nextToken();
int row = Integer.parseInt(token.nextToken());
int col = Integer.parseInt(token.nextToken());
int v = Integer.parseInt(token.nextToken());
if(mat.equals(Matrix1name)){ //matrix이름이 1인 경우
valpair.set(""+col+" "+v); //value값에 col의 위치와 v값을 입력
for(int j=0; j<m; j++){ //row는 고정, col값을 1칸씩 증가시키며 key, value를 넣어준다
String p = "" + row + "," + j;
keypair.set(p);
context.write(keypair, valpair);
}
}
else if(mat.equals(Matrix2name)){ //2인 경우 row와 v값을 입력
valpair.set(""+row+" "+v);
for(int i=0; i<n; i++){ //value값은 col은 고정, row를 1개 씩 증가하며 넣어준다
String p = "" + i + "," + col;
keypair.set(p);
context.write(keypari, valpair);
}
}
// ------------------------------------------------------
}