맵리듀스 기능

Q·2022년 11월 29일

Hadoop 완벽 가이드

목록 보기
8/13

8장 맵리듀스 기능

1. 카운터

  • 잡에 대한 통계를 수집하는 유용한 채널로서 품질 통계, 또는 응용프로그램 수준 통계를 제공.
  • 문제 진단에 유용

내장 카운터

  • 카운터는 연관있는 태스크에 의해서 관리되며, 주기적으로 태스크트래커에 보내지고 잡트래커에도 보내진다.
  • 전역적으로 수집
  • 내장된 잡 카운터는 실제로 잡트래커가 관리하기 때문에 사용자 정의 카운터를 포함한 다른 카운터와는 달리 네트워크 상으로 전달될 필요가 없다.
  • 보낼 때마다 누적된 전체 수치를 보낸다. (메시지 유실로 인한 오류 방지)
  • 잡 실행 중 테스크가 실패하면 카운터는 중단.
  • 잡이 성공하면 카운터 값은 최종값.

사용자 자바 카운터

  • 맵리듀스는 코드 수준에서 카운터의 집합을 정의하게 해주며 매퍼와 리듀서에서 원하는 방식으로 증가하게 할 수 있다.

  • 카운터는 연관된 카운터를 묶어 주는 자바 enum에 의해 정의.

  • 잡은 임의의 숫자의 enum을 정의할 수 있고 각 enum은 임의의 필드 개수를 갖는다.

  • enum의 이름이 그룹명이며 enum의 필드는 카운터명.

  • 카운터는 전역.

  • 맵리듀스 프레임워크는 잡이 끝나는 시점에 총계를 구하기 위해 모든 맵과 리듀스로부터 카운터를 수집.

  • 예제 코드 p.312~313 참조

    동적 카운터

    • 코드는 동적 카운터를 이용.(자바 enum에 의해 정의된 것이 아니다)
    • 자바 enum 필드는 컴파일 시점에 정의. (동적으로 새로운 카운터를 생성할 수 없다)
    • 실제로 받을 값을 내보낼 동적인 카운터를 사용하는 것이 더 편리
    • Reporter객체에서 사용하는 메소드는 그룹과 문자열 이름을 사용하는 카운터 이름을 인자로 받는다.
      public void incrCounter(String group, String counter, long amount)
    • 카운터를 생성하고 액세스하는 두 가지 방법(enum을 사용하고 문자열을 사용하는)은 실제로 같다.
      ( 하둡은 enum을 문자열로 변환해서 RPC로 카운터를 보내기 때문)
    • enum은 작업하기가 조금 더 수월, 타입 안정성도 제공. (대부분 잡에 적합)
    • 카운터를 동적으로 생성해야 하는 경우 String 인터페이스를 사용.

    읽기 가능한 카운터 이름

    • 기본적으로 카운터 이름은 enum의 완전한 자바 클래스명
    • 가독성이 좋지 않아 하둡은 리소스 번들을 이용해서 표시되는 이름을 변경하는 방법 제공.
    • 판독 가능한 이름들을 제고하는 비결은 enum의 이름을 따서 속성 파일을 생성.
      (하위 클래스들에 대해서는 구분자로 밑줄을 사용.)

    카운터 조회

    • 웹UI와 커맨드라인(job-counter)을 통하는 것 외에 자바 API를 이용해서 카운터값 조회 가능.
    • 잡이 끝나는 시점에 카운터를 가져오는 것이 일반적.
    • 디버깅을 위해 잡이 수행하는 도중에도 조회 가능.
    • RunningJob의 getCounters() 메소드를 호출해서 하나의 잡에 있는 모든 카운터를 담은 Counters 객체를 반환.
    • Counters 클래스는 카운터의 이름과 값을 찾을 수 있도록 다양한 메소드 제공.
    • 예제 코드 p.315~316 참고

    사용자 정의 스트리밍 카운터

    • 스티리밍 맵리듀스 프로그램은 특별하게 포멧된 라인을 표준에러 스트림에 보내는 식으로 카운터를 증가 시킬 수 있다.
    • 컨트롤 채널에 의해 선택.
    • 라인은 반드시 다음과 같은 유형이어야 한다.
      reporter: counter:group.counter.amount

2. 정렬

  • 데이터를 정렬하는 기능은 맵리듀스의 핵심.

준비

  • 기상 데이터셋을 기온을 기존으로 정렬할 것이다.
  • 기온을 텍스트 객체로 저장하면 제대로 정렬할 수 없다. (부호있는 정수는 사전 순서대로 정렬할 수 없기 때문)
  • 시퀀스파일을 이용하여 데이터를 저장.
  • 시퀀스파일에서 IntWritable 타입의 키는 기온을 나타내고 데이터 각 라인이 Text타입의 값이 된다.
  • p.318~319의 소스는 기상 데이터를 SequenceFile 형태로 변환해 주는 맵리듀스 프로그램.
  • 모든 음수를 제거하기 위해 오프셋을 추가한 다음, 왼쪽을 0으로 채우는 것.
  • 기온을 제대로 읽을 수 없는 레코드를 제거하기 위해 입력 데이터를 여과하는 맵-단독 잡.

