[Yarn] Yarn 작업 흐름

Hyunjun Kim·2025년 8월 21일
0

Data_Engineering

목록 보기
133/153
post-thumbnail

3 Yarn 작업 흐름

3.1 Yarn high-level workflow

  1. 클라이이언트는 어플리케이션 실행을 요청한다. 어플리케이션은 Yarn API를 구현한 프로그램이면 된다.
    이때, ResourceManager는 실행 요청이 유효할 경우 클라리언트에 새로운 Application ID를 할당한다.

    • 보통 app이 java기반이니까 -jar와 함꼐 같이 보내던지, 네트워크 통해서 보내지 않고 이미 HDFS나 외부 스토리지에 있으면 HDFS 경로를 같이 보내서 실행할 프로그램 파일에 대한 바이너리가 여기있어 라고 알려주면서 요청을 보낸다.
  2. ResourceManager는 NodeManager에게 Application Master 실행을 요청한다.

  3. NodeManager는 ResourceManager의 요청을 받고, 컨테이너에서 ApplicationMaster를 실행한다.
    이때, 컨테이너는 새로운 JVM을 생성해 ApplicationMaster를 실행한다.

  4. ApplicationMaster는 ResourceManager에게 어플리케이션을 실행하기 위한 리소스를 요청한다.
    이때 고려하는 리소스는 필요한 호스트, 랙 정보, 메모리, CPU, Network 정보, 컨테이너 개수 등으로 구성된다. ResourceManager는 전체 클러스터의 리소스 상태를 확인한 후 ApplicationMaster에게 NodeManager 목록을 전달한다.

  5. ApplicationMaster는 할당받은 NodeManager들에게 컨테이너 실행을 요청한다.

  • NodeManager 목록 전달 이유
    • 컨테이너 실행 위치 결정
      • AM은 “이 Job의 컨테이너를 어느 노드에서 실행할지” 결정해야 함
      • RM이 현재 클러스터 상태를 고려해, 사용 가능한 NM 목록을 알려줌
      • AM은 이 목록 중에서 요구하는 리소스(CPU, Memory, Network 등)가 맞는 노드를 선택
    • 데이터 로컬리티(Data Locality) 최적화
      • HDFS나 분산 파일 시스템에서는 데이터가 특정 노드에 있음
      • AM이 NM 목록을 알고 있어야, 데이터가 있는 노드에 가까운 곳에 컨테이너를 실행하여 네트워크 비용 최소화 가능
    • 효율적인 리소스 요청
      • AM은 NM 목록을 기반으로, 필요한 컨테이너 수와 리소스를 구체적인 노드 단위로 요청
      • RM은 AM이 선택한 노드에 컨테이너를 바로 배치하도록 허용
  1. NodeManager들은 컨테이너에 새로운 JVM을 생성한 후, 해당 어플리케이션을 실행한다.
    어플리케이션이 종료되면 해당 ApplicationMaster가 종료된다. 마지막으로 ResourceManager는 종료된 ApplicationMaster에게 할당했던 리소스를 해제한다.


3.2 Application 실행 요청

여기 써져 있는 Yarn Client는 우리가 실제로 잡을 서밋하는 클라이언트라고 보면 됨.

  1. 클라이언트가 어플리케이션을 Yarn Cluster 에서 실행하려면 Yarn Cluster 에서 신규 Application ID를 발급받아야 한다.
    클라이언트는 ClientRMService 의 createNewApplication() (createApplication())을 호출해서 Application ID 발급을 요청한다.

  2. ClientRMService는 클라이언트의 요청에 새로운 Application ID와
    Yarn 클러스터에서 최대로 할당할 수 있는 리소스 정보가 설정되어있는 GetNewApplicationResponse 객체를 전달한다.

  3. 클라이언트는 Application ID가 정상적으로 반환됐는지 확인하고, ClientRMService 의 submitApplication() 메소드를 호출한다.
    이때 파라미터로 ApplicationSubmissionContext를 전달한다.
    ApplicationSubmissionContext는 ResourceManager가 해당 어플리케이션의 ApplicationMaster를 실행하기 위한 다음 정보를 포함하고 있다.

    • ApplicaitonID
    • Application Name
    • Queue name
      • Yarn Cluster의 Scheduler에 미리 설정되어있는 Queue만 사용할 수 있다.
    • Application Priority
    • Application 이 필요한 Resource
    • Application Master를 실행할 컨테이너 정보가 담긴 ContainerLaunchContext 객체
  4. 클라이언트는 ClientRMService 의 getApplicationReport로 ResourceManager에게 ApplicationReport 를 요청한다.

  5. ResourceManager는 어플리케이션이 정상적으로 등록됐을 경우, ApplicationReport를 반환한다.
    ApplicationReport는 Yarn 클러스터에서 실행되는 어플리케이션의 통계정보를 담고 있다. 다음 정보를 포함한다.

    • ApplicationId of the application.
    • Applications user.
    • Application queue.
    • Application name.
    • Host on which the ApplicationMaster is running.
    • RPC port of the ApplicationMaster.
    • Tracking URL.
    • YarnApplicationState of the application.
    • Diagnostic information in case of errors.
    • Start time of the application.
    • Client Token of the application (if security is enabled).


