하둡 프로그래밍 - MapReduce 응용

Park Suyong·2021년 1월 30일
0

하둡

목록 보기
6/8
post-custom-banner

이전 게시물에서 항공 출발 지연 데이터에 대한 분석을 진행했었다. 반대로, 항공 도착 지연 데이터도 존재한다. 코드는 거의 유사하고, 동작 방식이나 출력 결과도 거의 다르지 않으므로 Github에만 업로드 하고 블로깅을 따로 하지는 않았다. 문제는 여기서 발생한다. 두 프로그램의 코드가 거의 유사하다는 것이다. 그렇다면 코드를 좀 더 간단하게 처리할 수 있는 방법은 무엇일까?

MapReduce Job을 처리할 때 사용자가 정의한 파라미터를 입력받아 항공 도착 혹은 출발 지연 데이터 중 하나를 분석하도록 설계할 수 있다. 이 때, 사용자가 정의한 파라미터를 처리하려면 하둡에서 제공하는 사용자 정의 옵션과 관련된 다양한 API를 이해해야 한다.

1. 사용자 정의 옵션의 이해

하둡에서는 개발자가 편리하게 MapReduce 프로그램을 개발할 수 있도록 다양한 헬퍼 Class를 제공한다. 이 클래스는 org.apache.hadoop.util 패키지에 구현되어 있다.

GenericOptionsParser

GenericOptionsParser는 하둡 콘솔 명령어에서 입력한 옵션을 분석한다. 또한, GenericOptionsParser는 사용자가 하둡 콘솔 명령어에서 네임 노드, JobTracker 추가 구성 자원 등을 설정할 수 있는 각종 options을 제공한다. 아래 표는 GenericOptionsParser가 제공하는 options을 정리한 것이다.

GenericOptionsParser Options 표
옵션 기능
-conf [파일명] 명시한 파일을 환경설정에 있는 리소스 정보에 추가한다.
-D [옵션 = 값] 하둡 환경설정 파일에 있는 옵션에 새로운 값을 설정한다.
-fs [Namenode호스트:Namenode포트] 네임 노드를 새롭게 설정한다.
-jt [JobTracker호스트:JobTracker포트] Job Tracker를 새롭게 설정한다.
-files [파일1, 파일2,..., 파일n] 로컬에 있는 파일을 HDFS에서 사용하는 공유 파일 시스템으로 복사한다.
-libjars [JAR파일1, JAR파일2,..., JAR파일n] 로컬에 있는 JAR 파일을 HDFS에서 사용하는 공유 파일 시스템으로 복사하고, 맵리듀스의 Task Class Path에 추가한다.
archives [아카이브파일1, 아카이브파일2, ..., 아카이브파일n] 로컬에 있는 아카이브 파일을 HDFS에서 사용하는 공유 파일 시스템으로 복사한 후 압축을 푼다.
예시
./bin/hadoop dfs -fs server01:10020 -ls /input
호스트명이 server01이고, 포트 번호가 10020인 네임 노드에서 HDFS의 input 폴더에 있는 파일 목록을 조회한다.
./bin/hadoop dfs -D fs.default.name==server01:10020 -ls /input
호스트명이 server01이고, 포트 번호가 10020인 네임 노드에서 HDFS의 input 폴더에 있는 파일 목록을 조회한다.
./bin/hadoop dfs -conf hadoop-site.xml -ls /input
hadoop-site.xml에 정의된 네임 노드에서 HDFS의 input 폴더에 있는 파일 목록을 조회한다.
./bin/hadoop job -D mapred.job.tracker=server01:10030 -submit job.xml
환경설정 파일 mapred-site.xml에 있는 mapred.job.tracker 옵션 값을 server01:10030로 설정한 후 JobTracker에게 Job을 제출한다.
./bin/hadoop job -jt server01:10040 -submit job.xml
호스트명이 server이고, 포트번호가 10040인 JobTracker에게 Job을 제출한다. 

