ℹ️ 실습은 P08-C02 AWS EMR Hadoop 실습에서 세팅한 클러스터에서 진행한다. 클러스터가 없거나 비용이 문제가 되는 경우 hadoop의 single node cluster를 추천. 단, 강의와 내용이 조금 다를 수 있음.
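Yarn Application(jar)에는 여러 개의 main이 있다.
처음 자바를 배울 때 하나의 프로그램에는 main이 하나만 있을 수 있다고 배웠지만, 엄밀히 말하면 하나의 JVM 프로세스가 시작할 때 진입점으로 쓰는 main이 하나일 뿐이다.
이 app은 하나의 jar에 main이 여러 개(MyClient, MyApplicationMaster, HelloYarn) 담겨 있어도, MyClient는 클라이언트 쪽에서, MyApplicationMaster와 HelloYarn은 각각 별도의 컨테이너에서 별도의 프로세스로 실행되므로
실제로 여러 개 존재해도 된다. 대신 각 프로세스를 시작할 때 어떤 클래스의 main을 지정하느냐가 중요하다.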
// settings.gradle: root project name, also used as the base name of the produced jars.
rootProject.name = 'hadoop-yarn-app'
// build.gradle
plugins {
    id 'java'
    // Adds the shadowJar task, which builds a fat jar whose file name ends in "-all".
    id 'com.github.johnrengelman.shadow' version '7.1.2'
}
group 'de.example.hadoop.yarn'
version '1.0-SNAPSHOT'
repositories {
    mavenCentral()
}
def hadoop_version = "3.2.1"
dependencies {
    implementation "org.apache.hadoop:hadoop-client:${hadoop_version}"
    implementation("org.apache.hadoop:hadoop-common:${hadoop_version}") {
        // Keep legacy JSP / Jasper / Jetty artifacts out of the dependency tree.
        exclude group: 'commons-el', module: 'commons-el'
        exclude group: 'tomcat', module: 'jasper-runtime'
        // The artifact is 'jasper-compiler'. The former 'jasper-implementationr' came from a
        // compile->implementation search/replace and matched nothing.
        exclude group: 'tomcat', module: 'jasper-compiler'
        exclude group: 'org.mortbay.jetty', module: 'jsp-2.1-jetty'
    }
    implementation "org.apache.hadoop:hadoop-hdfs:${hadoop_version}"
    implementation "org.apache.hadoop:hadoop-yarn-api:${hadoop_version}"
    implementation "org.apache.hadoop:hadoop-yarn-common:${hadoop_version}"
    implementation "org.apache.hadoop:hadoop-yarn-client:${hadoop_version}"
    implementation "org.apache.hadoop:hadoop-yarn-server-nodemanager:${hadoop_version}"
    implementation "org.apache.hadoop:hadoop-yarn-server-resourcemanager:${hadoop_version}"
    implementation 'commons-lang:commons-lang:2.6'
    implementation 'com.google.guava:guava:11.0.2'
    implementation 'commons-cli:commons-cli:1.2'
    implementation 'commons-io:commons-io:2.4'
}
test {
    useJUnitPlatform()
}
`shadowJar` 태스크로 빌드하면 파일명 끝에 `-all`이 붙은, 필요한 라이브러리를 모두 담고 있는 뚱뚱한(fat) Jar 파일이 만들어진다.
package de.example.hadoop.yarn;
/**
 * Names shared by {@code MyClient} and {@code MyApplicationMaster}.
 *
 * <p>The client uploads the application-master jar under {@link #AM_JAR_NAME}. It then passes
 * the jar's location, modification time and size to the AM through the environment variables
 * whose keys are defined here.
 */
public class Constants {
    /**
     * File name under which the AM jar is uploaded into the application's directory on the
     * cluster file system. It is also the key under which the jar is registered as a container
     * local resource.
     */
    public static final String AM_JAR_NAME = "AppMaster.jar";

    /** Environment key whose value is the path of the uploaded AM jar. */
    public static final String AM_JAR_PATH = "AM_JAR_PATH";

    /**
     * Environment key whose value is the modification timestamp of the uploaded AM jar.
     * It is used to validate the local resource when it is localized.
     */
    public static final String AM_JAR_TIMESTAMP = "AM_JAR_TIMESTAMP";

    /**
     * Environment key whose value is the length in bytes of the uploaded AM jar.
     * It is used to validate the local resource when it is localized.
     */
    public static final String AM_JAR_LENGTH = "AM_JAR_LENGTH";
}
package de.example.hadoop.yarn;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
/**
 * Command-line YARN client that submits {@code MyApplicationMaster} to the ResourceManager.
 *
 * <p>{@link #init(String[])} parses the command-line options. {@link #run()} then does the
 * following:
 * <ul>
 *   <li>uploads the AM jar to the submitter's home directory on the default file system;</li>
 *   <li>builds the AM container launch context (local resources, environment, java command);</li>
 *   <li>submits the application;</li>
 *   <li>polls its report once per second until it finishes, fails or is killed, or until the
 *       client timeout expires.</li>
 * </ul>
 */
public class MyClient {
    // Start time for client; the client timeout is measured from this instant.
    private final long clientStartTime = System.currentTimeMillis();
    // Configuration (a YarnConfiguration, also used to obtain the FileSystem)
    private Configuration conf;
    // Client for the ResourceManager; started in run() and stopped when run() returns.
    private YarnClient yarnClient;
    // Application name registered with the RM (option "appname")
    private String appName = "";
    // App master priority (option "priority")
    private int amPriority = 0;
    // Queue for App master (option "queue")
    private String amQueue = "";
    // Memory in MB to request to run the App Master (option "master_memory")
    private long amMemory = 10;
    // Virtual cores to request to run the App Master (option "master_vcores")
    private int amVCores = 1;
    // Local path of the ApplicationMaster jar file (option "jar")
    private String appMasterJarPath = "";
    // Priority forwarded to the AM for its worker container requests (not settable from the CLI)
    private int requestPriority = 0;
    // Memory in MB to request for each container in which HelloYarn is executed
    private int containerMemory = 10;
    // Virtual cores to request for each container in which HelloYarn is executed
    private int containerVirtualCores = 1;
    // Number of containers in which HelloYarn is executed
    private int numContainers = 1;
    // Client timeout in milliseconds. The application is killed after this interval expires.
    private long clientTimeout = 600000;
    // Command line options
    private Options opts;

    /**
     * Creates the (not yet started) YARN client and registers the supported command-line options.
     *
     * @throws Exception declared for compatibility; the current body throws no checked exception
     */
    public MyClient() throws Exception {
        createYarnClient();
        initOptions();
    }

    /**
     * Creates and initializes the YARN client with a fresh {@link YarnConfiguration}.
     * This is the submitting side's handle to the ResourceManager.
     */
    private void createYarnClient() {
        yarnClient = YarnClient.createYarnClient();
        this.conf = new YarnConfiguration();
        yarnClient.init(conf);
    }

    /** Declares the command-line options accepted when submitting this application to YARN. */
    private void initOptions() {
        opts = new Options();
        opts.addOption("appname", true, "Application Name. Default value - HelloYarn");
        opts.addOption("priority", true, "Application Priority. Default 0");
        opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
        opts.addOption("timeout", true, "Application timeout in milliseconds");
        opts.addOption("master_memory", true,
                "Amount of memory in MB to be requested to run the application master");
        opts.addOption("master_vcores", true,
                "Amount of virtual cores to be requested to run the application master");
        opts.addOption("jar", true, "Jar file containing the application master");
        opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the HelloYarn");
        opts.addOption("container_vcores", true,
                "Amount of virtual cores to be requested to run the HelloYarn");
        opts.addOption("num_containers", true, "No. of containers on which the HelloYarn needs to be executed");
        opts.addOption("help", false, "Print usage");
    }

    /**
     * Helper function to print out usage.
     */
    private void printUsage() {
        new HelpFormatter().printHelp("Client", opts);
    }

    /**
     * Parses the command-line options into this client's settings.
     *
     * @param args raw command-line arguments
     * @return {@code false} if only help was requested, {@code true} if {@link #run()} should follow
     * @throws ParseException if the arguments cannot be parsed
     * @throws IllegalArgumentException if no arguments are given, the jar is missing, or a numeric
     *         option is malformed ({@link NumberFormatException}) or out of range
     */
    public boolean init(String[] args) throws ParseException {
        CommandLine cliParser = new GnuParser().parse(opts, args);
        if (args.length == 0) {
            throw new IllegalArgumentException("No args specified for client to initialize");
        }
        if (cliParser.hasOption("help")) {
            printUsage();
            return false;
        }
        appName = cliParser.getOptionValue("appname", "HelloYarn");
        amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
        amQueue = cliParser.getOptionValue("queue", "default");
        // amMemory is a long field, so parse the option as a long.
        amMemory = Long.parseLong(cliParser.getOptionValue("master_memory", "10"));
        amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
        if (amMemory < 0) {
            throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
                    + " Specified memory=" + amMemory);
        }
        if (amVCores < 0) {
            throw new IllegalArgumentException(
                    "Invalid virtual cores specified for application master, exiting."
                            + " Specified virtual cores=" + amVCores);
        }
        if (!cliParser.hasOption("jar")) {
            throw new IllegalArgumentException("No jar file specified for application master");
        }
        appMasterJarPath = cliParser.getOptionValue("jar");
        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
        containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
        numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
        if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
            throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
                    + " exiting."
                    + " Specified containerMemory=" + containerMemory
                    + ", containerVirtualCores=" + containerVirtualCores
                    + ", numContainer=" + numContainers);
        }
        // clientTimeout is a long field: parse as long so timeouts above Integer.MAX_VALUE ms work.
        clientTimeout = Long.parseLong(cliParser.getOptionValue("timeout", "600000"));
        return true;
    }

    /**
     * Main run function for the client: submits the application and monitors it to completion.
     * The YARN client is stopped before this method returns, whether it returns or throws.
     *
     * @return true if application completed successfully
     * @throws IOException if uploading the AM jar or talking to the RM fails
     * @throws YarnException if the RM rejects a request
     */
    public boolean run() throws IOException, YarnException {
        System.out.println("Running Client");
        yarnClient.start();
        try {
            // Get a new application id together with the cluster's maximum resource capability.
            YarnClientApplication app = yarnClient.createApplication();
            GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
            // Largest memory (MB) the cluster can grant to a single container.
            long maxMem = appResponse.getMaximumResourceCapability().getMemorySize();
            System.out.println("Max mem capabililty of resources in this cluster " + maxMem);
            // A resource ask cannot exceed the max, so clamp instead of failing.
            if (amMemory > maxMem) {
                System.out.println("AM memory specified above max threshold of cluster. Using max value."
                        + ", specified=" + amMemory
                        + ", max=" + maxMem);
                amMemory = maxMem;
            }
            int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
            System.out.println("Max virtual cores capabililty of resources in this cluster " + maxVCores);
            // Clamp so that the AM can always be launched.
            if (amVCores > maxVCores) {
                System.out.println("AM virtual cores specified above max threshold of cluster. "
                        + "Using max value." + ", specified=" + amVCores
                        + ", max=" + maxVCores);
                amVCores = maxVCores;
            }
            // The submission context describes the application being submitted to YARN.
            ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
            ApplicationId appId = appContext.getApplicationId();
            appContext.setApplicationName(appName);
            // Set up resource type requirements (memory and vcores) for the AM container.
            Resource capability = Records.newRecord(Resource.class);
            capability.setMemorySize(amMemory);
            capability.setVirtualCores(amVCores);
            appContext.setResource(capability);
            // Set the priority for the application master
            Priority priority = Records.newRecord(Priority.class);
            priority.setPriority(amPriority);
            appContext.setPriority(priority);
            // Set the queue to which this application is to be submitted in the RM
            appContext.setQueue(amQueue);
            // Describe the container in which the ApplicationMaster is launched.
            appContext.setAMContainerSpec(getAMContainerSpec(appId.getId()));
            // Success returns normally; failure is reported via an exception.
            System.out.println("Submitting application to ASM");
            yarnClient.submitApplication(appContext);
            // Simple polling monitor.
            return monitorApplication(appId);
        } finally {
            // Release the RPC resources held by the YARN client.
            yarnClient.stop();
        }
    }

    /**
     * Builds the launch context for the AM container.
     *
     * <p>This uploads the AM jar and registers it as a local resource, exports the jar's
     * location, timestamp and length plus a CLASSPATH, and sets the java command that starts
     * {@code MyApplicationMaster}. The command redirects stdout and stderr into the container
     * log directory.
     *
     * @param appId numeric part of the application id, used in the upload path
     */
    private ContainerLaunchContext getAMContainerSpec(int appId) throws IOException, YarnException {
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        FileSystem fs = FileSystem.get(conf);
        // The AM jar is the only local resource the AM container needs.
        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
        System.out.println("Copy App Master jar from local filesystem and add to local environment");
        addToLocalResources(fs, appMasterJarPath, Constants.AM_JAR_NAME, appId,
                localResources, null);
        amContainer.setLocalResources(localResources);
        System.out.println("Set the environment for the application master");
        amContainer.setEnvironment(getAMEnvironment(localResources, fs));

        System.out.println("Setting up app master command");
        List<String> vargs = new ArrayList<String>();
        // JAVA_HOME is expanded on the node that runs the container.
        vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
        // Set Xmx based on am memory size
        vargs.add("-Xmx" + amMemory + "m");
        vargs.add("de.example.hadoop.yarn.MyApplicationMaster");
        // Params for the Application Master
        vargs.add("--container_memory " + containerMemory);
        vargs.add("--container_vcores " + containerVirtualCores);
        vargs.add("--num_containers " + numContainers);
        vargs.add("--priority " + requestPriority);
        // LOG_DIR_EXPANSION_VAR is a placeholder that YARN replaces with the container's log
        // directory. That directory is only known once the container has been placed on a node.
        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
        String command = String.join(" ", vargs);
        System.out.println("Completed setting up app master command " + command);
        List<String> commands = new ArrayList<String>();
        commands.add(command);
        amContainer.setCommands(commands);
        return amContainer;
    }

    /**
     * Places a file at {@code <home>/<appName>/<appId>/<fileDstPath>} and registers it in
     * {@code localResources} under {@code fileDstPath}, with APPLICATION visibility.
     *
     * <p>If {@code fileSrcPath} is non-null, that local file is copied. Otherwise
     * {@code resources} is written there with {@code writeUTF}.
     *
     * @throws IllegalArgumentException if both {@code fileSrcPath} and {@code resources} are null
     */
    private void addToLocalResources(FileSystem fs, String fileSrcPath,
            String fileDstPath, int appId, Map<String, LocalResource> localResources,
            String resources) throws IOException {
        String suffix = appName + "/" + appId + "/" + fileDstPath;
        Path dst = new Path(fs.getHomeDirectory(), suffix);
        if (fileSrcPath == null) {
            if (resources == null) {
                throw new IllegalArgumentException(
                        "Either fileSrcPath or resources must be provided for " + fileDstPath);
            }
            // try-with-resources: a failing close() means the data was not persisted and must surface.
            try (FSDataOutputStream ostream = FileSystem.create(fs, dst, new FsPermission((short) 0710))) {
                ostream.writeUTF(resources);
            }
        } else {
            fs.copyFromLocalFile(new Path(fileSrcPath), dst);
        }
        // Size and timestamp must match the stored file, or localization will reject it.
        FileStatus scFileStatus = fs.getFileStatus(dst);
        LocalResource scRsrc =
                LocalResource.newInstance(
                        URL.fromURI(dst.toUri()),
                        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                        scFileStatus.getLen(), scFileStatus.getModificationTime());
        localResources.put(fileDstPath, scRsrc);
    }

    /**
     * Builds the AM environment.
     *
     * <p>It contains the uploaded jar's path, modification time and length under the
     * {@code Constants.AM_JAR_*} keys, so the AM can re-localize the same jar into worker
     * containers. It also sets a CLASSPATH made of the container's working directory plus the
     * configured YARN application classpath.
     */
    private Map<String, String> getAMEnvironment(Map<String, LocalResource> localResources,
            FileSystem fs) throws IOException {
        Map<String, String> env = new HashMap<String, String>();
        LocalResource appJarResource = localResources.get(Constants.AM_JAR_NAME);
        Path hdfsAppJarPath = new Path(fs.getHomeDirectory(), appJarResource.getResource().getFile());
        FileStatus hdfsAppJarStatus = fs.getFileStatus(hdfsAppJarPath);
        long hdfsAppJarLength = hdfsAppJarStatus.getLen();
        long hdfsAppJarTimestamp = hdfsAppJarStatus.getModificationTime();
        env.put(Constants.AM_JAR_PATH, hdfsAppJarPath.toString());
        env.put(Constants.AM_JAR_TIMESTAMP, Long.toString(hdfsAppJarTimestamp));
        env.put(Constants.AM_JAR_LENGTH, Long.toString(hdfsAppJarLength));
        // "./*" picks up the localized AppMaster.jar from the container's working directory.
        // The Hadoop jars come from yarn.application.classpath, or from the cross-platform default.
        StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
                .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
        for (String c : conf.getStrings(
                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
            classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
            classPathEnv.append(c.trim());
        }
        env.put("CLASSPATH", classPathEnv.toString());
        return env;
    }

    /**
     * Polls the submitted application once per second until it reaches a terminal state.
     * The application is killed once the client timeout has elapsed.
     *
     * @param appId Application Id of application to be monitored
     * @return true only if the application FINISHED with final status SUCCEEDED. Returns false
     *         on failure, kill, timeout, or if the monitoring thread is interrupted.
     * @throws YarnException if the RM rejects a report or kill request
     * @throws IOException on RPC failure
     */
    private boolean monitorApplication(ApplicationId appId)
            throws YarnException, IOException {
        while (true) {
            // Check app status every 1 second.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop monitoring. Every later sleep would throw at
                // once and turn this loop into a busy poll against the ResourceManager.
                Thread.currentThread().interrupt();
                System.err.println("Thread sleep in monitoring loop interrupted");
                return false;
            }
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            YarnApplicationState state = report.getYarnApplicationState();
            FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
            if (YarnApplicationState.FINISHED == state) {
                if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
                    System.out.println("Application has completed successfully. "
                            + " Breaking monitoring loop : ApplicationId:" + appId.getId());
                    return true;
                } else {
                    System.out.println("Application did finished unsuccessfully."
                            + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
                            + ". Breaking monitoring loop : ApplicationId:" + appId.getId());
                    return false;
                }
            } else if (YarnApplicationState.KILLED == state
                    || YarnApplicationState.FAILED == state) {
                System.out.println("Application did not finish."
                        + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
                        + ". Breaking monitoring loop : ApplicationId:" + appId.getId());
                return false;
            }
            // Timed out once "now" is past (client start time + timeout).
            if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
                System.out.println("Reached client specified timeout for application. Killing application"
                        + ". Breaking monitoring loop : ApplicationId:" + appId.getId());
                forceKillApplication(appId);
                return false;
            }
        }
    }

    /**
     * Kills a submitted application through the YARN client.
     *
     * @param appId Application Id to be killed.
     * @throws YarnException if the RM rejects the kill request
     * @throws IOException on RPC failure
     */
    private void forceKillApplication(ApplicationId appId)
            throws YarnException, IOException {
        yarnClient.killApplication(appId);
    }

    /**
     * Client entry point. Exit codes:
     * <ul>
     *   <li>0: success, or help was printed;</li>
     *   <li>-1: invalid arguments;</li>
     *   <li>1: unexpected error;</li>
     *   <li>2: the application did not complete successfully.</li>
     * </ul>
     *
     * @param args Command line arguments
     */
    public static void main(String[] args) {
        boolean result = false;
        try {
            MyClient client = new MyClient();
            System.out.println("Initializing Client");
            try {
                boolean doRun = client.init(args);
                if (!doRun) {
                    System.exit(0);
                }
            } catch (IllegalArgumentException e) {
                System.err.println(e.getLocalizedMessage());
                client.printUsage();
                System.exit(-1);
            }
            result = client.run();
        } catch (Throwable t) {
            System.err.println("Error running CLient\n" + t.getMessage() + Arrays.toString(t.getStackTrace()));
            System.exit(1);
        }
        if (result) {
            System.out.println("Application completed successfully");
            System.exit(0);
        }
        System.err.println("Application failed to complete successfully");
        System.exit(2);
    }
}
package de.example.hadoop.yarn;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.LogManager;
/**
 * ApplicationMaster launched by {@code MyClient}.
 *
 * <p>It does the following:
 * <ul>
 *   <li>registers with the ResourceManager;</li>
 *   <li>requests {@code num_containers} worker containers;</li>
 *   <li>launches {@code HelloYarn} in each allocated container, localizing the same uploaded jar
 *       whose location the client passed in the {@code AM_JAR_*} environment variables;</li>
 *   <li>waits until that many containers have completed;</li>
 *   <li>unregisters with SUCCEEDED if every completed container exited with status 0, or
 *       FAILED otherwise.</li>
 * </ul>
 */
public class MyApplicationMaster {
    // Application attempt id (the application id plus the attempt number)
    protected ApplicationAttemptId appAttemptID;
    // Number of worker containers to launch
    private int numTotalContainers = 1;
    // Memory in MB to request for each worker container
    private int containerMemory = 10;
    // Virtual cores to request for each worker container
    private int containerVirtualCores = 1;
    // Priority of the worker container requests
    private int requestPriority;
    // Path of the AM jar on the cluster file system (from the AM_JAR_PATH env variable)
    private String appJarPath = "";
    // Modification timestamp of the AM jar, needed for creating a local resource
    private long appJarTimestamp = 0;
    // File length of the AM jar, needed for creating a local resource
    private long appJarPathLen = 0;
    // Configuration
    private Configuration conf;

    /** Creates the AM with a fresh {@link YarnConfiguration}. */
    public MyApplicationMaster() {
        conf = new YarnConfiguration();
    }

    /**
     * Parses the command-line options and the container environment.
     *
     * <p>The attempt id comes from CONTAINER_ID when present, otherwise from
     * {@code -app_attempt_id}. The NodeManager-provided variables must be set. The AM jar
     * location, timestamp and length are read from the {@code Constants.AM_JAR_*} variables.
     *
     * @param args Command line args
     * @return Whether init succeeded and run should be invoked (currently always true)
     * @throws IllegalArgumentException if no attempt id is available, the jar env values are
     *         invalid, or fewer than one container is requested
     * @throws RuntimeException if a required NodeManager environment variable is missing
     * @throws Exception on option parse errors
     */
    public boolean init(String[] args) throws Exception {
        Options opts = new Options();
        opts.addOption("app_attempt_id", true,
                "App Attempt ID. Not to be used unless for testing purposes");
        opts.addOption("shell_env", true,
                "Environment for shell script. Specified as env_key=env_val pairs");
        opts.addOption("container_memory", true,
                "Amount of memory in MB to be requested to run the shell command");
        opts.addOption("container_vcores", true,
                "Amount of virtual cores to be requested to run the shell command");
        opts.addOption("num_containers", true,
                "No. of containers on which the shell command needs to be executed");
        opts.addOption("priority", true, "Application Priority. Default 0");
        opts.addOption("help", false, "Print usage");
        CommandLine cliParser = new GnuParser().parse(opts, args);
        Map<String, String> envs = System.getenv();
        if (!envs.containsKey(ApplicationConstants.Environment.CONTAINER_ID.name())) {
            if (cliParser.hasOption("app_attempt_id")) {
                String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
                appAttemptID = ApplicationAttemptId.fromString(appIdStr);
            } else {
                throw new IllegalArgumentException(
                        "Application Attempt Id not set in the environment");
            }
        } else {
            ContainerId containerId = ContainerId.fromString(
                    envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()));
            appAttemptID = containerId.getApplicationAttemptId();
        }
        if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
            throw new RuntimeException(
                    ApplicationConstants.APP_SUBMIT_TIME_ENV + " not set in the environment");
        }
        if (!envs.containsKey(ApplicationConstants.Environment.NM_HOST.name())) {
            throw new RuntimeException(
                    ApplicationConstants.Environment.NM_HOST.name() + " not set in the environment");
        }
        if (!envs.containsKey(ApplicationConstants.Environment.NM_HTTP_PORT.name())) {
            throw new RuntimeException(
                    ApplicationConstants.Environment.NM_HTTP_PORT.name() + " not set in the environment");
        }
        if (!envs.containsKey(ApplicationConstants.Environment.NM_PORT.name())) {
            throw new RuntimeException(
                    ApplicationConstants.Environment.NM_PORT.name() + " not set in the environment");
        }
        if (envs.containsKey(Constants.AM_JAR_PATH)) {
            appJarPath = envs.get(Constants.AM_JAR_PATH);
            if (envs.containsKey(Constants.AM_JAR_TIMESTAMP)) {
                appJarTimestamp = Long.parseLong(envs.get(Constants.AM_JAR_TIMESTAMP));
            }
            if (envs.containsKey(Constants.AM_JAR_LENGTH)) {
                appJarPathLen = Long.parseLong(envs.get(Constants.AM_JAR_LENGTH));
            }
            // Localization validates timestamp and length, so both must be positive.
            if (!appJarPath.isEmpty() && (appJarTimestamp <= 0 || appJarPathLen <= 0)) {
                System.err.println("Illegal values in env for shell script path" + ", path="
                        + appJarPath + ", len=" + appJarPathLen + ", timestamp=" + appJarTimestamp);
                throw new IllegalArgumentException(
                        "Illegal values in env for shell script path");
            }
        }
        System.out.println("Application master for app" + ", appId="
                + appAttemptID.getApplicationId().getId() + ", clusterTimestamp="
                + appAttemptID.getApplicationId().getClusterTimestamp()
                + ", attemptId=" + appAttemptID.getAttemptId());
        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
        containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
        numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
        // Reject negative counts too: with them the wait loops would exit at once and report success.
        if (numTotalContainers < 1) {
            throw new IllegalArgumentException("Cannot run MyAppliCationMaster with no containers");
        }
        requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
        return true;
    }

    /**
     * Main run function for the application master.
     *
     * <p>The AMRM and NM clients are stopped before this method returns or throws.
     *
     * @throws Exception on RM/NM communication failures or interruption
     */
    @SuppressWarnings({ "unchecked" })
    public void run() throws Exception {
        System.out.println("Running MyApplicationMaster");
        // Initialize clients to ResourceManager and NodeManagers
        AMRMClient<ContainerRequest> amRMClient = AMRMClient.createAMRMClient();
        amRMClient.init(conf);
        amRMClient.start();
        NMClient nmClient = NMClient.createNMClient();
        nmClient.init(conf);
        nmClient.start();
        try {
            // Register with ResourceManager (no RPC host/port and no tracking URL).
            amRMClient.registerApplicationMaster("", 0, "");
            requestWorkerContainers(amRMClient);

            // Worker containers find the localized jar via the working directory classpath.
            Map<String, String> containerEnv = new HashMap<String, String>();
            containerEnv.put("CLASSPATH", "./*");
            LocalResource appMasterJar = createAppMasterJar();

            int allocatedContainers = 0;
            // Count completions while still allocating. Early containers may finish before all
            // are allocated, and a missed notification is never repeated, which would leave the
            // AM waiting forever.
            int completedContainers = 0;
            int failedContainers = 0;
            // Keep polling until every requested container has been allocated and launched.
            while (allocatedContainers < numTotalContainers) {
                AllocateResponse response = amRMClient.allocate(progress(completedContainers));
                for (Container container : response.getAllocatedContainers()) {
                    allocatedContainers++;
                    ContainerLaunchContext appContainer = createContainerLaunchContext(appMasterJar, containerEnv);
                    System.out.println("Launching container " + container.getId().toString());
                    System.out.println("Allocated containers " + allocatedContainers);
                    nmClient.startContainer(container, appContainer);
                }
                completedContainers += response.getCompletedContainersStatuses().size();
                failedContainers += logCompletedAndCountFailures(response);
                // Throttle the heartbeat so the RM is not flooded with allocate calls.
                Thread.sleep(100);
            }
            // Now wait for the remaining containers to complete.
            while (completedContainers < numTotalContainers) {
                // Float division: integer division would report 0% progress until the very end.
                AllocateResponse response = amRMClient.allocate(progress(completedContainers));
                completedContainers += response.getCompletedContainersStatuses().size();
                failedContainers += logCompletedAndCountFailures(response);
                Thread.sleep(100);
            }
            System.out.println("Completed containers:" + completedContainers);
            // Report failure to the RM (and therefore to MyClient) if any worker exited non-zero.
            FinalApplicationStatus finalStatus = failedContainers == 0
                    ? FinalApplicationStatus.SUCCEEDED
                    : FinalApplicationStatus.FAILED;
            String diagnostics = failedContainers == 0
                    ? ""
                    : failedContainers + " of " + completedContainers + " containers failed";
            amRMClient.unregisterApplicationMaster(finalStatus, diagnostics, "");
            System.out.println("Finished MyApplicationMaster");
        } finally {
            nmClient.stop();
            amRMClient.stop();
        }
    }

    /** Adds {@code numTotalContainers} identical container requests with no node or rack constraint. */
    private void requestWorkerContainers(AMRMClient<ContainerRequest> amRMClient) {
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemorySize(containerMemory);
        capability.setVirtualCores(containerVirtualCores);
        // Priority for worker containers - priorities are intra-application
        Priority priority = Records.newRecord(Priority.class);
        priority.setPriority(requestPriority);
        for (int i = 0; i < numTotalContainers; ++i) {
            amRMClient.addContainerRequest(new ContainerRequest(capability, null, null, priority));
        }
    }

    /** Fraction of requested containers that have completed, for the allocate heartbeat. */
    private float progress(int completedContainers) {
        return (float) completedContainers / numTotalContainers;
    }

    /**
     * Logs every container reported as completed in {@code response}.
     *
     * @return the number of those containers whose exit status was not SUCCESS (0)
     */
    private static int logCompletedAndCountFailures(AllocateResponse response) {
        int failed = 0;
        for (ContainerStatus status : response.getCompletedContainersStatuses()) {
            System.out.println("ContainerID:" + status.getContainerId() + ", state:" + status.getState().name()
                    + ", exitStatus:" + status.getExitStatus());
            if (status.getExitStatus() != org.apache.hadoop.yarn.api.records.ContainerExitStatus.SUCCESS) {
                failed++;
            }
        }
        return failed;
    }

    /**
     * Builds a local-resource record for the uploaded AM jar, so worker containers can load
     * HelloYarn from it.
     *
     * @throws IllegalStateException if the jar path was not provided through the environment
     */
    private LocalResource createAppMasterJar() throws IOException {
        if (appJarPath.isEmpty()) {
            // An empty record would only fail later, and more obscurely, when containers start.
            throw new IllegalStateException(Constants.AM_JAR_PATH + " not set in the environment");
        }
        LocalResource appMasterJar = Records.newRecord(LocalResource.class);
        appMasterJar.setType(LocalResourceType.FILE);
        Path jarPath = FileSystem.get(conf).makeQualified(new Path(appJarPath));
        appMasterJar.setResource(URL.fromPath(jarPath));
        appMasterJar.setTimestamp(appJarTimestamp);
        appMasterJar.setSize(appJarPathLen);
        // APPLICATION matches how MyClient registered the same file. PUBLIC would additionally
        // require the file and every ancestor directory to be world-readable.
        appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION);
        return appMasterJar;
    }

    /**
     * Builds the launch context for one worker container.
     *
     * <p>The jar is linked as {@code Constants.AM_JAR_NAME}, the given environment is applied,
     * and the command runs {@code HelloYarn} with a 128 MB max heap. stdout and stderr go to
     * the container log directory.
     *
     * @param appMasterJar local resource for the jar containing HelloYarn
     * @param containerEnv environment for the container (CLASSPATH)
     * @return the launch context to pass to the NodeManager
     */
    private ContainerLaunchContext createContainerLaunchContext(LocalResource appMasterJar,
            Map<String, String> containerEnv) {
        ContainerLaunchContext appContainer =
                Records.newRecord(ContainerLaunchContext.class);
        appContainer.setLocalResources(
                Collections.singletonMap(Constants.AM_JAR_NAME, appMasterJar));
        appContainer.setEnvironment(containerEnv);
        // The worker is itself a Java program: max heap 128 MB, main class HelloYarn.
        appContainer.setCommands(
                Collections.singletonList(
                        "$JAVA_HOME/bin/java" +
                                " -Xmx128M" +
                                " de.example.hadoop.yarn.HelloYarn" +
                                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
                                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
                )
        );
        return appContainer;
    }

    /**
     * Entry point run inside the AM container, via the java command built by MyClient.
     * Any error is printed and the process terminates with status 1.
     */
    public static void main(String[] args) throws Exception {
        try {
            MyApplicationMaster appMaster = new MyApplicationMaster();
            System.out.println("Initializing MyApplicationMaster");
            boolean doRun = appMaster.init(args);
            if (!doRun) {
                System.exit(0);
            }
            appMaster.run();
        } catch (Throwable t) {
            System.err.println("Error running MyApplicationMaster\n" + t.getMessage() + Arrays.toString(t.getStackTrace()));
            LogManager.shutdown();
            ExitUtil.terminate(1, t);
        }
    }
}
package de.example.hadoop.yarn;
/**
 * Worker program that MyApplicationMaster runs inside each YARN container.
 * It greets on construction and then prints the JVM heap figures in whole megabytes.
 */
public class HelloYarn {
    // 2^20 bytes.
    private static final long MEGABYTE = 1L << 20;

    /** Prints the greeting line. */
    public HelloYarn() {
        System.out.println("HelloYarn!");
    }

    /**
     * Converts a byte count into whole megabytes.
     *
     * @param bytes number of bytes
     * @return {@code bytes / 2^20}, truncated toward zero
     */
    public static long bytesToMegabytes(long bytes) {
        final long megabytes = bytes / MEGABYTE;
        return megabytes;
    }

    /** Prints the free, total and max heap of the running JVM, in megabytes. */
    public void printMemoryStats() {
        Runtime runtime = Runtime.getRuntime();
        long free = bytesToMegabytes(runtime.freeMemory());
        long total = bytesToMegabytes(runtime.totalMemory());
        long max = bytesToMegabytes(runtime.maxMemory());
        System.out.println("The amount of free memory in the Java Virtual Machine: " + free);
        System.out.println("The total amount of memory in the Java virtual machine: " + total);
        System.out.println("The maximum amount of memory that the Java virtual machine: " + max);
    }

    /** Container entry point: greet, then report memory statistics. */
    public static void main(String[] args) {
        new HelloYarn().printMemoryStats();
    }
}
# Root logger: level INFO, output sent to the appender named "consoleAppender".
log4j.rootLogger=INFO, consoleAppender
# log4j 1.x appender that writes to the console.
log4j.appender.consoleAppender=org.apache.log4j.ConsoleAppender
# No extra filtering at the appender level; the root logger level above still applies.
log4j.appender.consoleAppender.Threshold=All
log4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayout
# [date] [level, padded to 5 chars] logger-category NDC - message, then a newline
log4j.appender.consoleAppender.layout.ConversionPattern=[%d] [%-5p] %c %x - %m%n
build 를 수행한 뒤, jar 를 EMR primary node 로 scp 를 통해 복사한다.
scp -i $your_key $your_jar_path hadoop@$your_emr_primary_ec2_public_dns:~/example/jars/.
EMR Primary node 에 접속한 후 다음 명령어를 수행한다.
yarn jar $your_jar $yarn_client_mainclass $args
yarn jar hadoop-yarn-app-1.0-SNAPSHOT.jar de.example.hadoop.yarn.MyClient -jar hadoop-yarn-app-1.0-SNAPSHOT.jar
편의를 위해 핵심 내용은 Yarn Client, Application Master 로그에서 모두 확인할 수 있도록 하는 것이 좋다.
ApplicationMaster 까지의 로그는 webui 에서 확인이 가능하다.
EMR 기본 설치의 경우
/var/log/hadoop-yarn/apps/hadoop/logs-tfile/
경로에서 applicationid 별로 (집계된) 로그를 확인할 수 있다.
또한 `/var/log/hadoop-yarn/containers/` 아래에 applicationid 별로 디렉토리가 생성되고, 그 하위에 container 별 로그 디렉토리가 생긴다. 이 container 별 디렉토리가 바로 *ApplicationConstants*.***LOG_DIR_EXPANSION_VAR*** (`<LOG_DIR>`)가 실제로 치환되는 경로
이다. 이 하위에 stdout/stderr 등 자세한 로그가 파일별로 남는다.