Another scenario is that the system has a long-running job execution process, which accepts jobs (e.g. via a JSON job definition) and executes them inside its own process. The long-running job execution process is heavy, and it is not feasible for YARN to launch a new such process for each job. Thus there is also a need for YARN to run a small in-memory application inside the Node Manager process to forward requests to the long-running job execution process.
A possible solution is to create a custom Container Executor implementation and configure YARN to use it instead of the default container executor. We call this the In-Process Container Executor.
The following is a rough example. It creates and finishes the application master inside the same process. For demo purposes, the custom Container Executor code reads "sleepMilliseconds" from the application's environment variables, sleeps for that amount of time, and then finishes.
The code example works with Hadoop 2.4 and 2.5, but there might be other issues, e.g. with application life cycle management. Please feel free to provide your comments.
package service; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; import java.security.PrivilegedAction; import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; public class InProcessDummyContainerExecutor extends ContainerExecutor { private static final Log LOG = LogFactory .getLog(InProcessDummyContainerExecutor.class); private final FileContext lfs; public InProcessDummyContainerExecutor() { try { this.lfs = FileContext.getLocalFSFileContext(); } catch (UnsupportedFileSystemException e) { throw new RuntimeException(e); } } @Override public void init() throws IOException { } @Override public void startLocalizer(Path nmPrivateContainerTokens, InetSocketAddress nmAddr, String user, String appId, String locId, ListlocalDirs, List logDirs) throws IOException, InterruptedException { } @Override public int launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String userName, 
String appId, Path containerWorkDir, List localDirs, List logDirs) throws IOException { LOG.info("Use InProcessContainerExecutor: " + nmPrivateContainerScriptPath + ", " + nmPrivateTokensPath + ", " + containerWorkDir); Map env = container.getLaunchContext().getEnvironment(); LOG.info("ENV: " + container.getLaunchContext().getEnvironment()); // Here we create a dummy in-process application which sleeps for the given duration final String sleepMillisecondsStr = env.get("sleepMilliseconds"); if (sleepMillisecondsStr == null || sleepMillisecondsStr.isEmpty()) { LOG.warn("Did not find sleepMilliseconds in environment when trying to launch container " + container.getContainerId()); return -1; } // Have to do following to run code inside a UserGroupInformation context, otherwise, I got error: // SIMPLE authentication is not enabled. It seems related to authentication configuration. final ApplicationInfo appInfo = this.buildApplicationInfo(container, nmPrivateTokensPath, userName, containerWorkDir); UserGroupInformation ugi = appInfo.createUgi(); final int sleepMilliseconds = Integer.parseInt(sleepMillisecondsStr); return ugi.doAs(new PrivilegedAction () { @Override public Integer run() { try { InProcessDummyContainerExecutor.this .doInProcessWork(sleepMilliseconds); return 0; } catch (Exception ex) { LOG.warn("Failed to do in-process work", ex); return -1; } } }); } @Override public String getProcessId(ContainerId containerID) { return null; } @Override public boolean signalContainer(String user, String pid, Signal signal) throws IOException { return true; } @Override public void deleteAsUser(String user, Path subDir, Path... baseDirs) throws IOException, InterruptedException { if (baseDirs == null || baseDirs.length == 0) { LOG.info("Deleting absolute path : " + subDir); if (!lfs.delete(subDir, true)) { // Maybe retry LOG.warn("delete returned false for path: [" + subDir + "]"); } return; } for (Path baseDir : baseDirs) { Path del = subDir == null ? 
baseDir : new Path(baseDir, subDir); LOG.info("Deleting path : " + del); if (!lfs.delete(del, true)) { LOG.warn("delete returned false for path: [" + del + "]"); } } } @Override public void activateContainer(ContainerId containerId, Path pidFilePath) { } @Override public void deactivateContainer(ContainerId containerId) { } @Override protected boolean isContainerActive(ContainerId containerId) { return true; } private ApplicationInfo buildApplicationInfo(Container container, final Path nmPrivateTokensPath, final String userName, final Path containerWorkDir) { ApplicationInfo app = new ApplicationInfo(userName); app.setConfiguration(this.getConf()); try { String filePath = new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE).toString(); LOG.info("nmPrivateTokensPath: " + nmPrivateTokensPath); LOG.info("ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE: " + filePath); Credentials credentials = Credentials.readTokenStorageFile( new File(nmPrivateTokensPath.toString()), this.getConf()); app.getTokens().addAll(credentials.getAllTokens()); } catch (Exception e) { throw new IllegalArgumentException( "Failed to build UserGroupInformation to launch container " + container, e); } return app; } private void doInProcessWork(int sleepMilliseconds) { Configuration conf = this.getConf(); LOG.info("Starting resource manager client"); AMRMClient rmClient = AMRMClient.createAMRMClient(); rmClient.init(conf); rmClient.start(); try { LOG.info("Registering application master"); rmClient.registerApplicationMaster("", 0, ""); LOG.info("Sleeping sleepMilliseconds: " + sleepMilliseconds); try { Thread.sleep(sleepMilliseconds); } catch (InterruptedException e) { LOG.info("Got exception in thread sleep", e); }; LOG.info("Unregistering application master"); rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", ""); } catch (YarnException | IOException e) { LOG.warn("Failed to run in-process work", e); try { rmClient.unregisterApplicationMaster( 
FinalApplicationStatus.FAILED, "", ""); } catch (Exception e1) { LOG.warn("Failed to unregisterApplicationMaster", e1); } } finally { try { rmClient.close(); } catch (IOException e) { LOG.warn("Failed to close rmClient", e); } } } }
wb board hs result, telangana results, icse 10th result, icse 12th result, 10th results, 12th results, ssc results, sslc results
ReplyDeleterajasthan board 10th result, rajasthan board 12th result, tnresults.nic.in, tamilnadu sslc results, tamilnadu hsc results, upresults.nic.in, up board 10th result, up intermediate result
ReplyDeletehbse 12th result, hpresults.nic.in, hp 10th result, hpbose 12th result, jkbose board, jammu board 10th result, jammu board 12th results
ReplyDeletecgbse 12th result, gseb.org, gseb ssc result, gujarat board 12th result, goaresults.nic.in, goa board ssc result, goa board hssc results
ReplyDeleteicse 12th result, bseap.org, ap 10th results, ap intermediate results, biharboard.ac.in, bseb matric result, bseb inter result
ReplyDeleteintermediate result, sslc result, ssc result, 12th result, http://www.resultsnic2017.in/, cbseresults.nic.in, cbse 10th result, cbse 12th result
ReplyDeleteNice post. It is really interesting. Thanks for sharing the post!
ReplyDeleteWeb Design Company In India