맵리듀스 프로그램을 개발할 때는 GenericOptionsParser만 단독으로 사용하기 보다 GenericOptionsParser가 사용하는 Configuration 객체를 상속받는 Tool InterfaceGenericOptionsParser를 내부적으로 선언한 ToolRunner Class를 이용한다.

Tool

Tool InterfaceGenericOptionsParser의 콘솔 설정 옵션을 지원하기 위한 인터페이스이다. Tool InterfaceConfiguration Class를 상속받으며, 내부적으로 run 메소드가 정의되어 있다. 따라서, Tool Interface를 상속받으면 run 메소드를 재정의하여 사용해야 한다.

ToolRunner

ToolRunnerTool Interface의 실행을 도와주는 헬퍼 클래스이다. ToolRunnerGenericOptionsParser를 사용해 사용자가 콘솔 명령어에서 설정한 옵션을 분석하고, Configuration 객체에 설정한다. Configuration 갹채룰 Tool Interface에 전달한 후, Tool Interfacerun 메소드를 실행한다.

MapReduce JAVA 코드에 적용

public class DelayMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private String delay;
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        delay = context.getConfiguration().get("workType");
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

        if(delay.equals("arrival")) {

            outputKey.set(parser.getYear() + "," + parser.getMonth());
            if(parser.getArriveDelayTime() > 0)
                context.write(outputKey, outputValue);
        }
        else if(delay.equals("departure")) {

            outputKey.set(parser.getYear() + "," + parser.getMonth()); // 출력키(운항연도,운항월) 설정
            if(parser.getDepartureDelayTime() > 0) // 출발 지연이 발생한 경우, context.write 호출
                context.write(outputKey, outputValue);
        }
        else {
            System.err.println("Select 'arrival' or 'departure'");
            System.exit(2);
        }
    }
}

위 코드는 Mapper이다. setup 함수를 Override 했다. setup 함수는 Mapper Class에 정의되어 있으므로 이를 재정의한 것이다. 또한 그 내부에서 context.getConfiguration().get() 하여 사용자가 하둡 콘솔 명령어에서 입력한 파라미터를 받아들인다.

public class DelayMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {

        int res = ToolRunner.run(new Configuration(), new DelayMain(), args); // run 메소드 호출
        System.out.println("MR-Job Result : " + res);
    }

    @Override
    public int run(String[] args) throws Exception {

        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

        if(otherArgs.length != 2) {
            System.err.println("Usage : DelayMain <in> <out>");
            System.exit(2);
        }

        Job job = new Job(getConf(), "DelayMain");

        job.setJarByClass(DelayMain.class);
        job.setMapperClass(DelayMapper.class);
        job.setReducerClass(DelayReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        boolean success = job.waitForCompletion(true);
        System.out.println(success);
        return 0;
    }
}

위 코드는 Driver Program이다. 메인 함수에서 ToolRunner 클래스를 호출했다. 또한, Main Class가 Tool Interface를 상속받았으므로 run 메소드를 재정의한 모습이다.

위 코드에서 GenericOptionsParser에서 getRemainingArgs 메소드를 호출해 문자열 배열을 생성한다. 여기서 반환하게 되는 문자열 배열은 GenericOptionsParser에서 제공하는 파라미터를 제외한 나머지 파라미터를 말한다. 예를 들어, ./bin/hadoop jar A.jar MainClass -D workType=hi input output 이라고 명령어를 입력했을 때, getRemainingArgs 메소드에서 반환하는 문자열은 input output이 된다.

그래서 입출력 데이터 경로를 설정할 때 문자열 배열 값을 이용하여 설정하게 되는 것이다.

2. 카운터 사용

