하둡 프로그래밍 - Secondary Sorting

Park Suyong·2021년 2월 4일
0

하둡

목록 보기
7/8

하둡에서 정렬은 굉장히 많이 다뤄지고, 알려진 MapReduce의 핵심 기능이다. 맵리듀스는 기본적으로 입력 데이터의 키를 기준으로 정렬되기 때문에, 하나의 Reduce Task만 실행되게 한다면 정렬을 쉽게 해결하는 것도 가능할 것이다. 다만, 여러 데이터노드가 구성되어 있는 상황에서 하나의 Reduce Task만 실행하는 것은 분산 환경의 장점을 살리지 않은 것이고 대량의 데이터를 처리하게 되는 경우 네트워크 부하도 상당히 클 것이다.

하둡은 개발자에게 정렬 기능을 제공한다. 보조 정렬(Secondary Sort), 부분 정렬(Partial Sort), 전체 정렬(Total Sort)이 바로 그것이다.

1. Secondary Sorting

이전에 항공 운항 지연 데이터를 분석한 결과를 봤을 때, 월의 순서가 정렬되어 처리되지 않았다.

1987,10 265658
1987,11 255127
1987,12 287408
1988,1  261810
1988,10 230876
1988,11 237343
1988,12 249340
1988,2  242219
1988,3  255083
1988,4  219288

이렇게 표시되는 이유는 출력 데이터의 키값 자체가 연도와 월이 합쳐진 하나의 문자열로 표시되기 때문이다. "1988,10"이 "1988,2" 보다 먼저 출력되는 것이 그 이유이다.

그렇다면 월의 순서대로 정렬하려면 어떻게 해야 할까? 이 때 보조 정렬(Secondary Sort)을 이용한다. 보조 정렬(Secondary Sort)은 키의 값들을 Grouping하고 Grouping된 record에 순서를 부여하는 방식이다. 보조 정렬(Secondary Sort)의 구현 순서는 다음과 같다.

  • 기존 키의 값들을 조합복합키(Composite Key)를 정의한다. 이 때 키의 값 중에서 어떤 키를 Grouping Key로 사용할지를 결정하게 된다.
  • 복합키(Composite Key)의 레코드를 정렬하기 위한 비교기(Comparator)을 정의한다.
  • Grouping Key를 Partitioning할 파티셔너(Partitioner)를 정의한다.
  • Grouping Key를 정렬하기 위한 비교기(Comparator)을 정의한다.

그렇다면 복합키(Composite Key)의 연도를 Group Key로 사용하여 MapReduce 프로그램을 작성하도록 한다.

복합키 (Composite Key)

복합키는 기존의 키값을 조합한 일종의 키 집합 클래스이다. 이전에 정렬되어 있지 않은 처리 결과에서 보았듯이, 복합키를 적용하기 전까지는 출력키를 단순히 하나의 문자열로 사용했다. 다만 복합키를 적용하여 연도와 월을 각각의 멤버 변수로 정의하도록 한다. 다음 코드는 복합키 코드이다.

public class DateKey implements WritableComparable<DateKey> {

    private String year;
    private Integer month;

    public DateKey() {
    }

    public DateKey(String year, Integer month) {
        this.year = year;
        this.month = month;
    }