3.3 Application Master 실행 요청

  1. 어플리케이션 목록을 관리하는 RMAppManager는 ResourceManager의 내부 스케줄러에게 어플리케이션 등록 및 ApplicationMaster를 실행하기 위한 컨테이너를 요청한다.
  2. ApplicationAttemptId를 어플리케이션이 사용하는 큐에 등록한다.
    그리고 RMAppManager가 스케줄 등록 결과를 알 수 있게 RMAppAttemptEventType.ATTEMPT_ADDED 이벤트를 발생시킨다.
  3. RMAppManager는 스케줄러에게 ApplicationAttemptId에 대한 컨테이너 할당을 요청한다.
  4. 스케줄러는 ApplicationAttemptId에게 컨테이너를 할당한 뒤, RMAppManager가 ApplicationMaster를 실행할 수 있도록 RMContainerEventType.START를 발생시킨다.
  5. 스케줄러의 응답을 받은 RMAppManager는 ApplicationMaster를 실행할 수 있는 ApplicationMasterLauncher를 실행 한다.
  6. ApplicationMasterLauncher 는 AMLauncher를 실행해서 ApplicationMaster를 실행한다.
  7. AMLauncher는 컨테이너 정보를 설정한 뒤, NodeManager에게 ApplicationMaster 실행을 요청한다. 이때 파라미터로 ContainerLaunchContext를 전달한다.
    • Yarn 클러스터에서 실행되는 모든 어플리케이션은 컨테이너에서 실행된다.
    • ContainerLaunchContext 에는 NodeManager가 컨테이너를 실행하는 데 필요한 다음 정보가 있다.
      • ContainerId of the container.
      • Resource allocated to the container.(vcpu, memory 등등)
      • User to whom the container is allocated.
      • Security tokens (if security is enabled).
      • LocalResource necessary for running the container such as binaries, jar, shared-objects, side-files etc.(실행파일 같은 거. 자바 library 등)
      • Optional, application-specific binary service data.
      • Environment variables for the launched process.
      • Command to launch the container.
      • Retry strategy when container exits with failure.
  8. NodeManager는 AMLauncher가 요청한 컨테이를 실행한 뒤, 실행 결과가 저장되어있는 StartContainersResponse를 반환한다.


3.4 Application Master 등록

NodeManager가 ApplicationMaster를 정상적으로 실행했을 경우, 해당 ApplicationMaster가 ResourceManager에게 등록돼야 한다.
그래야 ResourceManager는 클러스터 내에서 실행하는 여러 ApplicationMaster에게 자원을 할당하고, 상태를 모니터링 할 수 있다.


3.4.1 ApplicationMasterProtocol

ApplicationMasterProtocol은 ResourceManager와 ApplicationMaster 사이에 필요한 인터페이스가 정의되어있다.
Yarn 은 ApplicationMasterProtocol을 구현한 기본 클라이언트인 AMRMClient, AMRMClientAsync 를 제공한다.
또한 기본 클라이언트 외에 직접 ApplicationMasterProtocol 인터페이스를 이용해서 구현할수도 있다.

  1. ApplicationMasterProtocol의 인터페이스들

  2. allocate(AllocateRequest request): AllocateResponse

  3. finishApplicationMaster(FinishApplicationMasterRequest request): FinishApplicationMasterResponse

  4. registerApplicationMaster(RegisterApplicationMasterRequest request): RegisterApplicationMasterResponse