하둡은 맵리듀스 Job의 진행 상황을 모니터링할 수 있도록 Counter라는 API를 제공하며, 모든 job은 다수의 내장 카운터를 가지고 있다. 내장 카운터는 Map, Reduce, Combiner의 입출력 레코드 건수 및 바이트를 확인할 수 있으며, Map과 Reduce의 Task의 개수 및 실패 여부, 파일 시스템에서 얼마나 많은 바이트를 읽고 썼는가에 대한 정보를 제공한다.

맵리듀스 프레임워크는 개발자가 직접 카운터를 정의해서 사용할 수 있는 API를 제공한다. 또한, 카운터의 숫자를 직접 증감시킬 수도 있다. 따라서, 맵과 리듀스의 동작 logic을 확인하는 데 큰 도움을 줄 수 있을 것이다. 그렇다면, 사용자 정의 카운터를 구현해 보도록 한다.

우선, 사용자 정의 카운터는 Java enum 클래스를 이용해 구현한다.

public enum DelayCounters {
    not_available_arrival, scheduled_arrival, early_arrival, not_available_departure, scheduled_departure, early_departure;
}

위 코드에서 볼 수 있는 6개의 Counter를 가지고 있다. 항공 출발 지연 데이터를 분석할 때 3개, 항공 도착 지연 데이터를 분석할 때 3개의 Counter를 사용하게 될 것이다. 위 코드의 경우 자체적인 클래스로 선언하였으나 카운터가 필요한 클래스에서 내부 변수로 선언해도 된다.

우선, 카운터들의 의미에 대해 알아본 후, Mapper 코드를 보도록 한다.

  • not_available_departure : 출발 지연 시간이 NA인 경우
  • scheduled_departure : 예정된 시각에 맞춰 출발한 경우
  • early_departure : 예정된 시각보다 일찍 출발한 경우
  • not_available_arrival : 도착 지연 시간이 NA인 경우
  • scheduled_arrival : 예정 시각에 맞춰 도착한 경우
  • early_arrival : 예정 시각보다 빨리 도착한 경우
public class DelayMapperWithCounter extends Mapper<LongWritable, Text, Text, IntWritable> {

    private String delay;
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        delay = context.getConfiguration().get("workType");
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

        if(delay.equals("departure")) {
            if(parser.isDepartureDelayAvailable()) {
                if(parser.getDepartureDelayTime() > 0) {
                    outputKey.set(parser.getYear() + "," + parser.getMonth()); // 출력키(운항연도,운항월) 설정
                    context.write(outputKey, outputValue);
                }
                else if(parser.getDepartureDelayTime() == 0)
                    context.getCounter(DelayCounters.scheduled_departure).increment(1);
                else if(parser.getDepartureDelayTime() < 0)
                    context.getCounter(DelayCounters.early_departure).increment(1);
            }
            else
                context.getCounter(DelayCounters.not_available_departure).increment(1);
        }
        else if(delay.equals("arrival")) {
            if(parser.isArriveDelayAvailable()) {
                if(parser.getArriveDelayTime() > 0) {
                    outputKey.set(parser.getYear() + "," + parser.getMonth()); // 출력키(운항연도,운항월) 설정
                    context.write(outputKey, outputValue);
                }
                else if(parser.getArriveDelayTime() == 0)
                    context.getCounter(DelayCounters.scheduled_arrival).increment(1);
                else if(parser.getArriveDelayTime() < 0)
                    context.getCounter(DelayCounters.early_arrival).increment(1);
            }
            else
                context.getCounter(DelayCounters.not_available_arrival).increment(1);
        }

        else {
            System.err.println("Select 'arrival' or 'departure'");
            System.exit(2);
        }
    }
}

위 코드는 Counter를 사용하는 Mapper Class의 코드이다. 우선 사용자에게 새로운 값을 입력받게 된다. GenericOptionsParser를 이용해 입력받은 후, 출발 지연 데이터를 분석할 것인지 도착 지연 데이터를 분석할 것인지 결정하게 된다.

