Wednesday, September 10, 2014

In-Process Application Master Container Executor for Hadoop YARN

Hadoop YARN is a great generic framework for job scheduling and dispatching. It creates a new process (Application Master) for each job, and the Application Master will creates other application containers to execute the job. This new process creation slows down the job dispatching when there is a large volume of jobs or people want fast response time for each job.

Another scenario is that the system has a long running job execution process, which accepts job (e.g. via JSON job definition) and executes job inside its own process. The long running job execution process is heavy, and it is not feasible for YARN to launch each such new process for each job. Thus there is also need for YARN to run a small in memory application inside Node Manager process, to forward the request to the long running job execution process.

A possible solution is to create a custom Container Executor implementation, and configure YARN to use that one instead of the default container executor. We call that In Process Container Executor.

Following is a rough example. It creates and finishes the application master inside same process. For demo purpose, the custom Container Executor codes read "sleepMilliseconds" from the application's environment variable, sleep for that amount of time, and then finish.

The code example works with Hadoop 2.4 and 2.5. But there might be some other issue, e.g. application life cycle management. Please feel free to provide your comments.


package service;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;

public class InProcessDummyContainerExecutor extends ContainerExecutor {

 private static final Log LOG = LogFactory
   .getLog(InProcessDummyContainerExecutor.class);

 private final FileContext lfs;

 public InProcessDummyContainerExecutor() {
  try {
   this.lfs = FileContext.getLocalFSFileContext();
  } catch (UnsupportedFileSystemException e) {
   throw new RuntimeException(e);
  }
 }

 @Override
 public void init() throws IOException {
 }

 @Override
 public void startLocalizer(Path nmPrivateContainerTokens,
   InetSocketAddress nmAddr, String user, String appId, String locId,
   List localDirs, List logDirs) throws IOException,
   InterruptedException {
 }

 @Override
 public int launchContainer(Container container,
   Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
   String userName, String appId, Path containerWorkDir,
   List localDirs, List logDirs) throws IOException {

  LOG.info("Use InProcessContainerExecutor: "
    + nmPrivateContainerScriptPath + ", " + nmPrivateTokensPath
    + ", " + containerWorkDir);

  Map env = container.getLaunchContext().getEnvironment();
  LOG.info("ENV: " + container.getLaunchContext().getEnvironment());

  // Here we create a dummy in-process application which sleeps for the given duration
  final String sleepMillisecondsStr = env.get("sleepMilliseconds");

  if (sleepMillisecondsStr == null || sleepMillisecondsStr.isEmpty()) {
   LOG.warn("Did not find sleepMilliseconds in environment when trying to launch container "
     + container.getContainerId());
   return -1;
  }

  // Have to do following to run code inside a UserGroupInformation context, otherwise, I got error:
  // SIMPLE authentication is not enabled. It seems related to authentication configuration.
  
  final ApplicationInfo appInfo = this.buildApplicationInfo(container,
    nmPrivateTokensPath, userName, containerWorkDir);

  UserGroupInformation ugi = appInfo.createUgi();

  final int sleepMilliseconds = Integer.parseInt(sleepMillisecondsStr);
    
  return ugi.doAs(new PrivilegedAction() {
   @Override
   public Integer run() {
    try {
     InProcessDummyContainerExecutor.this
       .doInProcessWork(sleepMilliseconds);
     return 0;
    } catch (Exception ex) {
     LOG.warn("Failed to do in-process work", ex);
     return -1;
    }
   }
  });
 }

 @Override
 public String getProcessId(ContainerId containerID) {
  return null;
 }

 @Override
 public boolean signalContainer(String user, String pid, Signal signal)
   throws IOException {
  return true;
 }

 @Override
 public void deleteAsUser(String user, Path subDir, Path... baseDirs)
   throws IOException, InterruptedException {
  if (baseDirs == null || baseDirs.length == 0) {
   LOG.info("Deleting absolute path : " + subDir);
   if (!lfs.delete(subDir, true)) {
    // Maybe retry
    LOG.warn("delete returned false for path: [" + subDir + "]");
   }
   return;
  }
  for (Path baseDir : baseDirs) {
   Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
   LOG.info("Deleting path : " + del);
   if (!lfs.delete(del, true)) {
    LOG.warn("delete returned false for path: [" + del + "]");
   }
  }
 }

 @Override
 public void activateContainer(ContainerId containerId, Path pidFilePath) {
 }

 @Override
 public void deactivateContainer(ContainerId containerId) {
 }

 @Override
 protected boolean isContainerActive(ContainerId containerId) {
  return true;
 }

 private ApplicationInfo buildApplicationInfo(Container container,
   final Path nmPrivateTokensPath, final String userName,
   final Path containerWorkDir) {
  ApplicationInfo app = new ApplicationInfo(userName);

  app.setConfiguration(this.getConf());

  try {
   String filePath = new Path(containerWorkDir,
     ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE).toString();

   LOG.info("nmPrivateTokensPath: " + nmPrivateTokensPath);
   LOG.info("ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE: " + filePath);

   Credentials credentials = Credentials.readTokenStorageFile(
     new File(nmPrivateTokensPath.toString()), this.getConf());

   app.getTokens().addAll(credentials.getAllTokens());
  }

  catch (Exception e) {
   throw new IllegalArgumentException(
     "Failed to build UserGroupInformation to launch container "
       + container, e);
  }

  return app;
 }
 