3.4.2 Application Master 등록 과정

  1. ApplicationMaster 등록 요청
  2. ResourceManager에서 ApplicationMaster 등록 및 AllocateResponse 반환
  3. ApplicationMaster에서 리소스 할당 요청(allocate 호출)
  4. 스케줄러를 통한 컨테이너 할당 처리 및 응답 반환

1. ApplicationMaster 등록 요청

ApplicationMaster 에 있는 클라이언트는 registerApplicationMaster() 메소드를 호출해서 ApplicationMaster등록을 요청한다.
이때 전달되는 RegisterApplicationMasterRequest 안에는 다음 내용이 포함된다.

  • Hostname on which the AM is running.
  • RPC Port
  • Tracking URL (ApplicationMaster(AM) 또는 Job의 상태, 진행률, 로그 등을 볼 수 있는 웹 UI 주소)

2. ResourceManager에서 ApplicationMaster 등록 및 AllocateResponse 반환

ResourceManager의 ApplicationMaster목록을 관리하는 ApplicationMasterService는
자신이 관리하는 ApplicationMaster목록에 해당 ApplicationMaster를 추가한 뒤, AllocateResponse 객체를 반환한다.

AllocateResponse에는 다음과 같은 정보들이 있다.

  • Response ID to track duplicate responses. (고유 ID 붙여서 전달. 네트워크 지연이나 재전송으로 AM이 같은 AllocateResponse를 여러 번 받을 수 있는데, 중복 처리를 방지하기 위함)
  • An AMCommand sent by ResourceManager to let the ApplicationMaster take some actions (resync, shutdown etc.).
  • A list of newly allocated Container.
  • A list of completed Containers' statuses.
  • The available headroom for resources in the cluster for the application.
    • Headroom = 해당 Application이 추가로 사용할 수 있는 남은 리소스 용량
  • A list of nodes whose status has been updated.
  • The number of available nodes in a cluster.
  • A description of resources requested back by the cluster
  • AMRMToken
    • if AMRMToken has been rolled over(기존 토큰이 만료되었거나 갱신될 필요가 있으면 새 AMRMToken을 전달)
    • 장시간 실행되는 Application을 위해 만료된 토큰 대신 새 토큰을 발급
  • A list of Container representing the containers whose resource has been increased.
  • A list of Container representing the containers whose resource has been decreased.

3. ApplicationMaster에서 리소스 할당 요청(allocate 호출)

ApplicationMaster는 allocate() 메소드를 호출해서 어플리케이션을 실행하는데 필요한 리소스 할당을 요청한다.
이때 AllocateRequest 를 전달한다. AllocateRequest에는 다음과 같은 내용이 담겨있다.

  • A response id to track duplicate responses.
  • Progress information.
    • ApplicationMaster(AM)가 자신의 애플리케이션(Job) 진행 상황을 ResourceManager(RM)에 전달하는 정보
    • 보통 0.0 ~ 1.0 사이의 숫자로 표현 (0.0 → 시작 전, 0.5 → 절반 진행, 1.0 → 완료)
  • A list of ResourceRequest to inform the ResourceManager about the application's resource requirements.
  • A list of unused Container which are being returned.
  • A list of UpdateContainerRequest to inform the ResourceManager about the change in requirements of running containers.

allocate() 메소드는 ApplicationMaster의 클라이언트가 ResourceManager에게 자신의 상태를 알려주기 위한 heartbeat 용도로 사용된다.
ResourceManager는 AllocateRequest에 포함되어 있는 어플리케이션의 진행 상태나 heartbeat 전송 주기를 체크해서 ApplicationMaster의 상태를 알 수 있다.

이렇게 allocate() 메소드가 heartbeat 용도로 주기적으로 호출되기 때문에,
ApplicationMaster는 필요한 컨테이너를 한 번에 할당받지 못하더라도 최종적으로 필요한 모든 컨테이너를 할당받을 수 있다.

  • AMRMClient(Async) 이용시 interval=1s

4. 스케줄러를 통한 컨테이너 할당 처리 및 응답 반환

ApplicationMasterService는 컨테이너 할당 요청을 스케줄러에게 위임한다. 스케줄러는 해당 컨테이너가 가용한지를 알려준다.
ApplicationMasterService가 스케줄러의 응답 결과를 AllocateResponse에 설정한 뒤 ApplicationMaster에게 반환한다.