    public String getYear() {
        return year;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public Integer getMonth() {
        return month;
    }

    public void setMonth(Integer month) {
        this.month = month;
    }

    @Override
    public String toString() { // toString 메소드를 재정의하여 출력을 어떤 식으로 할지 결정한다. 정의하지 않는다면 자바의 toString이 호출되게 된다.
        return (new StringBuilder()).append(year).append(",").append(month).toString();
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException { // 입력 스트림에서 연도와 월을 조회함
        year = WritableUtils.readString(dataInput);
        month = dataInput.readInt();
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException { // 출력 스트림에 연도와 월을 출력함
        WritableUtils.writeString(dataOutput, year);
        dataOutput.writeInt(month);
    }

    @Override
    public int compareTo(DateKey key) { // 복합키와 복합키를 비교해 순서를 정할 때 사용한다.
        int result = year.compareTo(key.year);
        if(0 == result) // 연도가 동일한 경우
            result = month.compareTo(key.month); // 연도가 동일할 경우 월을 비교하도록 한다.
        return result;
    }

    // 스트림에서 데이터를 읽고, 출력하는 작업에는 하둡에서 제공하는 WritableUtils를 이용하도록 한다.
}

자세한 설명은 위 코드의 주석을 참고하도록 한다. 복합키를 구현했으므로 이제 복합키 비교기(Composite Key Comparator)을 구현하도록 한다.

복합키 비교기 (Composite Key Comparator)

public class DateKeyComparator extends WritableComparator {
    // 복합키 비교기는 복합키의 정렬 순서를 부여하기 위한 클래스이다.

    protected DateKeyComparator() {
        super(DateKey.class, true);
    }

    // compare 메소드는 이미 WritableComparator에 정의되어 있다. 하지만, 객체 스트림으로 비교하기 때문에 부정확하다.
    // 그러므로, compare 메소드를 재정의하도록 한다.
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        // 2개의 WritableComparable 객체를 파라미터로 전달받아 DateKey(복합키) 타입으로 형변환한다.
        // 그래야만 DateKey에 선언한 멤버 변수를 조회할 수 있기 때문이다.
        // 복합키 클래스 형변환
        DateKey k1 = (DateKey) w1;
        DateKey k2 = (DateKey) w2;

        // 연도 비교
        // DateKey로 형변환하여 멤버 변수를 조회할 수 있게 됐으므로 연도값을 우선적으로 비교한다.
        int cmp = k1.getYear().compareTo(k2.getYear());
        if(cmp != 0)
            return cmp;
        // 두 값이 동일한 경우 0, k1이 클 경우 1, k1이 작을 경우 -1을 반환한다.
        // 연도가 일치할 경우, 월을 비교해야 한다. 따라서, 다음 줄로 넘어가게 된다. 연도가 일치하지 않는 경우 위 코드에서 정렬되게 된다.

        // 월 비교
        return k1.getMonth() == k2.getMonth() ? 0 : (k1.getMonth() < k2.getMonth() ? -1 : 1);
        // 월을 기준으로 오름차순으로 정렬되게 된다. 월이 같으면 0, k1의 월이 더 작다면 -1, k1의 월이 더 크다면 1을 반환한다.
    }
}

위 복합키 비교기는 복합키의 정렬 순서를 부여하기 위한 클래스이며, 2개의 복합키를 비교하게 되고 각 멤버 변수를 비교해 정렬 순서를 정하게 된다. WritableComparable Interface로 구현된 객체를 비교하려면 반드시 WritableComparator을 상속받아서 비교기를 구현해야 한다. 위 코드에서도 그런 모습을 볼 수 있다.

compare 메소드를 재정의하도록 한다. 이 메소드는 이미 WritableComparator에 정의되어 있으나, 객체 스트림에서 조회한 값을 비교하므로 정확하지 않다. 따라서, 멤버 변수를 비교하도록 compare 메소드를 재정의하도록 한다.

그룹키 파티셔너 구현 (Group Key Partitioner)

Partitioner는 Map Task의 출력 데이터를 Reduce Task의 입력 데이터로 보낼지를 결정하고, 이렇게 Partitioning된 데이터는 Map Task의 출력 데이터의 키의 값에 따라 정렬되게 된다. 다음 코드를 보도록 한다.

public class GroupKeyPartitioner extends Partitioner<DateKey, IntWritable> {
    // 여기서, Partitioner에 있는 2개의 파라미터 값은 Mapper의 출력 데이터의 키와 값에 해당하는 파라미터이다.

    @Override
    public int getPartition(DateKey key, IntWritable val, int numPartitions) {
        // 파티셔너는 getPartition 메소드를 호출해 파티셔닝 번호를 조회하게 된다.
        // 연도가 키값이 되므로 연도에 대한 해시값을 조회해 파티션 번호를 생성하도록 한다.
        
        int hash = key.getYear().hashCode();
        int partition = hash % numPartitions;
        return partition;
    }
}

자세한 설명은 위 코드 주석을 참고하도록 한다.

그룹키 비교기 구현 (Group Key Comparator)

Reducer는 그룹키 비교기(Group Key Comparator)를 사용하여 같은 연도에 해당하는 모든 데이터를 하나의 Reducer 그룹에서 처리할 수 있다. 위에서 복합키 비교기(Composite Key Comparator)을 구현한 것처럼 WritableComparator을 상속받아 클래스를 선언하도록 한다.

public class GroupKeyComparator extends WritableComparator {
    // 리듀서는 그룹키 비교기를 통해 같은 연도에 해당하는 모든 데이터를 하나의 Reducer 그룹에서 처리할 수 있다.

    protected GroupKeyComparator() {
        super(DateKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
   	// grouping key 값인 연도를 비교하는 처리가 필요하다. 따라서, compare 메소드를 재정의하여 2개의 복합키의 연도값을 비교하는 코드를 작성한다.
        DateKey k1 = (DateKey) w1;
        DateKey k2 = (DateKey) w2;

        // 연도값 비교
        return k1.getYear().compareTo(k2.getYear());
    }
}

또, 그룹핑 키값인 연도를 비교하는 처리가 필요할 것이다. 따라서, compare 메소드에 2개의 복합키의 연도값을 비교하도록 재정의한다.

Mapper 구현

public class DelayCountMapperWithDateKey extends Mapper<LongWritable, Text, DateKey, IntWritable> {

    private final static IntWritable outputValue = new IntWritable(1); // map 출력값
    private DateKey outputKey = new DateKey(); // map 출력키

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

        if(parser.isDepartureDelayAvailable()) {
            if(parser.getDepartureDelayTime() > 0) {
                outputKey.setYear("D," + parser.getYear());
                outputKey.setMonth(parser.getMonth());
                // 출력키(운항연도,운항월) 설정 형태 = D,1987,3

                context.write(outputKey, outputValue);
                // 출력 데이터 생성
            }
            else if(parser.getDepartureDelayTime() == 0)
                context.getCounter(DelayCounters.scheduled_departure).increment(1);
            else if(parser.getDepartureDelayTime() < 0)
                context.getCounter(DelayCounters.early_departure).increment(1);
        }
        else
            context.getCounter(DelayCounters.not_available_departure).increment(1);

        if(parser.isArriveDelayAvailable()) {
            if(parser.getArriveDelayTime() > 0) {
                outputKey.setYear("A," + parser.getYear());
                outputKey.setMonth(parser.getMonth());
                // 출력키(운항연도,운항월) 설정 형태 = A,1987,3

                context.write(outputKey, outputValue);
                // 출력 데이터 생성
            }
            else if(parser.getArriveDelayTime() == 0)
                context.getCounter(DelayCounters.scheduled_arrival).increment(1);
            else if(parser.getArriveDelayTime() < 0)
                context.getCounter(DelayCounters.early_arrival).increment(1);
        }
        else
            context.getCounter(DelayCounters.not_available_arrival).increment(1);
    }
}

주석 처리 되어 있는 부분의 코드를 참고하면 된다.

Reducer 구현

public class DelayCountReducerWithDateKey extends Reducer<DateKey, IntWritable, DateKey, IntWritable> {

    private MultipleOutputs<DateKey, IntWritable> mos;

    private DateKey outputKey = new DateKey(); // reduce 출력키
    private IntWritable result = new IntWritable(); // reduce 출력 값

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        mos = new MultipleOutputs<DateKey, IntWritable>(context);
    }

    public void reduce(DateKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    	// 복합키를 입력데이터의 키와 출력 데이터의 키로 사용해야 하므로 위처럼 파라미터값을 설정한다.

        String[] colums = key.getYear().split(","); // , 구분자 분리
        int sum = 0;
        Integer bMonth = key.getMonth();

        if(colums[0].equals("D")) { // 출발 지연 데이터인 경우
            for(IntWritable value : values) {
                if (bMonth != key.getMonth()) {
                    // 이렇게 bMonth 변수를 생성해서 월 값을 백업해 두지 않게 되는 경우 12월만 출력되게 되고, 지연 횟수는 해당 연도의 전체 지연 횟수가 출력된다.
                    // 따라서, 월값을 bMonth 변수에 백업해 두어 월별 지연 횟수가 출력될 수 있게끔 코드를 수정한다.
                    result.set(sum);
                    outputKey.setYear(key.getYear().substring(2));
                    outputKey.setMonth(bMonth);
                    mos.write("departure", outputKey, result);
                    sum = 0;
                }
                sum += value.get();
                bMonth = key.getMonth();
            }
            if(key.getMonth() == bMonth) {
                outputKey.setYear(key.getYear().substring(2));
                outputKey.setMonth(key.getMonth());
                result.set(sum);
                mos.write("departure", outputKey, result);
            }
        }
        else { // 도착 지연 데이터인 경우
            for(IntWritable value : values) {
                if(bMonth != key.getMonth()) {
                    result.set(sum);
                    outputKey.setYear(key.getYear().substring(2));
                    outputKey.setMonth(bMonth);
                    mos.write("arrival", outputKey, result);
                    sum = 0;
                }
                sum += value.get();
                bMonth = key.getMonth();
            }
            if(key.getMonth() == bMonth) {
                // 그 다음 Iterable 객체의 순회가 종료되고 나면 최종적으로 월의 지연 횟수를 출력하도록 한다. 이렇게 해야 12월까지 출력된다.
                outputKey.setYear(key.getYear().substring(2));
                outputKey.setMonth(key.getMonth());
                result.set(sum);
                mos.write("arrival", outputKey, result);
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        mos.close();
    }
}

이러한 Reducer에는 그룹핑 파티셔너(Grouping Partitioner)그룹핑 비교기(Grouping Comparator)에 의해 같은 연도로 Grouping되어 있는 데이터가 전달된 상태이다. 또한, 그룹핑된 값은 월의 순서대로 오름차순으로 정렬이 이미 되어 있다. 복합키 비교기(Composite Key Comparator)에서 구현했기 때문이다.

그런데, 이대로 reduce 메소드에서 지연 횟수를 aggregation하게 될 경우 데이터에 오류가 발생하게 된다. 예를 들어, 2008년의 항공 출발 지연 데이터를 처리하게 되는 경우, 2008년 12월만 출력되며 지연 횟수는 2008년의 모든 지연이 합산되어 다음과 같이 표현되게 된다.
2008122647363

이러한 현상의 원인은 Reducer가 2008년이라는 그룹 키를 기준 삼아 연산을 수행하게 되기 때문이다. 따라서, 월별로 지연 횟수를 계산하도록 하기 위해 복합키를 구분해서 처리하도록 코드를 작성해야 한다.

입력 데이터의 값에 해당하는 Iterable<IntWritable> values를 반복하게 되는 동안, 월 값에 해당하는 값을 bMonth라는 변수에 저장해 두도록 한다. 임시적으로 저장해 두는 것이다. 이 동안 bMonth의 값과 현재 데이터의 월 값이 일치하지 않는 경우 Reducer의 출력 데이터에 임시 저장되어 있는 월의 지연 횟수를 출력하게 된다.

최종적으로 Iterable 객체의 순회가 종료되면 복합키의 월 값과 임시 저장한 bMonth의 값이 동일할 것이다. 최종적으로 한번 더 출력해 줘야 12월까지 정상적으로 출력되게 될 것이다.

Driver 구현

public class DelayCountWithDateKey  extends Configured implements Tool {

    public static void main(String[] args) throws Exception {

        int res = ToolRunner.run(new Configuration(), new DelayCountWithDateKey(), args); // run 메소드 호출
        System.out.println("MR-Job Result : " + res);
    }

    @Override
    public int run(String[] args) throws Exception {

        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

        if(otherArgs.length != 2) {
            System.err.println("Usage : DelayCountWithDateKey <in> <out>");
            System.exit(2);
        }

        Job job = new Job(getConf(), "DelayCountWithDateKey");

        job.setJarByClass(DelayCountWithDateKey.class);
        job.setPartitionerClass(GroupKeyPartitioner.class);
        job.setGroupingComparatorClass(GroupKeyComparator.class);
        job.setSortComparatorClass(DateKeyComparator.class);

        job.setMapperClass(DelayCountMapperWithDateKey.class);
        job.setReducerClass(DelayCountReducerWithDateKey.class);

        job.setMapOutputKeyClass(DateKey.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(DateKey.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, DateKey.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, DateKey.class, IntWritable.class);
        // departure, arrival이라는 출력 경로를 생성한 것이다.
        // 출력 경로, 출력 포맷, 출력 키, 출력 값 타입을 순차적으로 파라미터를 작성한다.

        boolean success = job.waitForCompletion(true);
        System.out.println(success);
        return 0;
    }
}

위 코드에서는 주석을 참고하도록 한다.

출력 결과

21/02/04 15:58:57 INFO mapred.JobClient:   Map-Reduce Framework
21/02/04 15:58:57 INFO mapred.JobClient:     Spilled Records=352757924
21/02/04 15:58:57 INFO mapred.JobClient:     Map output materialized bytes=1590924127
21/02/04 15:58:57 INFO mapred.JobClient:     Reduce input records=104279595
21/02/04 15:58:57 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=334751891456
21/02/04 15:58:57 INFO mapred.JobClient:     Map input records=118914458
21/02/04 15:58:57 INFO mapred.JobClient:     SPLIT_RAW_BYTES=19980
21/02/04 15:58:57 INFO mapred.JobClient:     Map output bytes=1382363857
21/02/04 15:58:57 INFO mapred.JobClient:     Reduce shuffle bytes=1590924127
21/02/04 15:58:57 INFO mapred.JobClient:     Physical memory (bytes) snapshot=46951034880
21/02/04 15:58:57 INFO mapred.JobClient:     Reduce input groups=494
21/02/04 15:58:57 INFO mapred.JobClient:     Combine output records=0
21/02/04 15:58:57 INFO mapred.JobClient:     Reduce output records=0
21/02/04 15:58:57 INFO mapred.JobClient:     Map output records=104279595
21/02/04 15:58:57 INFO mapred.JobClient:     Combine input records=0
21/02/04 15:58:57 INFO mapred.JobClient:     CPU time spent (ms)=725840
21/02/04 15:58:57 INFO mapred.JobClient:     Total committed heap usage (bytes)=37039374336
21/02/04 15:58:57 INFO mapred.JobClient:   DelayCounters
21/02/04 15:58:57 INFO mapred.JobClient:     scheduled_arrival=5091690
21/02/04 15:58:57 INFO mapred.JobClient:     early_arrival=55351148
21/02/04 15:58:57 INFO mapred.JobClient:     not_available_departure=2230332
21/02/04 15:58:57 INFO mapred.JobClient:     early_departure=42305219
21/02/04 15:58:57 INFO mapred.JobClient:     scheduled_departure=26068006
21/02/04 15:58:57 INFO mapred.JobClient:     not_available_arrival=2502926
21/02/04 15:58:57 INFO mapred.JobClient:   File Input Format Counters
21/02/04 15:58:57 INFO mapred.JobClient:     Bytes Read=11574493516
21/02/04 15:58:57 INFO mapred.JobClient:   FileSystemCounters
21/02/04 15:58:57 INFO mapred.JobClient:     HDFS_BYTES_READ=11574513496
21/02/04 15:58:57 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=5388016898
21/02/04 15:58:57 INFO mapred.JobClient:     FILE_BYTES_READ=3786797898
21/02/04 15:58:57 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=7041
21/02/04 15:58:57 INFO mapred.JobClient:   Job Counters
21/02/04 15:58:57 INFO mapred.JobClient:     Launched map tasks=180
21/02/04 15:58:57 INFO mapred.JobClient:     Launched reduce tasks=1
21/02/04 15:58:57 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=297557
21/02/04 15:58:57 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
21/02/04 15:58:57 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=513978
21/02/04 15:58:57 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
21/02/04 15:58:57 INFO mapred.JobClient:     Data-local map tasks=180
21/02/04 15:58:57 INFO mapred.JobClient:   File Output Format Counters
21/02/04 15:58:57 INFO mapred.JobClient:     Bytes Written=0

위 실행 결과는 이전에 실습했던 Multiple Outputs를 사용하는 맵리듀스 프로그램이다. 위 결과에서 Reducer의 입력 데이터 그룹은 494개 이다. 밑의 로그 메시지는 이번에 실습한 Secondary Sort를 사용하는 맵리듀스 프로그램이다.

21/02/04 18:15:10 INFO mapred.JobClient:   Map-Reduce Framework
21/02/04 18:15:10 INFO mapred.JobClient:     Spilled Records=371814036
21/02/04 18:15:10 INFO mapred.JobClient:     Map output materialized bytes=2085592980
21/02/04 18:15:10 INFO mapred.JobClient:     Reduce input records=104279595
21/02/04 18:15:10 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=334754734080
21/02/04 18:15:10 INFO mapred.JobClient:     Map input records=118914458
21/02/04 18:15:10 INFO mapred.JobClient:     SPLIT_RAW_BYTES=19980
21/02/04 18:15:10 INFO mapred.JobClient:     Map output bytes=1877032710
21/02/04 18:15:10 INFO mapred.JobClient:     Reduce shuffle bytes=2085592980
21/02/04 18:15:10 INFO mapred.JobClient:     Physical memory (bytes) snapshot=46553485312
21/02/04 18:15:10 INFO mapred.JobClient:     Reduce input groups=44
21/02/04 18:15:10 INFO mapred.JobClient:     Combine output records=0
21/02/04 18:15:10 INFO mapred.JobClient:     Reduce output records=0
21/02/04 18:15:10 INFO mapred.JobClient:     Map output records=104279595
21/02/04 18:15:10 INFO mapred.JobClient:     Combine input records=0
21/02/04 18:15:10 INFO mapred.JobClient:     CPU time spent (ms)=1175800
21/02/04 18:15:10 INFO mapred.JobClient:     Total committed heap usage (bytes)=37181980672
21/02/04 18:15:10 INFO mapred.JobClient:   DelayCounters
21/02/04 18:15:10 INFO mapred.JobClient:     scheduled_arrival=5091690
21/02/04 18:15:10 INFO mapred.JobClient:     early_arrival=55351148
21/02/04 18:15:10 INFO mapred.JobClient:     not_available_departure=2230332
21/02/04 18:15:10 INFO mapred.JobClient:     early_departure=42305219
21/02/04 18:15:10 INFO mapred.JobClient:     scheduled_departure=26068006
21/02/04 18:15:10 INFO mapred.JobClient:     not_available_arrival=2502926
21/02/04 18:15:10 INFO mapred.JobClient:   File Input Format Counters
21/02/04 18:15:10 INFO mapred.JobClient:     Bytes Read=11574493516
21/02/04 18:15:10 INFO mapred.JobClient:   FileSystemCounters
21/02/04 18:15:10 INFO mapred.JobClient:     HDFS_BYTES_READ=11574513496
21/02/04 18:15:10 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=7446748803
21/02/04 18:15:10 INFO mapred.JobClient:     FILE_BYTES_READ=5350691892
21/02/04 18:15:10 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=7041
21/02/04 18:15:10 INFO mapred.JobClient:   Job Counters
21/02/04 18:15:10 INFO mapred.JobClient:     Launched map tasks=180
21/02/04 18:15:10 INFO mapred.JobClient:     Launched reduce tasks=1
21/02/04 18:15:10 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=418341
21/02/04 18:15:10 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
21/02/04 18:15:10 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=684953
21/02/04 18:15:10 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
21/02/04 18:15:10 INFO mapred.JobClient:     Data-local map tasks=180
21/02/04 18:15:10 INFO mapred.JobClient:   File Output Format Counters
21/02/04 18:15:10 INFO mapred.JobClient:     Bytes Written=0

여기서 똑같이 Reducer의 입력 데이터 그룹을 보면 44개임을 알 수 있다. 정렬을 적용하기 전 맵리듀스 프로그램에서는 "2008, 1", "2008, 2"와 같은 연도와 월의 조합이 하나의 키였으나 보조 정렬에서는 연도만 그룹이 되었기 때문이다. 따라서, 1987년부터 2008년까지 총 22개월이고 출발 지연 및 도착 지연 데이터를 동시 분석하므로 44개의 그룹이 된 것이다.

profile
Android Developer

0개의 댓글