[MapReduce] MapReduce 프로그래밍

Hyunjun Kim·2025년 8월 23일
0

Data_Engineering

목록 보기
137/153

2 MapReduce 프로그래밍

ℹ️ 실습에 사용하는 데이터는, P08-C02 AWS EMR Hadoop 실습 > 2 실습용 데이터 다운로드 에서 세팅한 데이터를 사용한다.

hdfs dfs -ls /data/input
hdfs dfs -head /data/input/1987.csv

2.1 공통 코드

코드에서 사용되는 Hadoop 패키지들은 클릭해서 들어가 Java doc 꼭 확인 해보자.

2.1.1 settings.gradle

rootProject.name = 'hadoop-mapreduce-app'
  • jar가 만들어지면 prefix로 붙을 것. 맘대로 바꿔도 됨

2.1.2 build.gradle

plugins {
    id 'java'
}

group 'de.example.hadoop.mapreduce'
version '1.0-SNAPSHOT'

def hadoopVersion = "3.2.1"

repositories {
    mavenCentral()
}

dependencies {
    implementation "org.apache.hadoop:hadoop-common:${hadoopVersion}"
    implementation "org.apache.hadoop:hadoop-hdfs-client:${hadoopVersion}"
    implementation "org.apache.hadoop:hadoop-mapreduce-client-core:${hadoopVersion}"
}

test {
    useJUnitPlatform()
}

2.1.3 AirlinePerformanceParser.java

csv 파일을 파싱하는 코드이다.

package de.example.hadoop.mapreduce.common;

import org.apache.hadoop.io.Text;

public class AirlinePerformanceParser {

    private int year;
    private int month;

    private int arriveDelayTime = 0;
    private int departureDelayTime = 0;
    private int distance = 0;

    private boolean arriveDelayAvailable = true;
    private boolean departureDelayAvailable = true;
    private boolean distanceAvailable = true;

    private String uniqueCarrier;

    public AirlinePerformanceParser(Text text) {
        try {
            String[] colums = text.toString().split(",");

            // 운항 연도 설정
            year = Integer.parseInt(colums[0]);

            // 운항 월 설정
            month = Integer.parseInt(colums[1]);

            // 항공사 코드 설정
            uniqueCarrier = colums[8];

            // 항공기 출발 지연 시간 설정
            if (!colums[15].equals("NA")) {
                departureDelayTime = Integer.parseInt(colums[15]);
            } else {
                departureDelayAvailable = false;
            }

            // 항공기 도착 지연 시간 설정
            if (!colums[14].equals("NA")) {
                arriveDelayTime = Integer.parseInt(colums[14]);
            } else {
                arriveDelayAvailable = false;
            }

            // 운항 거리 설정
            if (!colums[18].equals("NA")) {
                distance = Integer.parseInt(colums[18]);
            } else {
                distanceAvailable = false;
            }
        } catch (Exception e) {
            System.out.println("Error parsing a record :" + e.getMessage());
        }
    }

    public int getYear() { return year; }

    public int getMonth() { return month; }

    public int getArriveDelayTime() { return arriveDelayTime; }

    public int getDepartureDelayTime() { return departureDelayTime; }

    public boolean isArriveDelayAvailable() { return arriveDelayAvailable; }

    public boolean isDepartureDelayAvailable() { return departureDelayAvailable;  }

    public String getUniqueCarrier() { return uniqueCarrier; }

    public int getDistance() { return distance; }

    public boolean isDistanceAvailable() { return distanceAvailable;  }
}


2.2 출발 지연 데이터 분석

2.2.1 DepartureDelayCountMapper.java

package de.example.hadoop.mapreduce.departuredelay;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

import de.example.hadoop.mapreduce.common.AirlinePerformanceParser;