그 다음 분석하려는 데이터가 NA인지 아닌지를 확인한 후, 출발 / 도착이 얼마나 정시각과 차이가 나는지를 알기 위해 AirlinePerformanceParser에 선언했던 DelayTime 함수를 호출해 0을 기준으로 비교한다. 0보다 크다면 항공기가 지연된 것이고, 0과 같다면 정시각, 0보다 작다면 일찍 출발 혹은 도착한 것이다.

0보다 큰 경우는 출력 데이터로 나오게 될 것이므로 0보다 큰 경우는 바로 context.write를 진행한다. 0이거나 0보다 작은 경우에 대한 것도 알기 위해 Counter를 사용한 것이므로, 이 경우 getCounter 메소드를 호출하고, 원하는 인자를 파라미터로 넘긴 뒤 increment 메소드를 사용해 1씩 증가시킨다.

결과적으로, 항공기 지연 데이터 뿐만 아니라 일찍 출발 혹은 도착한 경우와 정각에 출발 혹은 도착한 경우 모두를 알 수 있게 됐다.

3. 다수의 파일 출력

위 경우, 우리는 GenericOptionsParser을 사용해 사용자로부터 입력을 받아 항공 출발 지연 데이터 혹은 항공 도착 지연 데이터를 분석했다. 다만, 문제가 있었다. 바로 이 데이터를 병렬로 처리하지 못한다는 것이다. 실습용 데이터는 압축을 모두 해제했을 때 약 16GB정도로 다른 데이터의 분석까지 필요한 경우 재실행하면 문제가 크게 없었으나 그 데이터 크기의 단위가 테라바이트 이상으로 넘어가게 된다면 상당히 비효율적인 방식일 것이다. 이러한 문제점을 해결하기 위해 MultipleOutputs를 사용하도록 한다.

org.apache.hadoop.mapreduce.lib.output.MultipleOutputs는 여러 개의 출력 데이터를 생성하도록 도와주는 기능을 제공한다. MultipleOutputs는 여러 개의 OutputCollectors를 만들고 각 OutputCollectors에 대한 출력 경로, 출력 포맷, 키와 값 유형을 설정한다. 이러한 파라미터 설정은 MultipleOutputs에서 제공하는 addNameedOutput 메소드 를 호출해 설정할 수 있다.

MultipleOutputs에서 생성되는 출력 데이터는 기존의 맵리듀스 job에서 생성하는 데이터와 별개로 생성되게 된다. 기존 맵리듀스에서는 part-r-xxxxx 라는 데이터 처리 결과가 출력되었다. 그런데 Reduce 단계에서 MultipleOutputs를 이용해 output이라는 디렉토리에 데이터를 생성하게 된다고 하면, part-r-xxxxxoutput-r-xxxxx이 동시에 생성되게 된다. 이러한 과정은 코드를 실행한 후 다시 확인해 보도록 한다.

아래 코드는 Mapper이다.

public class DelayCountMapperWithMultipleOutputs extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

        if(parser.isDepartureDelayAvailable()) {
            if(parser.getDepartureDelayTime() > 0) {
                outputKey.set("D," + parser.getYear() + "," + parser.getMonth()); // 출력키(운항연도,운항월) 설정 형태 = D,1987,3
                context.write(outputKey, outputValue);
            }
            else if(parser.getDepartureDelayTime() == 0)
                context.getCounter(DelayCounters.scheduled_departure).increment(1);
            else if(parser.getDepartureDelayTime() < 0)
                context.getCounter(DelayCounters.early_departure).increment(1);
        }
        else
            context.getCounter(DelayCounters.not_available_departure).increment(1);

        if(parser.isArriveDelayAvailable()) {
            if(parser.getArriveDelayTime() > 0) {
                outputKey.set("A," + parser.getYear() + "," + parser.getMonth()); // 출력키(운항연도,운항월) 설정
                context.write(outputKey, outputValue);
            }
            else if(parser.getArriveDelayTime() == 0)
                context.getCounter(DelayCounters.scheduled_arrival).increment(1);
            else if(parser.getArriveDelayTime() < 0)
                context.getCounter(DelayCounters.early_arrival).increment(1);
        }
        else
            context.getCounter(DelayCounters.not_available_arrival).increment(1);
    }
}

