ℹ️ 실습에 사용하는 데이터는, P08-C02 AWS EMR Hadoop 실습 > 2 실습용 데이터 다운로드 에서 세팅한 데이터를 사용한다.
hdfs dfs -ls /data/input
hdfs dfs -head /data/input/1987.csv
코드에서 사용되는 Hadoop 패키지들은 클릭해서 들어가 Java doc 꼭 확인 해보자.
rootProject.name = 'hadoop-mapreduce-app'
plugins {
id 'java'
}
group 'de.example.hadoop.mapreduce'
version '1.0-SNAPSHOT'
def hadoopVersion = "3.2.1"
repositories {
mavenCentral()
}
dependencies {
implementation "org.apache.hadoop:hadoop-common:${hadoopVersion}"
implementation "org.apache.hadoop:hadoop-hdfs-client:${hadoopVersion}"
implementation "org.apache.hadoop:hadoop-mapreduce-client-core:${hadoopVersion}"
}
test {
useJUnitPlatform()
}
csv 파일을 파싱하는 코드이다.
package de.example.hadoop.mapreduce.common;
import org.apache.hadoop.io.Text;
public class AirlinePerformanceParser {
private int year;
private int month;
private int arriveDelayTime = 0;
private int departureDelayTime = 0;
private int distance = 0;
private boolean arriveDelayAvailable = true;
private boolean departureDelayAvailable = true;
private boolean distanceAvailable = true;
private String uniqueCarrier;
public AirlinePerformanceParser(Text text) {
try {
String[] colums = text.toString().split(",");
// 운항 연도 설정
year = Integer.parseInt(colums[0]);
// 운항 월 설정
month = Integer.parseInt(colums[1]);
// 항공사 코드 설정
uniqueCarrier = colums[8];
// 항공기 출발 지연 시간 설정
if (!colums[15].equals("NA")) {
departureDelayTime = Integer.parseInt(colums[15]);
} else {
departureDelayAvailable = false;
}
// 항공기 도착 지연 시간 설정
if (!colums[14].equals("NA")) {
arriveDelayTime = Integer.parseInt(colums[14]);
} else {
arriveDelayAvailable = false;
}
// 운항 거리 설정
if (!colums[18].equals("NA")) {
distance = Integer.parseInt(colums[18]);
} else {
distanceAvailable = false;
}
} catch (Exception e) {
System.out.println("Error parsing a record :" + e.getMessage());
}
}
public int getYear() { return year; }
public int getMonth() { return month; }
public int getArriveDelayTime() { return arriveDelayTime; }
public int getDepartureDelayTime() { return departureDelayTime; }
public boolean isArriveDelayAvailable() { return arriveDelayAvailable; }
public boolean isDepartureDelayAvailable() { return departureDelayAvailable; }
public String getUniqueCarrier() { return uniqueCarrier; }
public int getDistance() { return distance; }
public boolean isDistanceAvailable() { return distanceAvailable; }
}
package de.example.hadoop.mapreduce.departuredelay;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import de.example.hadoop.mapreduce.common.AirlinePerformanceParser;
//LongWritable:input_key, Text:input_value, Text:output_key, IntWritable:output_value(int)
// Text는 hadoop에 있는 Text니가 유의해서 사용하고
public class DepartureDelayCountMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
// map 출력값
private final static IntWritable outputValue = new IntWritable(1);
// map 출력키
private Text outputKey = new Text();
//원본 파일자체를 읽을 것이기 때문에 파일 읽는 거 자체에서는 키가 필요 없음. 그래서 이건 안 쓸 거지만, long type을 써야 하기 때문에 LongWritable 썼고, Text value 만 사용할 것.
//value 어떤 값 들어오냐면 csv 파일의 콤마로 이루어진 한 줄 들어감
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
// 출력키 설정
outputKey.set(parser.getYear() + "," + parser.getMonth());
if (parser.getDepartureDelayTime() > 0) {
// 출력 데이터 생성
context.write(outputKey, outputValue);
}
}
}
지연된 시간을 한 개씩 보냈으면 그것들을 같은 연월을 묶어서 다 더해 줌
package de.example.hadoop.mapreduce.common;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class DelayCountReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
// 여기 key, value는 DepartureDelayCountMapper 에서 보내준 output의 key랑 value
// values로 되어 있는데, map함수로 되어 있는 여러 개의 값들을 여기서 받기 때문에
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values)
sum += value.get();
// result : reduce의 결과로 쓸 값을 넣을 것
result.set(sum);
context.write(key, result);
//이렇게 되면 key 몇년 몇월 에 있는, result : 모든 일씩 카운트를 센 result가 여기에 들어와서 쓰여지게 됨
}
}
드라이버 역할을 하는 클래스이다.
package de.example.hadoop.mapreduce.departuredelay;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import de.example.hadoop.mapreduce.common.DelayCountReducer;
public class DepartureDelayCountApp {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 입출력 데이터 경로 확인
if (args.length != 2) {
System.err.println("Usage: DepartureDelayCount <input> <output>");
System.exit(2);
}
// Job 이름 설정
Job job = Job.getInstance(conf, "DepartureDelayCountApp");
// 입출력 데이터 경로 설정
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Job 클래스 설정, (자기 자신)
job.setJarByClass(DepartureDelayCountApp.class);
// Mapper 클래스 설정
// jab submit jar에 이 클래스가 위치해야 됨. 클래스 path 찾을 수 있어야 함
job.setMapperClass(DepartureDelayCountMapper.class);
// Reducer 클래스 설정
job.setReducerClass(DelayCountReducer.class);
// 입출력 데이터 포맷 설정
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 출력키 및 출력값 유형 설정
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
// job submit한 cli에서 그 세션을 계속 잡아 놓고 job이 끝날 때 까지 기다림
}
}
빌드
./gradlew build
이동
scp -i $your_key $your_jar_path hadoop@$your_emr_primary_ec2_public_dns:~/example/jars/.
실행
hadoop jar hadoop-mapreduce-app-1.0-SNAPSHOT.jar de.example.hadoop.mapreduce.departuredelay.DepartureDelayCountApp /data/input /data/output/departure_deplay_count
다음과 같은 결과가 나왔다.
INFO mapreduce.Job: Counters: 57
File System Counters
FILE: Number of bytes read=30222939
FILE: Number of bytes written=84526578
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=11578443804
HDFS: Number of bytes written=3520
HDFS: Number of read operations=301
HDFS: Number of large read operations=0
HDFS: Number of write operations=10
HDFS: Number of bytes read erasure-coded=0
Job Counters
Killed map tasks=1
Killed reduce tasks=1
Launched map tasks=92
Launched reduce tasks=5
Data-local map tasks=48
Rack-local map tasks=44
Total time spent by all maps in occupied slots (ms)=67204992
Total time spent by all reduces in occupied slots (ms)=44446656
Total time spent by all map tasks (ms)=700052
Total time spent by all reduce tasks (ms)=231493
Total vcore-milliseconds taken by all map tasks=700052
Total vcore-milliseconds taken by all reduce tasks=231493
Total megabyte-milliseconds taken by all map tasks=2150559744
Total megabyte-milliseconds taken by all reduce tasks=1422292992
Map-Reduce Framework
Map input records=118914458
Map output records=48310901
Map output bytes=543783522
Map output materialized bytes=30228066
Input split bytes=9936
Combine input records=0
Combine output records=0
Reduce input groups=247
Reduce shuffle bytes=30228066
Reduce input records=48310901
Reduce output records=247
Spilled Records=96621802
Shuffled Maps =460
Failed Shuffles=0
Merged Map outputs=460
GC time elapsed (ms)=25640
CPU time spent (ms)=616540
Physical memory (bytes) snapshot=104649011200
Virtual memory (bytes) snapshot=441144213504
Total committed heap usage (bytes)=105904603136
Peak Map Physical memory (bytes)=1470734336
Peak Map Virtual memory (bytes)=4472176640
Peak Reduce Physical memory (bytes)=483819520
Peak Reduce Virtual memory (bytes)=7100338176
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=11578433868
File Output Format Counters
Bytes Written=3520
job 제출시 설정한 output directory 에 보면 다음과 같은 결과를 확인할 수 있다.
org.apache.hadoop.util 패키지에는 개발자가 맵리듀스 프로그래밍에서 편의를 높일 수 있는 기능들을 제공한다.
사용자 정의 옵션을 지원하는 도구는 GenericOptionsParser, Tool, ToolRunner 가 있다.
맵리듀스 Job 을 실행하면 Map, Reduce 메소드가 완료되면 Counters 라는 결과가 출력이 된다.
하둡 맵리듀스는 Job의 진행상황을 기록하고 모니터링할 수 있도록 Counter API 를 제공하고, 모든 Job은 기본적으로 내장 counter 를 가지고 있다.
앞선 예제를 수행한 결과도 내장 Counter 로 인해 출력된 것이다.
이 Counter API를 이용하면 내가 직접 남기고 싶은 카운터 값을 남길 수 있다.
사용자 정의 옵션을 사용해서 출발 지연 시간, 도착 지연 시간을 모두 계산할 수 있는 실습을 진행한다.
Counter 를 이용해서 지연여부별로 counter 를 남긴다.
package de.example.hadoop.mapreduce.common;
public enum DelayCounters {
not_available_arrival, scheduled_arrival, early_arrival, not_available_departure,
scheduled_departure, early_departure;
}
package de.example.hadoop.mapreduce.custom_option_counter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import de.example.hadoop.mapreduce.common.AirlinePerformanceParser;
import de.example.hadoop.mapreduce.common.DelayCounters;
public class DelayCountMapperWithCounter
extends Mapper<LongWritable, Text, Text, IntWritable> {
// 작업 구분
private String workType;
// map 출력값
private final static IntWritable outputValue = new IntWritable(1);
// map 출력키
private Text outputKey = new Text();
@Override
public void setup(Context context) throws IOException, InterruptedException {
workType = context.getConfiguration().get("workType");
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
// 출발 지연 데이터 출력
if (workType.equals("departure")) {
if (parser.isDepartureDelayAvailable()) {
if (parser.getDepartureDelayTime() > 0) {
// 출력키 설정
outputKey.set(parser.getYear() + "," + parser.getMonth());
// 출력 데이터 생성
context.write(outputKey, outputValue);
} else if (parser.getDepartureDelayTime() == 0) {
context.getCounter(DelayCounters.scheduled_departure).increment(1);
//counter 꺼낼 때 Context에서 꺼낸다. 결국 Counter를 제공하는 게 MapReduce프레임워크인데 프레임워크와 관련된 동작을 할 때는 전부 Context를 이용해서 데이터를 넣거나 가져오기 한다고 보면 됨.
} else if (parser.getDepartureDelayTime() < 0) {
context.getCounter(DelayCounters.early_departure).increment(1);
}
} else {
context.getCounter(DelayCounters.not_available_departure).increment(1);
}
// 도착 지연 데이터 출력
} else if (workType.equals("arrival")) {
if (parser.isArriveDelayAvailable()) {
if (parser.getArriveDelayTime() > 0) {
// 출력키 설정
outputKey.set(parser.getYear() + "," + parser.getMonth());
// 출력 데이터 생성
context.write(outputKey, outputValue);
} else if (parser.getArriveDelayTime() == 0) {
context.getCounter(
DelayCounters.scheduled_arrival).increment(1);
} else if (parser.getArriveDelayTime() < 0) {
context.getCounter(DelayCounters.early_arrival).increment(1);
}
} else {
context.getCounter(DelayCounters.not_available_arrival).increment(1);
}
}
}
}
reducer 는 DelayCountReducer 를 사용한다.
package de.example.hadoop.mapreduce.custom_option_counter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import de.example.hadoop.mapreduce.common.DelayCountReducer;
public class DelayCountWithCounterApp extends Configured implements Tool {
public static void main(String[] args) throws Exception {
// Tool 인터페이스 실행
int res = ToolRunner.run(new Configuration(), new DelayCountWithCounterApp(), args);
System.out.println("MR-Job Result:" + res);
}
@Override
public int run(String[] args) throws Exception {
//getConf()는 Configured 상속받았고, Tool 인터페이스 실행의 new Configuration() 에서 들어온 configuration이다.
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
// 입출력 데이터 경로 확인
if (otherArgs.length != 2) {
System.err.println("Usage: DelayCountWithCounter <in> <out>");
System.exit(2);
}
// Job 이름 설정
Job job = Job.getInstance(getConf(), "DelayCountWithCounter");
// 입출력 데이터 경로 설정
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// Job 클래스 설정
job.setJarByClass(DelayCountWithCounterApp.class);
// Mapper 클래스 설정
job.setMapperClass(DelayCountMapperWithCounter.class);
// Reducer 클래스 설정
job.setReducerClass(DelayCountReducer.class);
// 입출력 데이터 포맷 설정
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 출력키 및 출력값 유형 설정
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
return 0;
}
}
위 DelayCountWithCounterApp 클래스를 가지고 build 한 다음에,
빌드
./gradlew build
빌드 결과를
이동
scp -i $your_key $your_jar_path hadoop@$your_emr_primary_ec2_public_dns:~/example/jars/.
실행
hadoop jar hadoop-mapreduce-app-1.0-SNAPSHOT.jar de.example.hadoop.mapreduce.custom_option_counter.DelayCountWithCounterApp -D workType=departure /data/input /data/output/delay_count_with_counter_worktype-departure
-D workType=departure
이 파라미터를 DelayCountWithCounterApp.java 에서 설정했던 ToolRunner가 받아서 DelayCountMapperWithCounter 의 context.getConfiguration().get("worktype")
에서 받을 수 있게 해주는 것INFO mapreduce.Job: Counters: 59
File System Counters
FILE: Number of bytes read=30222939
FILE: Number of bytes written=84559849
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=11578443804
HDFS: Number of bytes written=3520
HDFS: Number of read operations=301
HDFS: Number of large read operations=0
HDFS: Number of write operations=10
HDFS: Number of bytes read erasure-coded=0
Job Counters
Killed reduce tasks=1
Launched map tasks=92
Launched reduce tasks=5
Data-local map tasks=48
Rack-local map tasks=44
Total time spent by all maps in occupied slots (ms)=67804512
Total time spent by all reduces in occupied slots (ms)=44550720
Total time spent by all map tasks (ms)=706297
Total time spent by all reduce tasks (ms)=232035
Total vcore-milliseconds taken by all map tasks=706297
Total vcore-milliseconds taken by all reduce tasks=232035
Total megabyte-milliseconds taken by all map tasks=2169744384
Total megabyte-milliseconds taken by all reduce tasks=1425623040
Map-Reduce Framework
Map input records=118914458
Map output records=48310901
Map output bytes=543783522
Map output materialized bytes=30228066
Input split bytes=9936
Combine input records=0
Combine output records=0
Reduce input groups=247
Reduce shuffle bytes=30228066
Reduce input records=48310901
Reduce output records=247
Spilled Records=96621802
Shuffled Maps =460
Failed Shuffles=0
Merged Map outputs=460
GC time elapsed (ms)=22741
CPU time spent (ms)=602380
Physical memory (bytes) snapshot=99204706304
Virtual memory (bytes) snapshot=441102503936
Total committed heap usage (bytes)=100565254144
Peak Map Physical memory (bytes)=1469022208
Peak Map Virtual memory (bytes)=4437213184
Peak Reduce Physical memory (bytes)=492261376
Peak Reduce Virtual memory (bytes)=7098269696
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
de.example.hadoop.mapreduce.common.DelayCounters
early_departure=42305219
not_available_departure=2230332
scheduled_departure=26068006
File Input Format Counters
Bytes Read=11578433868
File Output Format Counters
Bytes Written=3520
de.example.hadoop.mapreduce.common.DelayCounters
라는 enum 클래스 이름 하위에 counter 집계된 항목들이 enum 필드 이름과 함께 남는다.