//LongWritable:input_key, Text:input_value, Text:output_key, IntWritable:output_value(int)
// Text는 hadoop에 있는 Text니가 유의해서 사용하고
public class DepartureDelayCountMapper extends
                                       Mapper<LongWritable, Text, Text, IntWritable> {

    // map 출력값
    private final static IntWritable outputValue = new IntWritable(1);
    // map 출력키
    private Text outputKey = new Text();
    
    //원본 파일자체를 읽을 것이기 때문에 파일 읽는 거 자체에서는 키가 필요 없음. 그래서 이건 안 쓸 거지만, long type을 써야 하기 때문에 LongWritable 썼고, Text value 만 사용할 것.
    //value 어떤 값 들어오냐면 csv 파일의 콤마로 이루어진 한 줄 들어감
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

        // 출력키 설정
        outputKey.set(parser.getYear() + "," + parser.getMonth());

        if (parser.getDepartureDelayTime() > 0) {
            // 출력 데이터 생성
            context.write(outputKey, outputValue);
        }
    }
}

2.2.2 DelayCountReducer.java

지연된 시간을 한 개씩 보냈으면 그것들을 같은 연월을 묶어서 다 더해 줌

package de.example.hadoop.mapreduce.common;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class DelayCountReducer extends
                               Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();
    
    // 여기 key, value는 DepartureDelayCountMapper 에서 보내준 output의 key랑 value
    // values로 되어 있는데, map함수로 되어 있는 여러 개의 값들을 여기서 받기 때문에 
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values)
            sum += value.get();
            
		// result : reduce의 결과로 쓸 값을 넣을 것
        result.set(sum);
        context.write(key, result);
        //이렇게 되면 key 몇년 몇월 에 있는, result : 모든 일씩 카운트를 센 result가 여기에 들어와서 쓰여지게 됨
    }
}

2.2.3 DepartureDelayCountApp.java

드라이버 역할을 하는 클래스이다.

package de.example.hadoop.mapreduce.departuredelay;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import de.example.hadoop.mapreduce.common.DelayCountReducer;

public class DepartureDelayCountApp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // 입출력 데이터 경로 확인
        if (args.length != 2) {
            System.err.println("Usage: DepartureDelayCount <input> <output>");
            System.exit(2);
        }
        // Job 이름 설정
        Job job = Job.getInstance(conf, "DepartureDelayCountApp");

        // 입출력 데이터 경로 설정
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Job 클래스 설정, (자기 자신)
        job.setJarByClass(DepartureDelayCountApp.class);
        // Mapper 클래스 설정
        // jab submit jar에 이 클래스가 위치해야 됨. 클래스 path 찾을 수 있어야 함
        job.setMapperClass(DepartureDelayCountMapper.class);
        // Reducer 클래스 설정
        job.setReducerClass(DelayCountReducer.class);

        // 입출력 데이터 포맷 설정
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 출력키 및 출력값 유형 설정
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);
        // job submit한 cli에서 그 세션을 계속 잡아 놓고 job이 끝날 때 까지 기다림
    }
}

2.2.4 빌드 후 실행

빌드

./gradlew build

이동

scp -i $your_key $your_jar_path hadoop@$your_emr_primary_ec2_public_dns:~/example/jars/.

실행

hadoop jar hadoop-mapreduce-app-1.0-SNAPSHOT.jar de.example.hadoop.mapreduce.departuredelay.DepartureDelayCountApp /data/input /data/output/departure_deplay_count

2.2.5 결과 & 해석

다음과 같은 결과가 나왔다.