위 코드에서 Counter를 실습할 때 사용했던 GenericOptionsParser는 제거하고, 콤마(,) 구분자를 사용하여 키 값 맨 앞에 출발 지연인지, 도착 지연인지를 표시하는 알파벳을 삽입했다. 따라서, 위 코드가 Reducer에게 전달될 때 outputKey의 값은 D or A,1988,3과 같은 형식으로 전달되게 될 것이다.

아래 코드는 Reducer 이다.

public class DelayCountReducerWithMultipleOutputs extends Reducer<Text, IntWritable, Text, IntWritable> {
    // 항공 출발 지연 데이터와 도착 지연 데이터를 구분하여 aggregation 해야 한다.

    private MultipleOutputs<Text, IntWritable> mos;

    private Text outputKey = new Text(); // reduce 출력키
    private IntWritable result = new IntWritable(); // reduce 출력 값

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        String[] colums = key.toString().split(","); // , 구분자 관리

        outputKey.set(colums[1] + "," + colums[2]);

        if(colums[0].equals("D")) { // 출발 지연 데이터인 경우
            int sum = 0;
            for(IntWritable value : values)
                sum += value.get();
            result.set(sum);

            mos.write("departure", outputKey, result);
        }
        else { // 도착 지연 데이터인 경우
            int sum = 0;
            for(IntWritable value : values)
                sum += value.get();
            result.set(sum);

            mos.write("arrival", outputKey, result);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        mos.close();
    }
}

Reducer에서는 reducer가 초기화 되는 단계인 setup 메소드와 reducer가 종료되는 단계인 cleanup 메소드Override했고, MultipleOutputs는 멤버 변수로 선언하고 setup 메소드에서 생성한 후 cleanup 메소드에서 종료했다.

위 코드에서 colums라는 문자열 배열은 Mapper의 결과물의 키값이다. 따라서, split 메소드로 콤마 구분자를 분리시킨 배열의 첫 번째 요소는 D 혹은 A이며, 2번째는 연도, 3번째는 월(Month) 이다. 따라서, 해당 값을 이용하여 Reducer에서 출발 지연 데이터와 도착 지연 데이터를 동시에 분석할 수 있도록 조건문을 사용하여 코드를 분석한 모습을 볼 수 있다.

또한, context.write()가 아니라 MultipleOutputs.write 했음을 알 수 있다. MulitpleOutputs를 사용하려면 context를 사용하지 않는다. 위 코드에서는 context.write를 하지 않았기 때문에 생성되는 결과 파일인 arrival-r-00000과 departure-r-00000에는 각각 데이터가 쓰여지지만 part-r-00000에는 쓰여지지 않게 된다. 참고로, MultipleOutputs.write()의 파라미터에는 출력 디렉토리명, 출력키, 출력값을 순서대로 파라미터로 넘기면 된다.

4. 체인

여태까지는 Map과 Reduce가 각각 한 개인 맵리듀스 프로그램을 작성했다. 하지만 상황에 따라, Mapper 혹은 Reducer을 2번 이상 적용해야 하는 경우도 있을 것이다. 체인은 이러한 경우에 사용하게 된다. 하둡에서는 ChainMapper, ChainReducer을 제공한다. 두 클래스는 체인(chain) 방식으로 Mapper, Reducer을 호출하게 된다. 가령, Mapper가 5번 적용되게 된다면 첫 번째 Mapper의 출력이 두 번째 Mapper의 입력으로 들어가게 된다. 마지막 Mapper의 결과가 최종적인 Mapper의 결과값이 될 것이다. 또한, Mapper들 사이에 Reducer을 삽입하여 실행하는 것도 가능하다.

profile
Android Developer
post-custom-banner

0개의 댓글