역할위치설명
ApplicationMasterServiceResourceManager 내부AM 요청을 받아 스케줄러에 위임하고 결과를 AM에게 전달
SchedulerResourceManager 내부클러스터 전체 리소스 기준으로 컨테이너 할당 가능 여부 결정


3.5 컨테이너 실행

ApplicationMaster는 자신이 할당받은 컨테이너에서 어플리케이션을 실행해야 한다. 이때 ApplicationMaster는 NodeManager와 상호작용한다.

  • 왜냐면 리소스 매니저한테는 이미 너 여기다가 할당 받으면 돼. 라는 업데이트 받았으니까.

  1. ApplicationMaster의 클라이언트는 NodeManager에게 컨테이너 실행을 요청한다. 이때 사용되는 StartContainersRequest는 다음 정보가 담겨있다.
    • 할당된 리소스, 보안 토큰(활성화된 경우), 컨테이너를 시작하기 위해 실행할 명령, 프로세스 환경, 필요한 바이너리/jar/shared-details 등.
  2. NodeManager의 ContainerManager가 startContainer() 메소드를 처리한다.
    ContainerManager는 ApplicationMaster가 요청한대로 컨테이너를 실행하고 StartContainersResponse 를 반환한다.
    StartContainersResponse에서는 다음 정보를 얻을 수 있다.
  3. 컨테이너가 성공적으로 실행되고 나면 ApplicationMaster는 getContainerStatuses를 주기적으로 호출해서 각 컨테이너의 어플리케이션의 상태를 모니터링 한다.
  4. ContainerManager는 ApplicationMaster가 요청한 컨테이너의 상태를 GetContainerStatusesResponse 로 반환한다.


3.6 Application Master 종료

컨테이너에서 실행했던 어플리케이션들이 종료되면 ApplicationMaster도 종료되어야 한다.
ApplicationMaster의 종료로 하나의 어플리케이션의 라이프사이클이 종료된다.

  1. ApplicationMaster의 클라이언트는 ResourceManager에게 ApplicationMaster의 종료를 요청한다.

  2. ApplicationMasterService는 해당 ApplicationMaster를 ResourceManager에서 해제하고 FinishApplicationMasterResponse를 전달한다.

    1. FinishApplicationMasterResponse 는 boolean 의 `getIsUnregistered()` 메소드로 정상적인 해제와 stop 가능 여부를 알린다.
    2. 만약 이것이 true 가 되기 전에 application이 먼저 stop 한다면, RM은 어플리케이션을 재시도 한다.


3.7 Auxiliary Service


3.7.1 Auxiliary Service란? (보조 서비스)

하둡 Map Reduce에서는 Map TaskReduce Task 사이에 셔플 작업을 한다.

  • Map Task : 입력 데이터를 작은 조각(split)으로 나누고, 각 조각에서 필요한 데이터를 맵핑해 (키, 값) 형태의 중간 데이터 생성
  • Reduce Task : 중간 데이터를 키별로 모아서 집계/처리
  • Suffle : Map Task의 출력 데이터를 키 기준으로 나눠서 Reduce Task로 전송하는 과정. 데이터 이동과 재배치

셔플 단계에서 리듀스로 전송할 데이터를 파티셔닝하고, 파티셔닝 된 데이터를 네트워크로 전달한다.
Yarn 클러스터에서 Map Reduce 어플리케이션을 실행할 경우, Map Task는 노드 매니저의 컨테이너에서 실행된다.
그런데 NodeManager는 컨테이너에서 실행 중이던 어플리케이션 실행이 종료될 경우 컨테이너도 함께 종료시킨다.
이렇게 컨테이너가 종료된다면 Map TaskReduce Task에 데이터를 전달할 수 없게된다.
따라서 별도의 조치를 취하지 않으면 셔플이 일어날 수 없다.

Yarn은 이런 상황의 방지를 위해서 Auxiliary Service(보조 서비스)를 제공한다.
Auxiliary Service는 NodeManager 사이의 서비스 제어를 위한 기능이다.
이 서비스를 이용해서 서로 다른 NodeManager 사이에 데이터를 전달하거나, 다른 NodeManager를 제어할 수 있다.
Auxiliary Service를 Map Reduce에 적용하면 Map Task를 실행하는 NodeManager와 Reduce Task를 실행하는 NodeManager 사이에 셔플(데이터 전송 및 재배치)이 가능해진다.