부분정렬

  • p.319~320 소스는 IntWritable키를 가지고 시퀀스 파일을 정렬하는 일종의 변형된 맵리듀스 잡.

  • 기본 HashPartitioner를 사용하여 IntWritable 키를 가지고 SequenceFile 을 정렬하는 맵리듀스 프로그램.

  • 30개의 리듀서를 사용해서 이 프로그램을 실행할 경우 30개의 출력 파일을 만들어 내며 각 파일은 정렬되어 있다.

    응용프로그램: 파티션된 MapFile 검색

    • 키로 검색할 경우 다수 파일이 있어도 잘 동작.
    • 출력 포멧을 MapFileOutputFormat으로 변경하면 30개의 MapFile이 생기며, 이 파일을 대상으로 검색을 수행할 수 있다.
    • p.321 소스 참고(SequenceFile을 정렬하고 MapFile을 결과물로 생산하는 맵리듀스 프로그램)
    • MapFileOutputFormat은 맵리듀스 출력물에 대해 검색을 수행하는 한쌍의 편리한 정적 메소드를 제공. (p.322 소스 참고)
    Reader[] readers = MapFileOutputFormat.getReaders(fs, path, getConf());
    Writable entry = MapFileOutputFormat.getEntry(readers, partitioner, key, val);
    • getReaders() 메소드는 맵리듀스 잡에 의해 생성된 각각의 출력파일에 대한 MapFile.Reader를 오픈.
    • getEntry() 메소드는 키에 대한 Reader를 선택하기 위해 파티셔너를 사용하며 Reader의 get() 메소드를 호출해서 키에 대한 값을 찾는다.
    • getEntry() 메소드가 null을 반환하면 매칭되는 키가 아무것도 없다는 것.
    • 주어진 키로 모든 레코드를 가져오기 위해 Reader를 직접 사용할 수도 있다.
    • 파티션 순으로 된 Reader의 배열이 반환되기 때문에 해당 키에 대한 Reader의 맵 리듀스 잡에서 사용된 Partitioner와 같다.
    Reader reader = readers[partitioner.getPartition(key, val, readers.length)];
    • Reader가 있고, MapFile의 get() 메소드를 이용해서 첫 번째 키를 얻게 되고, 반복적으로 next() 메소드를 호출해서
      키가 바뀔 때까지 다음 키와 값을 가져올 수 있다. (p.323~324 참고)

전체 정렬

  • 전체적으로 정렬된 파일을 만드는 가장 간단한 해답은 단일 파티션을 사용하는 것.
  • 파일이 클때 서버 하나에서 모든 출력물을 처리하기 때문에 매우 비효율(병렬 아키텍처의 모든 장점을 포기)
  • 대신, 정렬된 파일의 집합을 만들어 내어 이 파일들을 연결, 전체적으로 정렬된 하나의 파일을 만들 수 있다.
  • 예를 들어 4개의 파티션이 있을때 키값을 분리하여 각각의 파티션으로 키를 할당 하는것.
    • 첫번째 파티션에는 < -10, 두번째는 -10 ~ 0, 세번째에는 0~10, 네번째에는 < 10
  • 전체 데이터셋에 대한 분포를 잘 이해하여 각 파티션의 크기를 균등하도록 해야 한다.
  • 균등한 파티션 집합을 만들기 위해 전체 데이터셋을 가지고 잡을 실행하는 것읍 이상적인 방법이 아니다.
  • 키를 샘플링하는 방법을 이용.
  • 하둡은 몇 개의 샘플링 예제를 제공.
    • InputSampler 클래스는 Sampler 인터페이스를 상속한 구현체를 정의하며 InputFormat과 JobConf를 인자로 키의 샘플을 반환.
  • p.326~327 소스 참고 (RandomSampler를 사용. 인자 : (균등확률을 갖는 키, 샘플 최대 갯수, 샘플링할 스플릿의 최대 갯수))
  • SplitSampler : 스플릿에서 처음 n개의 레코드를 샘플링
  • IntervalSampler : 스플릿 상에서 일정한 간격으로 키 수집.
  • RandomSampler : 다목적으로 활용

보조 정렬

  • 키를 원래 키와 원래 값의 조합으로 만든다.
  • 키 비교기는 원래 키와 값의 조합인 복합 키를 가지고 순서를 매겨야 한다.
  • 복합 키에 대한 파티셔너와 그룹핑 비교기는 원래 키만을 고려해서 파티셔닝과 그룹핑을 수행한다.
  • p.330~332 참고.

    스트리밍

    • 스트리밍에서 보조 정렬을 하기 위해 하둡이 제공하는 몇개의 라이브러리 클래스를 활용할 수 있다.
    • 보조 정렬에 사용할 수 있는 드라이버가 있다.
    • p.333 참고