 private void doInProcessWork(int sleepMilliseconds) {
  Configuration conf = this.getConf();

  LOG.info("Starting resource manager client");
  AMRMClient rmClient = AMRMClient.createAMRMClient();
  rmClient.init(conf);
  rmClient.start();

  try {

   LOG.info("Registering application master");
   rmClient.registerApplicationMaster("", 0, "");

   LOG.info("Sleeping sleepMilliseconds: " + sleepMilliseconds);
   
   try {
    Thread.sleep(sleepMilliseconds);
   } catch (InterruptedException e) {
    LOG.info("Got exception in thread sleep", e);
   };
   
   LOG.info("Unregistering application master");
   rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
  } catch (YarnException | IOException e) {
   LOG.warn("Failed to run in-process work", e);

   try {
    rmClient.unregisterApplicationMaster(
      FinalApplicationStatus.FAILED, "", "");
   } catch (Exception e1) {
    LOG.warn("Failed to unregisterApplicationMaster", e1);
   }
  } finally {
   try {
    rmClient.close();
   } catch (IOException e) {
    LOG.warn("Failed to close rmClient", e);
   }
  }
 }

}

Thursday, September 4, 2014

Submit Application To Hadoop YARN Using C#

Hadoop 2.5.0 introduces new REST API for YARN Resource Manager to submit applications. Now we can utilize that to submit applications to YARN using non-Java programming languages, e.g. C#.

Following is an example. In my future post, I may write how to customize YARN Container Executor to execute your own logic to create application, for example, creating in-process application which runs inside Resource Manager process rather than a new process. Let's put that aside for now, and see how to submit application in C#.

Define JSON Classes for YARN REST API

YARN REST API supports JSON or XML request. Here we create several classes which are used to serialize JSON request. We use Newtonsoft JSON library.


    #region Classes for JSON serialization

    public class SubmitApplicationRequest
    {
        [JsonProperty("application-id")]
        public string ApplicationId { get; set; }

        [JsonProperty("application-name")]
        public string ApplicationName { get; set; }

        [JsonProperty("am-container-spec")]
        public AMContainerSpec AMContainerSpec { get; set; }

        [JsonProperty("unmanaged-AM")]
        public string UnmanagedAM { get; set; }

        [JsonProperty("max-app-attempts")]
        public string MaxAppAttempts { get; set; }

        [JsonProperty("resource")]
        public Resource Resource { get; set; }

        [JsonProperty("application-type")]
        public string ApplicationType { get; set; }

        [JsonProperty("keep-containers-across-application-attempts")]
        public string KeepContainersAcrossApplicationAttempts { get; set; }
    }

    public class AMContainerSpec
    {
        [JsonProperty("local-resources")]
        public LocalResources LocalResources { get; set; }

        [JsonProperty("commands")]
        public Commands Commands { get; set; }
    }

    public class LocalResources
    {
        [JsonProperty("entry")]
        public LocalResourceEntry[] Entry { get; set; }
    }

    public class LocalResourceEntry
    {
        [JsonProperty("key")]
        public string Key { get; set; }

        [JsonProperty("value")]
        public LocalResourceEntryValue Value { get; set; }
    }

    public class LocalResourceEntryValue
    {
        [JsonProperty("resource")]
        public string Resource { get; set; }

        [JsonProperty("type")]
        public string Type { get; set; }

        [JsonProperty("visibility")]
        public string Visibility { get; set; }

        [JsonProperty("size")]
        public string Size { get; set; }

        [JsonProperty("timestamp")]
        public string Timestamp { get; set; }
    }

    public class Commands
    {
        [JsonProperty("command")]
        public string Command { get; set; }
    }

    public class Resource
    {
        [JsonProperty("memory")]
        public string Memory { get; set; }

        [JsonProperty("vCores")]
        public string VCores { get; set; }
    }

    #endregion


Codes to Submit Application

The default REST service url is like http://localhost:8088/ws/v1/cluster/... It accepts POST or PUT on different APIs. We can use WebClient class in C# to either post data or put data to the server side.

Below are the sample code. It submit an application to launch notepad.exe.



class Program
    {
        public const string NewApplicationUrl = "http://localhost:8088/ws/v1/cluster/apps/new-application";
        public const string SubmitApplicationUrl = "http://localhost:8088/ws/v1/cluster/apps";
        public const string ApplicationStateUrl = "http://localhost:8088/ws/v1/cluster/apps/{0}/state";

        static void Main(string[] args)
        {
            var submitApplicationRequest = new SubmitApplicationRequest()
            {
                ApplicationId = "", // will assign real application id later
                ApplicationName = "TestApp",

                AMContainerSpec = new AMContainerSpec()
                {
                    Commands = new Commands()
                    {
                        Command = "notepad.exe"
                    }
                },

                UnmanagedAM = "false",
                MaxAppAttempts = "2",

                Resource = new Resource()
                {
                    Memory = "1024",
                    VCores = "1"
                },

                ApplicationType = "YARN",
                KeepContainersAcrossApplicationAttempts = "false"
            };

            using (WebClient client = new WebClient())
            {
                client.Headers[HttpRequestHeader.Accept] = "application/json";
                client.Headers[HttpRequestHeader.ContentType] = "application/json";

                byte[] responseBytes = client.UploadData(NewApplicationUrl, "POST", Encoding.UTF8.GetBytes(""));
                string responseString = Encoding.UTF8.GetString(responseBytes);
                Console.WriteLine(responseString);

                var json = (JObject)JsonConvert.DeserializeObject(responseString);

                var applicationId = (string)json["application-id"];

                submitApplicationRequest.ApplicationId = applicationId;
                string requestString = JsonConvert.SerializeObject(submitApplicationRequest);

                client.Headers[HttpRequestHeader.Accept] = "application/json";
                client.Headers[HttpRequestHeader.ContentType] = "application/json";

                responseBytes = client.UploadData(SubmitApplicationUrl, "POST", Encoding.UTF8.GetBytes(requestString));
                responseString = Encoding.UTF8.GetString(responseBytes);
                Console.WriteLine(responseString);
            }
        }
    }