INFO mapreduce.Job: Counters: 57
	File System Counters
		FILE: Number of bytes read=30222939
		FILE: Number of bytes written=84526578
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=11578443804
		HDFS: Number of bytes written=3520
		HDFS: Number of read operations=301
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=10
		HDFS: Number of bytes read erasure-coded=0
	Job Counters
		Killed map tasks=1
		Killed reduce tasks=1
		Launched map tasks=92
		Launched reduce tasks=5
		Data-local map tasks=48
		Rack-local map tasks=44
		Total time spent by all maps in occupied slots (ms)=67204992
		Total time spent by all reduces in occupied slots (ms)=44446656
		Total time spent by all map tasks (ms)=700052
		Total time spent by all reduce tasks (ms)=231493
		Total vcore-milliseconds taken by all map tasks=700052
		Total vcore-milliseconds taken by all reduce tasks=231493
		Total megabyte-milliseconds taken by all map tasks=2150559744
		Total megabyte-milliseconds taken by all reduce tasks=1422292992
	Map-Reduce Framework
		Map input records=118914458
		Map output records=48310901
		Map output bytes=543783522
		Map output materialized bytes=30228066
		Input split bytes=9936
		Combine input records=0
		Combine output records=0
		Reduce input groups=247
		Reduce shuffle bytes=30228066
		Reduce input records=48310901
		Reduce output records=247
		Spilled Records=96621802
		Shuffled Maps =460
		Failed Shuffles=0
		Merged Map outputs=460
		GC time elapsed (ms)=25640
		CPU time spent (ms)=616540
		Physical memory (bytes) snapshot=104649011200
		Virtual memory (bytes) snapshot=441144213504
		Total committed heap usage (bytes)=105904603136
		Peak Map Physical memory (bytes)=1470734336
		Peak Map Virtual memory (bytes)=4472176640
		Peak Reduce Physical memory (bytes)=483819520
		Peak Reduce Virtual memory (bytes)=7100338176
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=11578433868
	File Output Format Counters
		Bytes Written=3520
  • map task 가 얼마나 효율적인지는, 전체 map task 수 대비 data-local, rack-local 에 뜬 태스크의 수를 확인하면 된다.
  • Shuffle 에 의한 성능은 아래 지표로 확인한다. shuffle 이 일어난 map의 수나, bytes 수가 적을수록 network에 의한 부하가 줄어 성능이 좋아질 수 있다.
    • Shuffled Maps
    • Reduce shuffle bytes

2.2.6 파일 결과 확인

job 제출시 설정한 output directory 에 보면 다음과 같은 결과를 확인할 수 있다.

  • 성공 상태가 _SUCCESS 파일로 만들어져있다.
  • 데이터가 part-3-00000X 파일들로 나눠서 들어가있다.
  • 파일에 저장된 데이터는 정렬되어있지 않다.

2.3 사용자 정의 옵션, Counter 사용하기


2.3.1 사용자 정의 옵션

org.apache.hadoop.util 패키지에는 개발자가 맵리듀스 프로그래밍에서 편의를 높일 수 있는 기능들을 제공한다.
사용자 정의 옵션을 지원하는 도구는 GenericOptionsParser, Tool, ToolRunner 가 있다.


2.3.2 Counter

맵리듀스 Job 을 실행하면 Map, Reduce 메소드가 완료되면 Counters 라는 결과가 출력이 된다.
하둡 맵리듀스는 Job의 진행상황을 기록하고 모니터링할 수 있도록 Counter API 를 제공하고, 모든 Job은 기본적으로 내장 counter 를 가지고 있다.
앞선 예제를 수행한 결과도 내장 Counter 로 인해 출력된 것이다.

이 Counter API를 이용하면 내가 직접 남기고 싶은 카운터 값을 남길 수 있다.


2.3.3 실습 내용

사용자 정의 옵션을 사용해서 출발 지연 시간, 도착 지연 시간을 모두 계산할 수 있는 실습을 진행한다.

Counter 를 이용해서 지연여부별로 counter 를 남긴다.


2.3.4 DelayCounters.java

package de.example.hadoop.mapreduce.common;

public enum DelayCounters {
    not_available_arrival, scheduled_arrival, early_arrival, not_available_departure,
    scheduled_departure, early_departure;
}
  • 사용자 정의 카운터의 이름과 종류를 위한 enum 클래스.