3. 조인

  • 맵리듀는 큰 데이터셋에서 조인을 수행할 수 있지만 처음부터 조인한느 코드를 작성한느 것은 상당히 복잡.
  • 피그, 하이브, 케이스케이팅 사용을 고려. 이들은 조인 연산을 기본 기능으로 내장.
  • 맵-사이드 조인(p.336~337 참고), 리듀스-사이드 조인(p.338~342 참고).

4. 사이드 데이터 분배

  • 사이드 데이터는 주 데이터셋을 처리하기 위해 잡 과정에서 필요한 부가적인 읽기전용 데이터.
  • 분배 매커니즘과 별도로, 정적인 필드의 메모리에서 사이드 데이터를 캐시 할 수 있다.
  • 같은 잡이라면, 동일한 테스크트래커에서 연속적으로 실행되는 태스크끼리는 데이터를 공유할 수 있다.

잡 환경 설정 파일 사용

  • JobConf객체(Configuration 클래스로부터 상속)의 다양한 setter 메소드를 이용하여 임의의 키/값 쌍을 잡 환경 설정에 저장할 수 있다.
  • 작은 양의 메타데이터를 태스크에 전달할 때 이 방법은 매우 유용.
  • 태스크에서 값을 가져오기 위해 매퍼나 리듀서에서 configure() 메소드를 오버라이드해서 전달된 JobConf객체의 getter 메소드를 사용하면 된다.
  • 임의의 객체는 자체적으로 직렬화를 수행하거나, 또는 하둡의 Stringifier 클래스를 사용할 수 있다.
  • 수 KB이상의 데이터를 전송하는데 이 방법을 사용하면 안 된다.
  • 잡 환경 설정은 잡트래커, 태스크트래커, 자식 JVM에 의해 읽히며, 환경 설정이 읽힐 때마다 모든 엔트리는 사용 여부와 상관없이 메모리에 적재
  • 사용자 속성은 잡트래커나 태스크트래커에서 읽히지 않기 때문에 시간과 메모리만 낭비된다.

분산 캐시

  • 분산 캐시는 파일을 복사해서 태스크가 실행되어 파일을 사용할 시점에 파일을 분배시켜 주는 서비스.
  • 네트워크의 대역폭을 줄이기 위해 일반적으로 파일이 특정 노드에 잡 단위로 복사된다.

    사용법

    • GenericOptionsParser를 사용하는 도구는 분산시킬 파일을 -files 옵션을 이용해서 따옴표로 구분 지은 URI의 형태로 지정.
    • 파일은 로컬 파일시스템, HDFS, 하둡이 읽을 수 있는 다른 파일시스템(S3처럼)에 있을 수 있다.
    • 스킴이 없으면 파일은 로컬에 있는 것으로 간주.
    • -archives 옵션을 사용하여 아카이브 파일(JAR, ZIP, tar, gzipped tar 파일)을 태스크에 복사할 수도 있다.
      (아카이브 파일들은 태스크 노드에서 해제)
    • libjars 옵션은 JAR 파일을 매퍼와 리듀서 태스크의 클래스패스에 추가한다.
      (라이브러리 JAR 파일을 잡 JAR 파일과 함께 묶지 못했을 때 유용)
    • p.344~345 참고

    동작방식

    • 잡을 실행하면 하둡은 -files와 -archives 옵션으로 지정한 파일을 잡트래커의 파일시스템으로 복사.
    • 태스크가 실행되기 전에 태스크트래커는 잡트래커의 파일시스템으로부터 파일을 로컬 디스크(혹은 캐시)로 복사.
      (태스크의 관점에서 파일은 항상 로컬 어딘가에 있다.)
    • 태스크트래커는 캐시에서 각 파일을 사용하는 태스크 개수에 대해서 참조를 유지.
    • 태스크가 실행되기 전에는 파일 참조 개수가 하나씩 증가하고, 태스크 실행이 끝난 후에는 파일 참조 개수가 하나씩 줄어든다.
    • 참조 개수가 0이 되면 어떤 태스크도 파일을 사용하지 않고 있다는 의미이므로 해당 파일을 삭제할 수 있다.
    • 캐시가 일정 크기를 넘어서면(기본 10GB) 새로운 파일에 대한 공간 확보를 위해 파일을 삭제.
      (캐시 크기는 환경 설정 속성인 local.cache.size를 바이트 단위로 설정해서 변경 가능)

DistributedCache API

  • 대부분의 응용프로그램은 DistrubuedCache API를 사용할 필요가 없다.
    (GenericOptionsParser를 이용해서 간접적으로 분산 캐시를 사용할 수 있기 때문)
  • 모든 가용한 캐시 파일의 목록을 얻을 필요가 있을 경우, JobConf의 getLocalCacheFiles()와 getLocalCacheArchives() 메소드를 사용할 수 있다.
    (로컬 파일을 가리키는 경로 객체의 배열을 반환)

5. 맵리듀스 라이브러리 클래스

  • 하둡은 자주 사용하는 기능을 제공하는 매퍼와 리듀서를 라이브러리로 제공.

참조

profile
Data Engineer

0개의 댓글