[Yarn] Yarn Application Hands-on

Hyunjun Kim · August 21, 2025
Data_Engineering

4 Yarn Application Hands-on

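ℹ️ This exercise runs on the cluster set up in the P08-C02 AWS EMR Hadoop hands-on. If you do not have a cluster, or cost is a concern, a Hadoop single node cluster is recommended instead. Note that some details may then differ slightly from the lecture.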

4.1 Code

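A Yarn application contains several main methods.
When first learning Java, we are taught that one program can have only one main. Strictly speaking, though,
a single app can hold several classes with a main, because each one is started as a separate process in its own container,
so having more than one is fine. What matters is which one is specified as the entry point at launch.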
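
To make the point about several mains concrete, here is a minimal sketch that uses only the JDK. The class names ClientMain and MasterMain and the helper launchCommand are hypothetical stand-ins, not the classes used in this post. The sketch only illustrates that one jar can carry more than one entry point and that the launch command decides which one runs in a given process.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class EntryPoints {
    // Builds "java -cp <jar> <mainClass> <args...>". Only the main of the class
    // named on the command line runs in the process started by this command.
    static List<String> launchCommand(String jar, String mainClass, String... args) {
        List<String> cmd = new ArrayList<>(Arrays.asList("java", "-cp", jar, mainClass));
        cmd.addAll(Arrays.asList(args));
        return cmd;
    }

    public static void main(String[] args) {
        // Same (hypothetical) jar, two different entry points.
        System.out.println(String.join(" ", launchCommand("app-all.jar", "ClientMain")));
        System.out.println(String.join(" ", launchCommand("app-all.jar", "MasterMain")));
    }
}

// Hypothetical stand-in for an entry point on the submitting side.
final class ClientMain {
    public static void main(String[] args) {
        System.out.println("client started with " + Arrays.toString(args));
    }
}

// Hypothetical stand-in for an entry point that runs inside a container.
final class MasterMain {
    public static void main(String[] args) {
        System.out.println("master started with " + Arrays.toString(args));
    }
}
```

Each main here is independent of the others; nothing in the jar itself picks one, so the choice is made entirely by whoever assembles the launch command.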

settings.gradle

rootProject.name = 'hadoop-yarn-app'

build.gradle

plugins {
    id 'java'
    id 'com.github.johnrengelman.shadow' version '7.1.2'
}

group 'de.example.hadoop.yarn'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

def hadoop_version = "3.2.1"

dependencies {
    implementation "org.apache.hadoop:hadoop-client:${hadoop_version}"
    implementation("org.apache.hadoop:hadoop-common:${hadoop_version}") {
        exclude group: 'commons-el', module: 'commons-el'
        exclude group: 'tomcat', module: 'jasper-runtime'
        exclude group: 'tomcat', module: 'jasper-compiler'
        exclude group: 'org.mortbay.jetty', module: 'jsp-2.1-jetty'
    }
    implementation "org.apache.hadoop:hadoop-hdfs:${hadoop_version}"
    implementation "org.apache.hadoop:hadoop-yarn-api:${hadoop_version}"
    implementation "org.apache.hadoop:hadoop-yarn-common:${hadoop_version}"
    implementation "org.apache.hadoop:hadoop-yarn-client:${hadoop_version}"
    implementation "org.apache.hadoop:hadoop-yarn-server-nodemanager:${hadoop_version}"
    implementation "org.apache.hadoop:hadoop-yarn-server-resourcemanager:${hadoop_version}"
    implementation 'commons-lang:commons-lang:2.6'
    implementation 'com.google.guava:guava:11.0.2'
    implementation 'commons-cli:commons-cli:1.2'
    implementation 'commons-io:commons-io:2.4'
}

test {
    useJUnitPlatform()
}
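  • shadow: running the shadowJar task produces, under app > build > libs,
    a fat Jar whose name ends in -all and which bundles all the required libraries.
  • hadoop_version is set to "3.2.1" to match the version installed on EMR.
  • Tomcat and other server-related packages are excluded. The Hadoop packages are large and often pull in features that are not needed, and those occasionally cause problems.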

de.example.hadoop.yarn.Constants.java

package de.example.hadoop.yarn;

public class Constants {
    /**
     * Environment key name pointing to the app master jar location
     */
    public static final String AM_JAR_PATH = "AM_JAR_PATH";

    /**
     * Environment key name denoting the file timestamp for the app master jar.
     * Used to validate the local resource.
     */
    public static final String AM_JAR_TIMESTAMP = "AM_JAR_TIMESTAMP";

    /**
     * Environment key name denoting the file content length for the app master jar.
     * Used to validate the local resource.
     */
    public static final String AM_JAR_LENGTH = "AM_JAR_LENGTH";

    public static final String AM_JAR_NAME = "AppMaster.jar";
    
    // AM_JAR_NAME: the jar is renamed to this name when it is uploaded to the path (or cache) on the Hadoop filesystem dedicated to the YARN application.
}

de.example.hadoop.yarn.MyClient.java

package de.example.hadoop.yarn;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;

public class MyClient {

    // Start time for client
    private final long clientStartTime = System.currentTimeMillis();

    // Configuration
    private Configuration conf;

    private YarnClient yarnClient;

    // Application master specific info to register a new Application with RM/ASM
    private String appName = "";

    // App master priority
    private int amPriority = 0;

    // Queue for App master
    private String amQueue = "";

    // Amt. of memory resource to request for to run the App Master
    private long amMemory = 10;

    // Amt. of virtual core resource to request for to run the App Master
    private int amVCores = 1;

    // ApplicationMaster jar file
    private String appMasterJarPath = "";

    // Container priority
    private int requestPriority = 0;

    // Amt of memory to request for container in which the HelloYarn will be executed
    private int containerMemory = 10;

    // Amt. of virtual cores to request for container in which the HelloYarn will be executed
    private int containerVirtualCores = 1;

    // No. of containers in which the HelloYarn needs to be executed
    private int numContainers = 1;

    // Timeout threshold for client. Kill app after time interval expires.
    private long clientTimeout = 600000;

    // Command line options
    private Options opts;

    /**
     * Constructor
     *
     */
    public MyClient() throws Exception {
        createYarnClient();
        initOptions();
    }

    private void createYarnClient() {
        // The YARN client is what sends requests to the ResourceManager, so it has to exist first. It runs on the side that submits the job.
        yarnClient = YarnClient.createYarnClient();
        this.conf = new YarnConfiguration();
        yarnClient.init(conf);
    }

    private void initOptions() {
        opts = new Options();
        // Options that can be passed as parameters when submitting this job to YARN
        opts.addOption("appname", true, "Application Name. Default value - HelloYarn");
        opts.addOption("priority", true, "Application Priority. Default 0");
        opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
        opts.addOption("timeout", true, "Application timeout in milliseconds");
        opts.addOption("master_memory", true,
                       "Amount of memory in MB to be requested to run the application master");
        opts.addOption("master_vcores", true,
                       "Amount of virtual cores to be requested to run the application master");
        opts.addOption("jar", true, "Jar file containing the application master");
        opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the HelloYarn");
        opts.addOption("container_vcores", true,
                       "Amount of virtual cores to be requested to run the HelloYarn");
        opts.addOption("num_containers", true, "No. of containers on which the HelloYarn needs to be executed");
        opts.addOption("help", false, "Print usage");
    }

    /**
     * Helper function to print out usage
     */
    private void printUsage() {
        new HelpFormatter().printHelp("Client", opts);
    }

    /**
     * Parse command line options
     * @param args Parsed command line options
     * @return Whether the init was successful to run the client
     * @throws org.apache.commons.cli.ParseException
     */
    public boolean init(String[] args) throws ParseException {

        CommandLine cliParser = new GnuParser().parse(opts, args);

        if (args.length == 0) {
            throw new IllegalArgumentException("No args specified for client to initialize");
        }

        if (cliParser.hasOption("help")) {
            printUsage();
            return false;
        }

        appName = cliParser.getOptionValue("appname", "HelloYarn");
        amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
        amQueue = cliParser.getOptionValue("queue", "default");
        amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
        amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));

        if (amMemory < 0) {
            throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
                                               + " Specified memory=" + amMemory);
        }
        if (amVCores < 0) {
            throw new IllegalArgumentException(
                    "Invalid virtual cores specified for application master, exiting."
                    + " Specified virtual cores=" + amVCores);
        }

        if (!cliParser.hasOption("jar")) {
            throw new IllegalArgumentException("No jar file specified for application master");
        }

        appMasterJarPath = cliParser.getOptionValue("jar");

        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
        containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
        numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));

        if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
            throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
                                               + " exiting."
                                               + " Specified containerMemory=" + containerMemory
                                               + ", containerVirtualCores=" + containerVirtualCores
                                               + ", numContainer=" + numContainers);
        }

        clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));

        return true;
    }

    /**
     * Main run function for the client
     * @return true if application completed successfully
     * @throws java.io.IOException
     * @throws org.apache.hadoop.yarn.exceptions.YarnException
     */
    public boolean run() throws IOException, YarnException {

        System.out.println("Running Client");
        yarnClient.start();

        // Get a new application id
        YarnClientApplication app = yarnClient.createApplication();
        GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
        // The application id comes from here

        long maxMem = appResponse.getMaximumResourceCapability().getMemorySize();
        // YARN reports the maximum memory size it can allocate to you
        System.out.println("Max mem capability of resources in this cluster " + maxMem);

        // A resource ask cannot exceed the max.
        if (amMemory > maxMem) {
            System.out.println("AM memory specified above max threshold of cluster. Using max value."
                     + ", specified=" + amMemory
                     + ", max=" + maxMem);
            amMemory = maxMem;
        }

        int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
        System.out.println("Max virtual cores capability of resources in this cluster " + maxVCores);

        // Clamp to the cluster maximum so that launching the AM takes priority over the requested size
        if (amVCores > maxVCores) {
            System.out.println("AM virtual cores specified above max threshold of cluster. "
                     + "Using max value." + ", specified=" + amVCores
                     + ", max=" + maxVCores);
            amVCores = maxVCores;
        }

        // set the application name
        // Create the context object used when submitting to YARN
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        ApplicationId appId = appContext.getApplicationId();

        appContext.setApplicationName(appName);

        // Set up resource type requirements
        // For now, both memory and vcores are supported, so we set memory and
        // vcores requirements
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemorySize(amMemory);
        capability.setVirtualCores(amVCores);
        appContext.setResource(capability);

        // Set the priority for the application master
        Priority priority = Records.newRecord(Priority.class);
        priority.setPriority(amPriority);
        appContext.setPriority(priority);

        // Set the queue to which this application is to be submitted in the RM
        appContext.setQueue(amQueue);

        // Set the ContainerLaunchContext to describe the Container with which the ApplicationMaster is launched.
        appContext.setAMContainerSpec(getAMContainerSpec(appId.getId()));

        // Submit the application to the applications manager
        // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
        // Ignore the response as either a valid response object is returned on success
        // or an exception thrown to denote some form of a failure
        System.out.println("Submitting application to ASM");

        yarnClient.submitApplication(appContext);

        // Monitor the application
        // The monitor is only a temporary implementation
        return monitorApplication(appId);
    }

    private ContainerLaunchContext getAMContainerSpec(int appId) throws IOException, YarnException {
        // Set up the container launch context for the application master
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

        FileSystem fs = FileSystem.get(conf);

        // set local resources for the application master
        // local files or archives as needed
        // In this scenario, the jar file for the application master is part of the local resources
        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

        System.out.println("Copy App Master jar from local filesystem and add to local environment");
        // Copy the application master jar to the filesystem
        // Create a local resource to point to the destination jar path
        addToLocalResources(fs, appMasterJarPath, Constants.AM_JAR_NAME, appId,
                            localResources, null);

        // Set local resource info into app master container launch context
        amContainer.setLocalResources(localResources);

        // Set the env variables to be setup in the env where the application master will be run
        System.out.println("Set the environment for the application master");
        amContainer.setEnvironment(getAMEnvironment(localResources, fs));

        // Set the necessary command to execute the application master
        Vector<CharSequence> vargs = new Vector<CharSequence>(30);

        // Set java executable command
        System.out.println("Setting up app master command");
        vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
        // Set Xmx based on am memory size
        vargs.add("-Xmx" + amMemory + "m");
        // Set class name
        vargs.add("de.example.hadoop.yarn.MyApplicationMaster");
        // Set params for Application Master
        vargs.add("--container_memory " + String.valueOf(containerMemory));
        vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
        vargs.add("--num_containers " + String.valueOf(numContainers));
        vargs.add("--priority " + String.valueOf(requestPriority));
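        // Use the Linux redirects 1> and 2> to send stdout and stderr to these paths.
        // LOG_DIR_EXPANSION_VAR is a placeholder: the ResourceManager and NodeManager decide between them which node the ApplicationMaster runs on, so the actual log directory is only known once the container is launched in that context.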
        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

        // Get final command
        StringBuilder command = new StringBuilder();
        for (CharSequence str : vargs) {
            command.append(str).append(" ");
        }

        System.out.println("Completed setting up app master command " + command.toString());
        List<String> commands = new ArrayList<String>();
        commands.add(command.toString());
        amContainer.setCommands(commands);

        return amContainer;
    }

    private void addToLocalResources(FileSystem fs, String fileSrcPath,
                                     String fileDstPath, int appId, Map<String, LocalResource> localResources,
                                     String resources) throws IOException {
        String suffix = appName + "/" + appId + "/" + fileDstPath;
        Path dst = new Path(fs.getHomeDirectory(), suffix);
        if (fileSrcPath == null) {
            FSDataOutputStream ostream = null;
            try {
                ostream = FileSystem
                        .create(fs, dst, new FsPermission((short) 0710));
                ostream.writeUTF(resources);
            } finally {
                IOUtils.closeQuietly(ostream);
            }
        } else {
            fs.copyFromLocalFile(new Path(fileSrcPath), dst);
        }
        FileStatus scFileStatus = fs.getFileStatus(dst);
        LocalResource scRsrc =
                LocalResource.newInstance(
                        URL.fromURI(dst.toUri()),
                        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                        scFileStatus.getLen(), scFileStatus.getModificationTime());
        localResources.put(fileDstPath, scRsrc);
    }

    private Map<String, String> getAMEnvironment(Map<String, LocalResource> localResources
            , FileSystem fs) throws IOException {
        Map<String, String> env = new HashMap<String, String>();

        // Set ApplicationMaster jar file
        LocalResource appJarResource = localResources.get(Constants.AM_JAR_NAME);
        Path hdfsAppJarPath = new Path(fs.getHomeDirectory(), appJarResource.getResource().getFile());
        FileStatus hdfsAppJarStatus = fs.getFileStatus(hdfsAppJarPath);
        long hdfsAppJarLength = hdfsAppJarStatus.getLen();
        long hdfsAppJarTimestamp = hdfsAppJarStatus.getModificationTime();

        env.put(Constants.AM_JAR_PATH, hdfsAppJarPath.toString());
        env.put(Constants.AM_JAR_TIMESTAMP, Long.toString(hdfsAppJarTimestamp));
        env.put(Constants.AM_JAR_LENGTH, Long.toString(hdfsAppJarLength));

        // Add AppMaster.jar location to classpath
        // At some point we should not be required to add
        // the hadoop specific classpaths to the env.
        // It should be provided out of the box.
        // For now setting all required classpaths including
        // the classpath to "." for the application jar
        StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
                .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
        for (String c : conf.getStrings(
                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
            classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
            classPathEnv.append(c.trim());
        }
        env.put("CLASSPATH", classPathEnv.toString());

        return env;
    }

    /**
     * Monitor the submitted application for completion.
     * Kill application if time expires.
     * @param appId Application Id of application to be monitored
     * @return true if application completed successfully
     * @throws org.apache.hadoop.yarn.exceptions.YarnException
     * @throws java.io.IOException
     */
    private boolean monitorApplication(ApplicationId appId)
            throws YarnException, IOException {
        // Loop until the application finishes.
        while (true) {
            // Check app status every 1 second.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.err.println("Thread sleep in monitoring loop interrupted");
            }

            // Get application report for the appId we are interested in
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            YarnApplicationState state = report.getYarnApplicationState();
            FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
            if (YarnApplicationState.FINISHED == state) {
                if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
                    System.out.println("Application has completed successfully. "
                             + " Breaking monitoring loop : ApplicationId:" + appId.getId());
                    return true;
                } else {
                    System.out.println("Application finished unsuccessfully."
                             + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
                             + ". Breaking monitoring loop : ApplicationId:" + appId.getId());
                    return false;
                }
            } else if (YarnApplicationState.KILLED == state
                       || YarnApplicationState.FAILED == state) {
                System.out.println("Application did not finish."
                         + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
                         + ". Breaking monitoring loop : ApplicationId:" + appId.getId());
                return false;
            }
            // If the current time is past (client start time + timeout), the timeout has expired
            if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
                System.out.println("Reached client specified timeout for application. Killing application"
                         + ". Breaking monitoring loop : ApplicationId:" + appId.getId());
                forceKillApplication(appId);
                return false;
            }
        }
    }

    /**
     * Kill a submitted application by sending a call to the ASM
     * @param appId Application Id to be killed.
     * @throws org.apache.hadoop.yarn.exceptions.YarnException
     * @throws java.io.IOException
     */
    private void forceKillApplication(ApplicationId appId)
            throws YarnException, IOException {
        yarnClient.killApplication(appId);

    }

    /**
     * @param args Command line arguments
     */
    public static void main(String[] args) {
        boolean result = false;
        try {
            MyClient client = new MyClient();
            System.out.println("Initializing Client");
            try {
                boolean doRun = client.init(args);
                if (!doRun) {
                    System.exit(0);
                }
            } catch (IllegalArgumentException e) {
                System.err.println(e.getLocalizedMessage());
                client.printUsage();
                System.exit(-1);
            }
            result = client.run();
        } catch (Throwable t) {
            System.err.println("Error running Client\n" + t.getMessage() + Arrays.toString(t.getStackTrace()));
            System.exit(1);
        }
        if (result) {
            System.out.println("Application completed successfully");
            System.exit(0);
        }
        System.err.println("Application failed to complete successfully");
        System.exit(2);
    }
}
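
The command assembled in getAMContainerSpec is a single shell line that is executed on whichever node ends up hosting the container. As a rough illustration with no Hadoop dependency, the sketch below rebuilds that line from plain strings. It assumes that Environment.JAVA_HOME.$$() renders as {{JAVA_HOME}} and that ApplicationConstants.LOG_DIR_EXPANSION_VAR is the literal <LOG_DIR>, both being placeholders that are substituted only at container launch. The class AmCommandSketch, its methods, and the example paths in main are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class AmCommandSketch {
    // Assumed literal forms of the Hadoop placeholders (see the lead-in).
    static final String JAVA_HOME_VAR = "{{JAVA_HOME}}";
    static final String LOG_DIR_VAR = "<LOG_DIR>";

    // Joins the pieces with spaces, mirroring the loop over vargs in MyClient.
    static String buildCommand(long amMemoryMb, String mainClass, int numContainers) {
        List<String> parts = new ArrayList<>();
        parts.add(JAVA_HOME_VAR + "/bin/java");
        parts.add("-Xmx" + amMemoryMb + "m");
        parts.add(mainClass);
        parts.add("--num_containers " + numContainers);
        // 1> redirects stdout and 2> redirects stderr into the container log directory.
        parts.add("1>" + LOG_DIR_VAR + "/AppMaster.stdout");
        parts.add("2>" + LOG_DIR_VAR + "/AppMaster.stderr");
        return String.join(" ", parts);
    }

    // Mimics the late substitution of the placeholders with concrete values.
    static String expand(String command, String javaHome, String logDir) {
        return command.replace(JAVA_HOME_VAR, javaHome).replace(LOG_DIR_VAR, logDir);
    }

    public static void main(String[] args) {
        String cmd = buildCommand(10, "de.example.hadoop.yarn.MyApplicationMaster", 1);
        System.out.println(cmd);
        // Made-up example values; the real ones depend on the node that runs the container.
        System.out.println(expand(cmd, "/usr/lib/jvm/java", "/var/log/containers/c1"));
    }
}
```

Because the client cannot know the target node in advance, it can only ship the unexpanded form; the concrete paths appear once the container is started.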

de.example.hadoop.yarn.MyApplicationMaster.java

package de.example.hadoop.yarn;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.LogManager;

public class MyApplicationMaster {

    // Application Attempt Id ( combination of attemptId and fail count )
    protected ApplicationAttemptId appAttemptID;

    // No. of containers to run shell command on
    private int numTotalContainers = 1;

    // Memory to request for the container on which the shell command will run
    private int containerMemory = 10;

    // VirtualCores to request for the container on which the shell command will run
    private int containerVirtualCores = 1;

    // Priority of the request
    private int requestPriority;

    // Location of the application jar ( obtained from info set in env )
    // Application jar path in fs
    private String appJarPath = "";
    // Timestamp needed for creating a local resource
    private long appJarTimestamp = 0;
    // File length needed for local resource
    private long appJarPathLen = 0;

    // Configuration
    private Configuration conf;

    public MyApplicationMaster() {
        // Set up the configuration
        conf = new YarnConfiguration();
    }

    /**
     * Parse command line options
     *
     * @param args Command line args
     * @return Whether init successful and run should be invoked
     * @throws org.apache.commons.cli.ParseException
     * @throws java.io.IOException
     */
    public boolean init(String[] args) throws Exception {
        Options opts = new Options();
        opts.addOption("app_attempt_id", true,
                       "App Attempt ID. Not to be used unless for testing purposes");
        opts.addOption("shell_env", true,
                       "Environment for shell script. Specified as env_key=env_val pairs");
        opts.addOption("container_memory", true,
                       "Amount of memory in MB to be requested to run the shell command");
        opts.addOption("container_vcores", true,
                       "Amount of virtual cores to be requested to run the shell command");
        opts.addOption("num_containers", true,
                       "No. of containers on which the shell command needs to be executed");
        opts.addOption("priority", true, "Application Priority. Default 0");
        opts.addOption("help", false, "Print usage");

        CommandLine cliParser = new GnuParser().parse(opts, args);

        Map<String, String> envs = System.getenv();

        if (!envs.containsKey(ApplicationConstants.Environment.CONTAINER_ID.name())) {
            if (cliParser.hasOption("app_attempt_id")) {
                String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
                appAttemptID = ApplicationAttemptId.fromString(appIdStr);
            } else {
                throw new IllegalArgumentException(
                        "Application Attempt Id not set in the environment");
            }
        } else {
            ContainerId containerId = ContainerId.fromString(
                    envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()));
            appAttemptID = containerId.getApplicationAttemptId();
        }

        if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
            throw new RuntimeException(
                    ApplicationConstants.APP_SUBMIT_TIME_ENV + " not set in the environment");
        }
        if (!envs.containsKey(ApplicationConstants.Environment.NM_HOST.name())) {
            throw new RuntimeException(
                    ApplicationConstants.Environment.NM_HOST.name() + " not set in the environment");
        }
        if (!envs.containsKey(ApplicationConstants.Environment.NM_HTTP_PORT.name())) {
            throw new RuntimeException(
                    ApplicationConstants.Environment.NM_HTTP_PORT + " not set in the environment");
        }
        if (!envs.containsKey(ApplicationConstants.Environment.NM_PORT.name())) {
            throw new RuntimeException(
                    ApplicationConstants.Environment.NM_PORT.name() + " not set in the environment");
        }

        if (envs.containsKey(Constants.AM_JAR_PATH)) {
            appJarPath = envs.get(Constants.AM_JAR_PATH);

            if (envs.containsKey(Constants.AM_JAR_TIMESTAMP)) {
                appJarTimestamp = Long.valueOf(envs.get(Constants.AM_JAR_TIMESTAMP));
            }
            if (envs.containsKey(Constants.AM_JAR_LENGTH)) {
                appJarPathLen = Long.valueOf(envs.get(Constants.AM_JAR_LENGTH));
            }

            if (!appJarPath.isEmpty() && (appJarTimestamp <= 0 || appJarPathLen <= 0)) {
                System.err.println("Illegal values in env for shell script path" + ", path="
                          + appJarPath + ", len=" + appJarPathLen + ", timestamp=" + appJarTimestamp);
                throw new IllegalArgumentException(
                        "Illegal values in env for shell script path");
            }
        }

        System.out.println("Application master for app" + ", appId="
                 + appAttemptID.getApplicationId().getId() + ", clusterTimestamp="
                 + appAttemptID.getApplicationId().getClusterTimestamp()
                 + ", attemptId=" + appAttemptID.getAttemptId());

        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
        containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
        numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
        if (numTotalContainers == 0) {
            throw new IllegalArgumentException("Cannot run MyApplicationMaster with no containers");
        }
        requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));

        return true;
    }

    /**
     * Main run function for the application master
     *
     * @throws org.apache.hadoop.yarn.exceptions.YarnException
     * @throws java.io.IOException
     */
    @SuppressWarnings({ "unchecked" })
    public void run() throws Exception {
        System.out.println("Running MyApplicationMaster");

        // Initialize clients to ResourceManager and NodeManagers
        AMRMClient<ContainerRequest> amRMClient = AMRMClient.createAMRMClient();
        amRMClient.init(conf);
        amRMClient.start();

        // Register with ResourceManager
        amRMClient.registerApplicationMaster("", 0, "");

        // Set up resource type requirements for Container
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemorySize(containerMemory);
        capability.setVirtualCores(containerVirtualCores);

        // Priority for worker containers - priorities are intra-application
        Priority priority = Records.newRecord(Priority.class);
        priority.setPriority(requestPriority);

        // Make container requests to ResourceManager
        for (int i = 0; i < numTotalContainers; ++i) {
            ContainerRequest containerAsk = new ContainerRequest(capability, null, null, priority);
            amRMClient.addContainerRequest(containerAsk);
        }

        NMClient nmClient = NMClient.createNMClient();
        nmClient.init(conf);
        nmClient.start();

        // Setup CLASSPATH for Container
        Map<String, String> containerEnv = new HashMap<String, String>();
        containerEnv.put("CLASSPATH", "./*");

        // Setup ApplicationMaster jar file for Container
        LocalResource appMasterJar = createAppMasterJar();

        // Obtain allocated containers and launch
        int allocatedContainers = 0;
        // We need to start counting completed containers while still allocating
        // them since initial ones may complete while we're allocating subsequent
        // containers and if we miss those notifications, we'll never see them again
        // and this ApplicationMaster will hang indefinitely.
        int completedContainers = 0;
        // Keep looping until every requested container has been allocated
        while (allocatedContainers < numTotalContainers) {
            AllocateResponse response = amRMClient.allocate(0);
            for (Container container : response.getAllocatedContainers()) {
                allocatedContainers++;

                ContainerLaunchContext appContainer = createContainerLaunchContext(appMasterJar, containerEnv);
                System.out.println("Launching container " + container.getId().toString());
                System.out.println("Allocated containers " + allocatedContainers);

                nmClient.startContainer(container, appContainer);
            }
            for (ContainerStatus status : response.getCompletedContainersStatuses()) {
                ++completedContainers;
                System.out.println("ContainerID:" + status.getContainerId() + ", state:" + status.getState().name());
            }
            // This loop polls continuously, so without a pause the ApplicationMaster would flood the
            // ResourceManager and NodeManagers with requests and load the system. Hence the sleep.
            Thread.sleep(100);
        }
		
        // Keep looping until every container has finished.
        // Now wait for the remaining containers to complete
        while (completedContainers < numTotalContainers) {
            // Progress is a float in [0, 1]; plain int division would stay at 0 until the very end
            AllocateResponse response = amRMClient.allocate((float) completedContainers / numTotalContainers);
            for (ContainerStatus status : response.getCompletedContainersStatuses()) {
                ++completedContainers;
                System.out.println("ContainerID:" + status.getContainerId() + ", state:" + status.getState().name());
            }
            Thread.sleep(100);
        }

        System.out.println("Completed containers:" + completedContainers);

        // Un-register with ResourceManager
        amRMClient.unregisterApplicationMaster(
                FinalApplicationStatus.SUCCEEDED, "", "");
        System.out.println("Finished MyApplicationMaster");
    }

    private LocalResource createAppMasterJar() throws IOException {
        LocalResource appMasterJar = Records.newRecord(LocalResource.class);
        if (!appJarPath.isEmpty()) {
            appMasterJar.setType(LocalResourceType.FILE);
            Path jarPath = new Path(appJarPath);
            jarPath = FileSystem.get(conf).makeQualified(jarPath);
            appMasterJar.setResource(URL.fromPath(jarPath));
            appMasterJar.setTimestamp(appJarTimestamp);
            appMasterJar.setSize(appJarPathLen);
            appMasterJar.setVisibility(LocalResourceVisibility.PUBLIC);
        }
        return appMasterJar;
    }

    /**
     * Launch container by create ContainerLaunchContext
     *
     * @param appMasterJar
     * @param containerEnv
     * @return
     */

    private ContainerLaunchContext createContainerLaunchContext(LocalResource appMasterJar,
                                                                Map<String, String> containerEnv) {
        ContainerLaunchContext appContainer =
                Records.newRecord(ContainerLaunchContext.class);
        appContainer.setLocalResources(
                Collections.singletonMap(Constants.AM_JAR_NAME, appMasterJar));
        appContainer.setEnvironment(containerEnv);
        // What runs inside the container is itself an application, so the command sets the options for launching a Java application
        // Max heap 128 MB, with HelloYarn as the main class
        appContainer.setCommands(
                Collections.singletonList(
                        "$JAVA_HOME/bin/java" +
                        " -Xmx128M" +
                        " de.example.hadoop.yarn.HelloYarn" +
                        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
                        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
                )
        );

        return appContainer;
    }
	
    // This main is the function that runs once the ApplicationMaster has been given a container and its main class has been located in the jar.
    public static void main(String[] args) throws Exception {
        try {
            MyApplicationMaster appMaster = new MyApplicationMaster();
            System.out.println("Initializing MyApplicationMaster");
            boolean doRun = appMaster.init(args);
            if (!doRun) {
                System.exit(0);
            }
            appMaster.run();
        } catch (Throwable t) {
            System.err.println("Error running MyApplicationMaster\n" + t.getMessage() + Arrays.toString(t.getStackTrace()));
            LogManager.shutdown();
            ExitUtil.terminate(1, t);
        }
    }
}
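
The two loops in run() can be tried out without a cluster. The following is a minimal sketch, not the real client: FakeResourceManager and FakeResponse are hypothetical stand-ins for AMRMClient.allocate() and AllocateResponse, and the sketch assumes every container finishes one heartbeat after it is handed out. It illustrates why completions have to be counted while containers are still being allocated: each completion is reported only once.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class AllocateLoopSketch {

    /** Hypothetical stand-in for AllocateResponse: ids allocated and completed since the last call. */
    static final class FakeResponse {
        final List<Integer> allocated = new ArrayList<>();
        final List<Integer> completed = new ArrayList<>();
    }

    /** Hypothetical stand-in for the ResourceManager behind AMRMClient.allocate(). */
    static final class FakeResourceManager {
        private final Deque<Integer> pending = new ArrayDeque<>();
        private final Deque<Integer> running = new ArrayDeque<>();
        float lastProgress;

        FakeResourceManager(int requested) {
            for (int i = 0; i < requested; i++) {
                pending.add(i);
            }
        }

        FakeResponse allocate(float progress) {
            lastProgress = progress;
            FakeResponse response = new FakeResponse();
            // Containers handed out on the previous heartbeat finish now.
            // Each completion is reported exactly once and then forgotten.
            while (!running.isEmpty()) {
                response.completed.add(running.poll());
            }
            // Hand out at most one new container per heartbeat.
            if (!pending.isEmpty()) {
                int id = pending.poll();
                running.add(id);
                response.allocated.add(id);
            }
            return response;
        }
    }

    /** Mirrors the two loops of run() and returns {allocated, completed}. */
    static int[] run(int numTotalContainers) {
        FakeResourceManager rm = new FakeResourceManager(numTotalContainers);
        int allocated = 0;
        int completed = 0;
        // Loop 1: allocate, but already count completions that arrive in the same responses.
        while (allocated < numTotalContainers) {
            FakeResponse response = rm.allocate(0f);
            allocated += response.allocated.size();
            completed += response.completed.size();
        }
        // Loop 2: wait for the rest, reporting progress as a float in [0, 1].
        while (completed < numTotalContainers) {
            FakeResponse response = rm.allocate((float) completed / numTotalContainers);
            completed += response.completed.size();
        }
        return new int[] {allocated, completed};
    }

    public static void main(String[] args) {
        int[] counts = run(3);
        System.out.println("allocated=" + counts[0] + ", completed=" + counts[1]);
    }
}
```

With 3 containers in this sketch, two of the three completions arrive during the first loop. If that loop ignored them, the second loop would never reach 3 and the ApplicationMaster would hang.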

de.example.hadoop.yarn.HelloYarn.java

package de.example.hadoop.yarn;

public class HelloYarn {

    private static final long MEGABYTE = 1024L * 1024L;

    public HelloYarn() {
        System.out.println("HelloYarn!");
    }

    public static long bytesToMegabytes(long bytes) {
        return bytes / MEGABYTE;
    }

    public void printMemoryStats() {
        long freeMemory = bytesToMegabytes(Runtime.getRuntime().freeMemory());
        long totalMemory = bytesToMegabytes(Runtime.getRuntime().totalMemory());
        long maxMemory = bytesToMegabytes(Runtime.getRuntime().maxMemory());
        System.out.println("The amount of free memory in the Java Virtual Machine: " + freeMemory);
        System.out.println("The total amount of memory in the Java virtual machine: " + totalMemory);
        System.out.println("The maximum amount of memory that the Java virtual machine will attempt to use: " + maxMemory);
    }

    public static void main(String[] args) {
        HelloYarn helloYarn = new HelloYarn();
        helloYarn.printMemoryStats();
    }
}

resources/log4j.properties

log4j.rootLogger=INFO, consoleAppender

log4j.appender.consoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.consoleAppender.Threshold=All
log4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.consoleAppender.layout.ConversionPattern=[%d] [%-5p] %c %x - %m%n
  • With this in place, the logs written by Hadoop itself (not the ones I print) follow this configuration.


4.2 Running

Run the build, then copy the jar to the EMR primary node with scp.

scp -i $your_key $your_jar_path hadoop@$your_emr_primary_ec2_public_dns:~/example/jars/.

After connecting to the EMR primary node, run the following command.

yarn jar $your_jar $yarn_client_mainclass $args
yarn jar hadoop-yarn-app-1.0-SNAPSHOT.jar de.example.hadoop.yarn.MyClient -jar hadoop-yarn-app-1.0-SNAPSHOT.jar
  • Everything from -jar onward is there because the client code implements logic to receive that parameter.
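
To make the split concrete: yarn jar consumes the jar file and the main class name, and the rest of the line reaches that class's main as its args array. The sketch below is a hand-rolled stand-in for that option handling, not the project's actual parser. The class name JarOptionSketch and the parse helper are hypothetical, and it assumes every option has the form "-name value".

```java
import java.util.HashMap;
import java.util.Map;

public class JarOptionSketch {

    /** Collects "-name value" pairs into a map; a simplified stand-in for a real CLI parser. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> options = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("-")) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("Missing value for " + args[i]);
                }
                // Strip the leading dash and consume the next token as the value.
                options.put(args[i].substring(1), args[++i]);
            }
        }
        return options;
    }

    public static void main(String[] args) {
        // The part of the command line that comes after the main class name.
        String[] clientArgs = {"-jar", "hadoop-yarn-app-1.0-SNAPSHOT.jar"};
        String jar = parse(clientArgs).get("jar");
        if (jar == null) {
            throw new IllegalArgumentException("-jar is required");
        }
        System.out.println("jar option: " + jar);
    }
}
```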


4.3 Checking the logs

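  1. The logs printed right after submitting the job to Yarn are the logs from the YarnClient's point of view.
  2. The Logs shown in the Yarn UI add a few extra logs for convenience on top of the Yarn Client logs.
    1. You can see the log of the container that starts the Application Master.
    2. You can see part of the Application Master log.
  3. The exact logs of what each container actually ran can be checked on the data nodes.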

For convenience, it is a good idea to make the key information available in both the Yarn Client and the Application Master logs.

Logs up to the ApplicationMaster can be checked in the web UI.

With a default EMR installation:

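  • The full ApplicationMaster log is available per application ID under the HDFS path /var/log/hadoop-yarn/apps/hadoop/logs-tfile/.
  • On each node, a directory per application ID is created under /var/log/hadoop-yarn/containers/, with one log directory per container below it. That per-container path is what ApplicationConstants.LOG_DIR_EXPANSION_VAR stands for. The detailed logs are written there as separate files, such as the stdout and stderr files set in the container command.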
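
To find those directories, the appId and clusterTimestamp that the ApplicationMaster prints at startup can be turned into the application ID string. The sketch below assumes the usual application_<clusterTimestamp>_<4-digit id> naming and the EMR default paths above. The class LogPathSketch, its helpers, and the sample timestamp and container ID are hypothetical. It also mimics, by plain string replacement, how the <LOG_DIR> placeholder in the container command ends up pointing at the container's log directory.

```java
import java.util.Locale;

public class LogPathSketch {

    // Same literal as ApplicationConstants.LOG_DIR_EXPANSION_VAR in Hadoop.
    static final String LOG_DIR_EXPANSION_VAR = "<LOG_DIR>";

    /** Builds the application ID string from the two values the ApplicationMaster prints. */
    static String applicationId(long clusterTimestamp, int id) {
        return String.format(Locale.ROOT, "application_%d_%04d", clusterTimestamp, id);
    }

    /** HDFS directory holding the aggregated logs (EMR default path from the text above). */
    static String aggregatedLogDir(String appId) {
        return "/var/log/hadoop-yarn/apps/hadoop/logs-tfile/" + appId;
    }

    /** Node-local log directory of one container (EMR default path from the text above). */
    static String containerLogDir(String appId, String containerId) {
        return "/var/log/hadoop-yarn/containers/" + appId + "/" + containerId;
    }

    /** Mimics the placeholder expansion done when the container is launched. */
    static String expand(String command, String logDir) {
        return command.replace(LOG_DIR_EXPANSION_VAR, logDir);
    }

    public static void main(String[] args) {
        // Hypothetical sample values; take the real ones from your own ApplicationMaster log.
        String appId = applicationId(1700000000000L, 1);
        String containerId = "container_1700000000000_0001_01_000002";
        String logDir = containerLogDir(appId, containerId);

        String command = "$JAVA_HOME/bin/java -Xmx128M de.example.hadoop.yarn.HelloYarn"
                + " 1>" + LOG_DIR_EXPANSION_VAR + "/stdout"
                + " 2>" + LOG_DIR_EXPANSION_VAR + "/stderr";

        System.out.println(aggregatedLogDir(appId));
        System.out.println(logDir);
        System.out.println(expand(command, logDir));
    }
}
```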