2.3.5 DelayCountMapperWithCounter.java

package de.example.hadoop.mapreduce.custom_option_counter;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

import de.example.hadoop.mapreduce.common.AirlinePerformanceParser;
import de.example.hadoop.mapreduce.common.DelayCounters;

public class DelayCountMapperWithCounter
        extends Mapper<LongWritable, Text, Text, IntWritable> {
    // 작업 구분
    private String workType;
    // map 출력값
    private final static IntWritable outputValue = new IntWritable(1);
    // map 출력키
    private Text outputKey = new Text();

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        workType = context.getConfiguration().get("workType");
    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

        // 출발 지연 데이터 출력
        if (workType.equals("departure")) {
            if (parser.isDepartureDelayAvailable()) {
                if (parser.getDepartureDelayTime() > 0) {
                    // 출력키 설정
                    outputKey.set(parser.getYear() + "," + parser.getMonth());
                    // 출력 데이터 생성
                    context.write(outputKey, outputValue);
                } else if (parser.getDepartureDelayTime() == 0) {
                    context.getCounter(DelayCounters.scheduled_departure).increment(1);
                    //counter 꺼낼 때 Context에서 꺼낸다. 결국 Counter를 제공하는 게 MapReduce프레임워크인데 프레임워크와 관련된 동작을 할 때는 전부 Context를 이용해서 데이터를 넣거나 가져오기 한다고 보면 됨.
                } else if (parser.getDepartureDelayTime() < 0) {
                    context.getCounter(DelayCounters.early_departure).increment(1);
                }
            } else {
                context.getCounter(DelayCounters.not_available_departure).increment(1);
            }
            // 도착 지연 데이터 출력
        } else if (workType.equals("arrival")) {
            if (parser.isArriveDelayAvailable()) {
                if (parser.getArriveDelayTime() > 0) {
                    // 출력키 설정
                    outputKey.set(parser.getYear() + "," + parser.getMonth());
                    // 출력 데이터 생성
                    context.write(outputKey, outputValue);
                } else if (parser.getArriveDelayTime() == 0) {
                    context.getCounter(
                            DelayCounters.scheduled_arrival).increment(1);
                } else if (parser.getArriveDelayTime() < 0) {
                    context.getCounter(DelayCounters.early_arrival).increment(1);
                }
            } else {
                context.getCounter(DelayCounters.not_available_arrival).increment(1);
            }
        }
    }
}
  • Tool 에 의해 넘어온 옵션을 context에 담긴 configuration 객체를 통해 읽을 수 있다.
  • context 에서 counter 를 enum을 key로해서 꺼낸 뒤, increment 로 카운트를 증가시킨다.

2.3.6 Reducer

reducer 는 DelayCountReducer 를 사용한다.


2.3.7 DelayCountWithCounterApp.java

package de.example.hadoop.mapreduce.custom_option_counter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import de.example.hadoop.mapreduce.common.DelayCountReducer;

public class DelayCountWithCounterApp extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Tool 인터페이스 실행
        int res = ToolRunner.run(new Configuration(), new DelayCountWithCounterApp(), args);
        System.out.println("MR-Job Result:" + res);
    }

	@Override
    public int run(String[] args) throws Exception {
    	//getConf()는 Configured 상속받았고, Tool 인터페이스 실행의 new Configuration() 에서 들어온 configuration이다.
        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
        // 입출력 데이터 경로 확인
        if (otherArgs.length != 2) {
            System.err.println("Usage: DelayCountWithCounter <in> <out>");
            System.exit(2);
        }
        // Job 이름 설정
        Job job = Job.getInstance(getConf(), "DelayCountWithCounter");

        // 입출력 데이터 경로 설정
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Job 클래스 설정
        job.setJarByClass(DelayCountWithCounterApp.class);
        // Mapper 클래스 설정
        job.setMapperClass(DelayCountMapperWithCounter.class);
        // Reducer 클래스 설정
        job.setReducerClass(DelayCountReducer.class);

        // 입출력 데이터 포맷 설정
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 출력키 및 출력값 유형 설정
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);
        return 0;
    }
}
  • 커스텀 옵션을 위해
    • org.apache.hadoop.conf.Configured 을 상속받는다.
    • org.apache.hadoop.util.Tool 을 구현한다.
    • GenericOptionsParser 를 이용해야 ToolRunner에 의해 mapreduce 에 넘어가는 옵션과 Job 에 넘어오는 옵션을 구분할 수 있다.