3.7.2 Auxiliary Service 의 동작과정

  1. 클라이언트가 ResourceManager에게 어플리케이션 실행을 요청한다.

  2. ResourceManager는 해당 어플리케이션의 ApplicationMaster를 실행한다.

  3. Yarn은 Map Reduce의 ApplicationMaster로 MRAppMaster를 제공한다.
    ResourceManager가 ApplicationMaster 실행을 요청하면 NodeManager가 컨테이너에서 MRAppMaster를 실행한다.

  4. MRAppMaster는 또 다른 NodeManager에게 Map Task의 실행을 요청한다.

  5. NodeManager는 컨테이너에서 Map Task를 실행한다.

  6. NodeManager에서 실행된 Map Task는 태스크 수행 결과를 셔플을 통해 Reduce Task 에 전달한다.

    • 여기서 셔플을 담당하는 기본 클래스는 하둡의 mapred 패키지에 구현되어있는 ShuffleHandler이다. source
  7. 어플리케이션 실행 요청

    • 클라이언트 → ResourceManager에게 MapReduce 어플리케이션 실행 요청
  8. ApplicationMaster 실행

    • ResourceManager는 클러스터 내 한 NodeManager에게 ApplicationMaster 실행 지시함
    • NodeManager는 컨테이너 하나를 띄워서 MRAppMaster 실행
      • Yarn은 Map Reduce의 ApplicationMaster로 MRAppMaster를 제공
  9. Map Task 실행 요청

    • MRAppMaster는 다른 NodeManager들에게 Map Task 실행을 요청
    • 즉, 각 노드의 NodeManager가 컨테이너를 생성하고 그 안에서 Map Task 실행
  10. Map Task 수행 및 결과 생성

    • 각 NodeManager의 컨테이너에서 Map Task가 실행
    • Map Task는 중간 결과(Intermediate Data)를 로컬 디스크에 저장
  11. Shuffle 단계 (데이터 전송 및 재배치)

    • Reduce Task가 실행될 NodeManager들이 Map Task가 생성한 데이터를 네트워크로 받아옴 (즉, Pull 방식) > 이 과정을 Shuffle이라 함
    • NodeManager 자체가 새로 실행되는 것이 아니라, 이미 실행 중인 다른 NodeManager들의 컨테이너에서 Reduce Task가 데이터를 가져가는 것
    • 여기서 셔플을 담당하는 기본 클래스는 하둡의 mapred 패키지에 구현되어있는 ShuffleHandler이다. source
  12. Reduce Task 실행 및 최종 결과 생성

    • Shuffle된 데이터를 바탕으로 Reduce Task 수행
    • 최종 출력 결과가 HDFS 같은 스토리지에 저장됨

3.7.3 Pluggable Shuffle and Pluggable Sort

매뉴얼

MapReduce의 Shuffle 에서 사용할 서비스를 다음 두가지 방법으로 커스텀하게 설정할 수 있다.(pluggable)

  1. Job 제출시의 configuration에 설정, job 의 패키지 안에 인터페이스(ShuffleConsumerPlugin, MapOutputCollector) 구현체를 구현한 클래스를 위치
  2. aux-service 를 명시한 yarn-site.xml 을 모든 하둡 노드에 배포

정리

요즘은 MapReduce를 직접 구현하기보다는 Spark, Flink, Streaming과 같은 전문 프레임워크를 활용한다. 이러한 프레임워크들은 내부적으로 YARN 위에서 실행될 수 있도록 셔플이나 리소스 관리 같은 복잡한 기능들을 이미 구현해 두었기 때문에, 사용자는 추상화된 고수준 API만 활용하면 된다. 따라서 YARN의 세부 워크플로를 몰라도 프레임워크를 사용하는 데에는 큰 문제가 없다.

그러나 내부 동작 원리를 이해하고 넘어가는 것은 중요하다. 이를 통해 아키텍처 설계 방법, 구현 기법, 디자인 패턴, 대규모 데이터 처리에서 검증된 안정적인 방식들을 배울 수 있으며, 이는 다른 서비스 요구사항을 구현할 때 유용한 아이디어로 활용될 수 있다. 또한 특정 API를 사용하지 않는 상황이나 예상치 못한 오류, 버그 해결 과정에서도 이러한 이해가 문제를 파악하고 해결책을 찾는 데 큰 도움이 될 것

profile
Data Analytics Engineer 가 되

0개의 댓글