Wednesday, April 22, 2015

Access AWS DynamoDB Using Presto SQL-Like Queries

Presto is a good tool from Facebook for querying multiple data sources. It supports multiple plugins (called "connectors") that talk to the physical data source, such as a relational database or Apache Cassandra. But it lacks support for AWS DynamoDB.

I am developing a DynamoDB connector plugin. It is still at a very early stage, and I hope this blog post helps more people learn about it and join the development effort with me.

Steps to use the DynamoDB Connector.

1. Download the DynamoDB Connector Source Code and Build It

Go to my GitHub repository: https://github.com/Zutai/presto. It contains the full Presto source code, plus a new presto-dynamo child directory that contains my code for DynamoDB.

Switch to the "presto_0.100" branch. This is a MUST for now: my code is based on Presto 0.100 and only builds against Presto 0.100, which is why I created that branch. The master branch of my repository is not buildable at the current stage.
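
For example, using git:

  git clone https://github.com/Zutai/presto.git
  cd presto
  git checkout presto_0.100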

After checking out the presto_0.100 branch, go to the presto-dynamo directory and run the following command to build:
  mvn compile package -DskipTests

After the build finishes, you will see the file "presto-dynamo-0.100.zip" in the target directory. Unzip it and you will find multiple jar files inside; all of these jar files are needed.

2. Deploy the Dynamo Connector to Presto and Configure the DynamoDB Metadata File

On the Presto server, create a new directory "dynamo" under the plugin directory, and copy all the jar files from the zip file in the previous step into that "dynamo" directory.
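
For example, assuming Presto is installed under [PrestoDirectory] and the jars from the zip were extracted into the current directory:

  mkdir [PrestoDirectory]/plugin/dynamo
  cp *.jar [PrestoDirectory]/plugin/dynamo/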

Create a catalog properties file for Dynamo at etc/catalog/dynamo.properties with the following content:

connector.name=dynamo
dynamo.metadata-file=/home/YourUser/presto-dynamo-metadata.json

The metadata file "/home/YourUser/presto-dynamo-metadata.json" contains the metadata of your DynamoDB tables, for example table names, column names and column types.

The following is an example presto-dynamo-metadata.json:

{
    "tables": [
        {
            "columns": [
                {
                    "columnName": "UserId",
                    "columnType": "STRING",
                    "typeArguments": []
                },
                {
                    "columnName": "Version",
                    "columnType": "LONG",
                    "typeArguments": []
                }
            ],
            "region": "us_west_2",
            "tableName": "Users"
        },
        {
            "columns": [
                {
                    "columnName": "BookName",
                    "columnType": "STRING",
                    "typeArguments": []
                },
                {
                    "columnName": "Writers",
                    "columnType": "LIST",
                    "typeArguments": [
                        "STRING"
                    ]
                }
            ],
            "region": "us_west_2",
            "tableName": "Books"
        }
    ]
}

Now start the Presto server, for example:

  [PrestoDirectory]/bin/launcher run

3. Run Presto Command Line to Query DynamoDB

Download the Presto command line tool (presto-cli-0.100-executable.jar), rename it to presto, and make it executable (chmod +x).

Then run the following command to start it with the dynamo catalog and us_west_2 as the schema:

./presto --server localhost:8080 --catalog dynamo --schema us_west_2

The AWS region is mapped to the schema. Currently only us_west_2 is supported.

Then you can run queries. For example, I previously created a Users table manually in AWS DynamoDB, so I can run the following statement to query it:

  select * from Users;

You will see a result like the following:

presto:us_west_2> select * from users;
  UserId  | Version
----------+---------
 001       | NULL    
(1 rows)
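
Since the catalog is dynamo and the region us_west_2 is the schema, you can also reference the table by its fully qualified name, for example:

  select * from dynamo.us_west_2.users;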


Note: the current Dynamo connector implementation always does a full table scan. More work is needed to support querying by key or index.

[More content to be added]

Wednesday, September 10, 2014

In-Process Application Master Container Executor for Hadoop YARN

Hadoop YARN is a great generic framework for job scheduling and dispatching. It creates a new process (the Application Master) for each job, and the Application Master then creates other application containers to execute the job. This process creation slows down job dispatching when there is a large volume of jobs, or when a fast response time is needed for each job.

Another scenario is a system with a long-running job execution process, which accepts jobs (e.g. via JSON job definitions) and executes them inside its own process. The long-running job execution process is heavy, and it is not feasible for YARN to launch such a new process for each job. Thus there is also a need for YARN to run a small in-memory application inside the Node Manager process that forwards the request to the long-running job execution process.

A possible solution is to create a custom ContainerExecutor implementation and configure YARN to use it instead of the default container executor. We call this the in-process container executor.
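
For example, the custom executor can be plugged in through yarn-site.xml on each Node Manager (the class name below matches the example code later in this post; its jar must be on the Node Manager classpath):

<property>
  <name>yarn.nodemanager.container-executor.class</name>
  <value>service.InProcessDummyContainerExecutor</value>
</property>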

The following is a rough example. It creates and finishes the application master inside the same process. For demo purposes, the custom container executor reads "sleepMilliseconds" from the application's environment variables, sleeps for that amount of time, and then finishes.

The code example works with Hadoop 2.4 and 2.5, but there may be other issues, e.g. around application life-cycle management. Please feel free to leave your comments.


package service;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;

public class InProcessDummyContainerExecutor extends ContainerExecutor {

 private static final Log LOG = LogFactory
   .getLog(InProcessDummyContainerExecutor.class);

 private final FileContext lfs;

 public InProcessDummyContainerExecutor() {
  try {
   this.lfs = FileContext.getLocalFSFileContext();
  } catch (UnsupportedFileSystemException e) {
   throw new RuntimeException(e);
  }
 }

 @Override
 public void init() throws IOException {
 }

 @Override
 public void startLocalizer(Path nmPrivateContainerTokens,
   InetSocketAddress nmAddr, String user, String appId, String locId,
   List<String> localDirs, List<String> logDirs) throws IOException,
   InterruptedException {
 }

 @Override
 public int launchContainer(Container container,
   Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
   String userName, String appId, Path containerWorkDir,
   List<String> localDirs, List<String> logDirs) throws IOException {

  LOG.info("Use InProcessContainerExecutor: "
    + nmPrivateContainerScriptPath + ", " + nmPrivateTokensPath
    + ", " + containerWorkDir);

  Map<String, String> env = container.getLaunchContext().getEnvironment();
  LOG.info("ENV: " + container.getLaunchContext().getEnvironment());

  // Here we create a dummy in-process application which sleeps for the given duration
  final String sleepMillisecondsStr = env.get("sleepMilliseconds");

  if (sleepMillisecondsStr == null || sleepMillisecondsStr.isEmpty()) {
   LOG.warn("Did not find sleepMilliseconds in environment when trying to launch container "
     + container.getContainerId());
   return -1;
  }

  // We have to run the following code inside a UserGroupInformation context; otherwise it fails with
  // "SIMPLE authentication is not enabled", which seems related to the authentication configuration.
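  // Note: ApplicationInfo is a small helper class (not shown in this post) that holds the user name,
  // configuration and container tokens, and creates a UserGroupInformation via createUgi().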
  
  final ApplicationInfo appInfo = this.buildApplicationInfo(container,
    nmPrivateTokensPath, userName, containerWorkDir);

  UserGroupInformation ugi = appInfo.createUgi();

  final int sleepMilliseconds = Integer.parseInt(sleepMillisecondsStr);
    
  return ugi.doAs(new PrivilegedAction<Integer>() {
   @Override
   public Integer run() {
    try {
     InProcessDummyContainerExecutor.this
       .doInProcessWork(sleepMilliseconds);
     return 0;
    } catch (Exception ex) {
     LOG.warn("Failed to do in-process work", ex);
     return -1;
    }
   }
  });
 }

 @Override
 public String getProcessId(ContainerId containerID) {
  return null;
 }

 @Override
 public boolean signalContainer(String user, String pid, Signal signal)
   throws IOException {
  return true;
 }

 @Override
 public void deleteAsUser(String user, Path subDir, Path... baseDirs)
   throws IOException, InterruptedException {
  if (baseDirs == null || baseDirs.length == 0) {
   LOG.info("Deleting absolute path : " + subDir);
   if (!lfs.delete(subDir, true)) {
    // Maybe retry
    LOG.warn("delete returned false for path: [" + subDir + "]");
   }
   return;
  }
  for (Path baseDir : baseDirs) {
   Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
   LOG.info("Deleting path : " + del);
   if (!lfs.delete(del, true)) {
    LOG.warn("delete returned false for path: [" + del + "]");
   }
  }
 }

 @Override
 public void activateContainer(ContainerId containerId, Path pidFilePath) {
 }

 @Override
 public void deactivateContainer(ContainerId containerId) {
 }

 @Override
 protected boolean isContainerActive(ContainerId containerId) {
  return true;
 }

 private ApplicationInfo buildApplicationInfo(Container container,
   final Path nmPrivateTokensPath, final String userName,
   final Path containerWorkDir) {
  ApplicationInfo app = new ApplicationInfo(userName);

  app.setConfiguration(this.getConf());

  try {
   String filePath = new Path(containerWorkDir,
     ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE).toString();

   LOG.info("nmPrivateTokensPath: " + nmPrivateTokensPath);
   LOG.info("ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE: " + filePath);

   Credentials credentials = Credentials.readTokenStorageFile(
     new File(nmPrivateTokensPath.toString()), this.getConf());

   app.getTokens().addAll(credentials.getAllTokens());
  }

  catch (Exception e) {
   throw new IllegalArgumentException(
     "Failed to build UserGroupInformation to launch container "
       + container, e);
  }

  return app;
 }
 
 private void doInProcessWork(int sleepMilliseconds) {
  Configuration conf = this.getConf();

  LOG.info("Starting resource manager client");
  AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
  rmClient.init(conf);
  rmClient.start();

  try {

   LOG.info("Registering application master");
   rmClient.registerApplicationMaster("", 0, "");

   LOG.info("Sleeping sleepMilliseconds: " + sleepMilliseconds);
   
   try {
    Thread.sleep(sleepMilliseconds);
   } catch (InterruptedException e) {
    LOG.info("Got exception in thread sleep", e);
   };
   
   LOG.info("Unregistering application master");
   rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
  } catch (YarnException | IOException e) {
   LOG.warn("Failed to run in-process work", e);

   try {
    rmClient.unregisterApplicationMaster(
      FinalApplicationStatus.FAILED, "", "");
   } catch (Exception e1) {
    LOG.warn("Failed to unregisterApplicationMaster", e1);
   }
  } finally {
   try {
    rmClient.close();
   } catch (IOException e) {
    LOG.warn("Failed to close rmClient", e);
   }
  }
 }

}

Thursday, September 4, 2014

Submit Application To Hadoop YARN Using C#

Hadoop 2.5.0 introduces a new REST API on the YARN Resource Manager for submitting applications. We can now use it to submit applications to YARN from non-Java programming languages, e.g. C#.

The following is an example. In a future post I may describe how to customize the YARN container executor to run your own application-creation logic, for example an in-process application that runs inside the Node Manager process rather than in a new process. Let's put that aside for now and see how to submit an application in C#.

Define JSON Classes for YARN REST API

The YARN REST API accepts JSON or XML requests. Here we create several classes used to serialize the JSON request, using the Newtonsoft JSON library.


    #region Classes for JSON serialization

    public class SubmitApplicationRequest
    {
        [JsonProperty("application-id")]
        public string ApplicationId { get; set; }

        [JsonProperty("application-name")]
        public string ApplicationName { get; set; }

        [JsonProperty("am-container-spec")]
        public AMContainerSpec AMContainerSpec { get; set; }

        [JsonProperty("unmanaged-AM")]
        public string UnmanagedAM { get; set; }

        [JsonProperty("max-app-attempts")]
        public string MaxAppAttempts { get; set; }

        [JsonProperty("resource")]
        public Resource Resource { get; set; }

        [JsonProperty("application-type")]
        public string ApplicationType { get; set; }

        [JsonProperty("keep-containers-across-application-attempts")]
        public string KeepContainersAcrossApplicationAttempts { get; set; }
    }

    public class AMContainerSpec
    {
        [JsonProperty("local-resources")]
        public LocalResources LocalResources { get; set; }

        [JsonProperty("commands")]
        public Commands Commands { get; set; }
    }

    public class LocalResources
    {
        [JsonProperty("entry")]
        public LocalResourceEntry[] Entry { get; set; }
    }

    public class LocalResourceEntry
    {
        [JsonProperty("key")]
        public string Key { get; set; }

        [JsonProperty("value")]
        public LocalResourceEntryValue Value { get; set; }
    }

    public class LocalResourceEntryValue
    {
        [JsonProperty("resource")]
        public string Resource { get; set; }

        [JsonProperty("type")]
        public string Type { get; set; }

        [JsonProperty("visibility")]
        public string Visibility { get; set; }

        [JsonProperty("size")]
        public string Size { get; set; }

        [JsonProperty("timestamp")]
        public string Timestamp { get; set; }
    }

    public class Commands
    {
        [JsonProperty("command")]
        public string Command { get; set; }
    }

    public class Resource
    {
        [JsonProperty("memory")]
        public string Memory { get; set; }

        [JsonProperty("vCores")]
        public string VCores { get; set; }
    }

    #endregion


Code to Submit the Application

The default REST service URL looks like http://localhost:8088/ws/v1/cluster/... Different APIs accept POST or PUT requests. We can use the WebClient class in C# to post or put data to the server.

Below is the sample code. It submits an application that launches notepad.exe.



class Program
    {
        public const string NewApplicationUrl = "http://localhost:8088/ws/v1/cluster/apps/new-application";
        public const string SubmitApplicationUrl = "http://localhost:8088/ws/v1/cluster/apps";
        public const string ApplicationStateUrl = "http://localhost:8088/ws/v1/cluster/apps/{0}/state";

        static void Main(string[] args)
        {
            var submitApplicationRequest = new SubmitApplicationRequest()
            {
                ApplicationId = "", // will assign real application id later
                ApplicationName = "TestApp",

                AMContainerSpec = new AMContainerSpec()
                {
                    Commands = new Commands()
                    {
                        Command = "notepad.exe"
                    }
                },

                UnmanagedAM = "false",
                MaxAppAttempts = "2",

                Resource = new Resource()
                {
                    Memory = "1024",
                    VCores = "1"
                },

                ApplicationType = "YARN",
                KeepContainersAcrossApplicationAttempts = "false"
            };

            using (WebClient client = new WebClient())
            {
                client.Headers[HttpRequestHeader.Accept] = "application/json";
                client.Headers[HttpRequestHeader.ContentType] = "application/json";

                byte[] responseBytes = client.UploadData(NewApplicationUrl, "POST", Encoding.UTF8.GetBytes(""));
                string responseString = Encoding.UTF8.GetString(responseBytes);
                Console.WriteLine(responseString);

                var json = (JObject)JsonConvert.DeserializeObject(responseString);

                var applicationId = (string)json["application-id"];

                submitApplicationRequest.ApplicationId = applicationId;
                string requestString = JsonConvert.SerializeObject(submitApplicationRequest);

                client.Headers[HttpRequestHeader.Accept] = "application/json";
                client.Headers[HttpRequestHeader.ContentType] = "application/json";

                responseBytes = client.UploadData(SubmitApplicationUrl, "POST", Encoding.UTF8.GetBytes(requestString));
                responseString = Encoding.UTF8.GetString(responseBytes);
                Console.WriteLine(responseString);
            }
        }
    }
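
For reference, the submit request serialized by the code above looks roughly like the following (the application id is a placeholder taken from the new-application response; null properties such as local-resources are omitted here for brevity):

{
    "application-id": "application_0000000000000_0001",
    "application-name": "TestApp",
    "am-container-spec": {
        "commands": {
            "command": "notepad.exe"
        }
    },
    "unmanaged-AM": "false",
    "max-app-attempts": "2",
    "resource": {
        "memory": "1024",
        "vCores": "1"
    },
    "application-type": "YARN",
    "keep-containers-across-application-attempts": "false"
}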

Friday, August 29, 2014

Enable file logging for Hadoop YARN Resource Manager and Node Manager

There is an issue with file logging when running Hadoop's yarn.cmd on Windows. It affects both Hadoop 2.4.0 and 2.5.0: by default, you will not see log files for the Resource Manager and Node Manager. It is caused by a wrongly configured log4j.properties path in yarn.cmd. The following steps show how to fix it (using Hadoop 2.5.0 as the example; 2.4.0 should be the same).

Update hadoop-2.5.0\bin\yarn.cmd: enable log4j debugging so you can see what log4j is doing.

set YARN_OPTS=%YARN_OPTS% -Dlog4j.debug=true

set java_arguments=%JAVA_HEAP_MAX% %YARN_OPTS% -classpath %CLASSPATH% %CLASS% %yarn-command-arguments%

Update hadoop-2.5.0\bin\yarn.cmd: remove the trailing log4j.properties from "%YARN_CONF_DIR%\rm-config\log4j.properties" and move the directory to the beginning of the classpath. The reason is that we need to put the directory containing log4j.properties on the classpath rather than the file's full path. Putting it at the beginning of the classpath also makes sure it is loaded first.

:resourcemanager
 set CLASSPATH=%YARN_CONF_DIR%\rm-config;%CLASSPATH%
 set CLASS=org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
 set YARN_OPTS=%YARN_OPTS% %YARN_RESOURCEMANAGER_OPTS%
 if defined YARN_RESOURCEMANAGER_HEAPSIZE (
    set JAVA_HEAP_MAX=-Xmx%YARN_RESOURCEMANAGER_HEAPSIZE%m
 )
 goto :eof

Create file etc\hadoop\rm-config\log4j.properties:

log4j.rootLogger=info, stdout, R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Pattern to output the caller's file name and line number.
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%t] (%F:%L) - %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=c:/app/hadoop-2.5.0/logs/ResourceManager.log
log4j.appender.R.MaxFileSize=20MB
# Keep one backup file
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout

log4j.appender.R.layout.ConversionPattern=%d %p %t %c - %m%n
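
The Node Manager can be fixed the same way. A sketch, assuming you mirror the rm-config approach with an nm-config directory in yarn.cmd:

:nodemanager
 set CLASSPATH=%YARN_CONF_DIR%\nm-config;%CLASSPATH%
 set CLASS=org.apache.hadoop.yarn.server.nodemanager.NodeManager
 set YARN_OPTS=%YARN_OPTS% %YARN_NODEMANAGER_OPTS%
 if defined YARN_NODEMANAGER_HEAPSIZE (
    set JAVA_HEAP_MAX=-Xmx%YARN_NODEMANAGER_HEAPSIZE%m
 )
 goto :eof

Then create etc\hadoop\nm-config\log4j.properties with the same content as above, changing log4j.appender.R.File to point to a NodeManager.log path instead.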


Now launch YARN, and you will see the log files as configured in the log4j.properties file.