2.3.8 빌드 후 실행

위 DelayCountWithCounterApp 클래스를 가지고 build 한 다음에,

빌드

./gradlew build

빌드 결과를
이동

scp -i $your_key $your_jar_path hadoop@$your_emr_primary_ec2_public_dns:~/example/jars/.

실행

hadoop jar hadoop-mapreduce-app-1.0-SNAPSHOT.jar de.example.hadoop.mapreduce.custom_option_counter.DelayCountWithCounterApp -D workType=departure /data/input /data/output/delay_count_with_counter_worktype-departure
  • -D workType=departure 이 파라미터를 DelayCountWithCounterApp.java 에서 설정했던 ToolRunner가 받아서 DelayCountMapperWithCounter 의 context.getConfiguration().get("worktype") 에서 받을 수 있게 해주는 것

2.3.9 결과

INFO mapreduce.Job: Counters: 59
	File System Counters
		FILE: Number of bytes read=30222939
		FILE: Number of bytes written=84559849
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=11578443804
		HDFS: Number of bytes written=3520
		HDFS: Number of read operations=301
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=10
		HDFS: Number of bytes read erasure-coded=0
	Job Counters
		Killed reduce tasks=1
		Launched map tasks=92
		Launched reduce tasks=5
		Data-local map tasks=48
		Rack-local map tasks=44
		Total time spent by all maps in occupied slots (ms)=67804512
		Total time spent by all reduces in occupied slots (ms)=44550720
		Total time spent by all map tasks (ms)=706297
		Total time spent by all reduce tasks (ms)=232035
		Total vcore-milliseconds taken by all map tasks=706297
		Total vcore-milliseconds taken by all reduce tasks=232035
		Total megabyte-milliseconds taken by all map tasks=2169744384
		Total megabyte-milliseconds taken by all reduce tasks=1425623040
	Map-Reduce Framework
		Map input records=118914458
		Map output records=48310901
		Map output bytes=543783522
		Map output materialized bytes=30228066
		Input split bytes=9936
		Combine input records=0
		Combine output records=0
		Reduce input groups=247
		Reduce shuffle bytes=30228066
		Reduce input records=48310901
		Reduce output records=247
		Spilled Records=96621802
		Shuffled Maps =460
		Failed Shuffles=0
		Merged Map outputs=460
		GC time elapsed (ms)=22741
		CPU time spent (ms)=602380
		Physical memory (bytes) snapshot=99204706304
		Virtual memory (bytes) snapshot=441102503936
		Total committed heap usage (bytes)=100565254144
		Peak Map Physical memory (bytes)=1469022208
		Peak Map Virtual memory (bytes)=4437213184
		Peak Reduce Physical memory (bytes)=492261376
		Peak Reduce Virtual memory (bytes)=7098269696
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	de.example.hadoop.mapreduce.common.DelayCounters
		early_departure=42305219
		not_available_departure=2230332
		scheduled_departure=26068006
	File Input Format Counters
		Bytes Read=11578433868
	File Output Format Counters
		Bytes Written=3520
  • de.example.hadoop.mapreduce.common.DelayCounters 라는 enum 클래스 이름 하위에 counter 집계된 항목들이 enum 필드 이름과 함께 남는다.
profile
Data Analytics Engineer 가 되

0개의 댓글