DNAnexus Spark Apps

This document describes how to develop Spark apps for Apollo. It is intended for app developers, so readers should already be familiar with developing apps on DNAnexus.

Overview

Spark apps are special apps that instantiate a fully-managed, on-demand Spark/Hadoop cluster on the DNAnexus platform for big data processing and translational informatics. Spark apps are like regular apps in many ways:

  • they have inputs/outputs
  • their jobs are identified by job ids
  • they are implemented using the familiar DNAnexus app framework

However, Spark apps have access to the data processing capabilities of Spark, which allow them to do distributed data processing in a natural, scalable way, and they can create and query DNAnexus databases using the structured query capabilities of Spark SQL.

Although Spark apps are implemented using the same framework as regular apps, there are some notable differences.

Minimum DNAnexus CLI version

Make sure your dx-toolkit has been updated to the latest release to take advantage of the latest features and improvements. You can download the latest version from the Downloads page.

The app specification

The following things must be included in the app specification dxapp.json.

The cluster specification

The cluster specification describes what the Spark cluster should look like. You can configure the cluster type, the Spark version (note that only Spark 2.2.0 is supported at the current time), and the number of nodes. The cluster specification is required for Spark apps; add it to dxapp.json with the following syntax.

{
  "...": "...",
  "systemRequirements": {
    "*": {
      "...": "...",
      "clusterSpec": {
        "type": "spark",
        "version": "2.2.0",
        "initialInstanceCount": "<num_cluster_nodes>"
      }
    }
  }
}

The cluster specification is a mapping with the following key-values:

  • type -> the cluster type, "spark" for Spark apps
  • version -> the Spark version, "2.2.0" for Spark 2.2.0 (the only version supported at the current time)
  • initialInstanceCount -> the number (integer) of nodes in the cluster, including the master node; it should be at least 2

Output specification

For apps that write databases, the output specification should include:

{
  "outputSpec": [
    {
      "name": "database",
      "label": "Output database",
      "class": "string",
      "patterns": { "class": "database" },
      "help": "The output database"
    },
    "..."
  ],
  "...": "..."
}

Network and project access

Spark apps should have network access. Apps that write databases should have direct access to the parent project (minimally CONTRIBUTE), and apps that read databases should have access to all projects (minimally VIEW).

Example dxapp.json:

{
  "access": {
    "project": "CONTRIBUTE",
    "allProjects": "VIEW",
    "network": [ "*" ]
  },
  "...": "..."
}

Always a Bash app

It is recommended that Spark apps be Bash apps at the top level. To develop PySpark applications, include the Python modules as app dependencies and submit them to the Spark cluster from the Bash script (see Submitting applications below).

Bash script -- things to include

The following things should be included in the Bash app's main script, described in the order in which they might be required.

Sourcing the cluster environment

The following line must be included in the Bash script prior to operating on the Spark/Hadoop cluster.

source /cluster/dx-cluster.environment

The dx-cluster.environment file is included in the execution environment automatically for Spark jobs.

Distributing input data

Unlike regular jobs, input data for Spark jobs is not automatically available on every node of the cluster. In particular, input data must be distributed to the Spark workers before the Spark executors can process it. To distribute data to all nodes, put it into HDFS with the following commands.

$HADOOP_HOME/bin/hadoop fs -mkdir -p /hdfs/path/to
$HADOOP_HOME/bin/hadoop fs -put /local/path/to/file /hdfs/path/to/file
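
Once the data is in HDFS, the PySpark application can read it with the usual Spark readers. A minimal sketch, assuming the file copied above is a CSV with a header (the path, format, and reader options are illustrative):

import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

# Read the file that the Bash script copied into HDFS. The path mirrors the
# hadoop fs -put destination above; adjust the reader options to your data.
df = spark.read.csv("hdfs:///hdfs/path/to/file", header=True, inferSchema=True)
df.show()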

Submitting applications

To submit a PySpark application to the Spark cluster, include the following line.

$SPARK_HOME/bin/spark-submit /opt/pyspark_app.py ...

This is the minimum required to submit an application. More advanced submission options are described below.

Configuring runtime memory and cores

To set the executor memory and the number of cores per executor for the Spark application, pass them to spark-submit.

spark_executor_memory=...  # Could be parameterized as an app input
spark_executor_cores=...   # Could be parameterized as an app input
$SPARK_HOME/bin/spark-submit \
  --executor-memory=$spark_executor_memory \
  --executor-cores=$spark_executor_cores \
  /opt/pyspark_app.py ...
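
To confirm which values the application actually received, the settings can be read back from the Spark configuration inside the PySpark script. A minimal sketch using the standard Spark property names that --executor-memory and --executor-cores map to:

import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()
conf = spark.sparkContext.getConf()

# Print the executor settings passed on the spark-submit command line.
print(conf.get("spark.executor.memory", "<default>"))
print(conf.get("spark.executor.cores", "<default>"))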

Custom Spark properties

To pass custom Spark properties to the Spark application, you can do

$SPARK_HOME/bin/spark-submit \
  --conf $spark_property=$spark_property_value \
  /opt/pyspark_app.py ...

or

$SPARK_HOME/bin/spark-submit \
  --properties-file=/path/to/properties/file \
  /opt/pyspark_app.py ...
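
Custom properties passed with --conf can be read back inside the PySpark application. Note that spark-submit ignores --conf properties whose names do not start with "spark.". A minimal sketch, using a hypothetical property named spark.myapp.option:

import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

# "spark.myapp.option" is a hypothetical property name used for illustration.
option = spark.sparkContext.getConf().get("spark.myapp.option", "<default-value>")
print(option)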

Sharing environment variables

To make an environment variable set in the Bash script visible to the Spark executors, pass it through the spark.executorEnv.<NAME> property.

$SPARK_HOME/bin/spark-submit \
  --conf spark.executorEnv.PYTHONPATH=$PYTHONPATH \
  /opt/pyspark_app.py ...
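
A minimal sketch that checks, from the PySpark script, which PYTHONPATH the executors actually see (the number of partitions is arbitrary):

import os
import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

# Each task runs on an executor and reports the PYTHONPATH visible there.
seen = (spark.sparkContext
        .parallelize(range(4), 4)
        .map(lambda _: os.environ.get("PYTHONPATH", ""))
        .collect())
print(seen)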

Custom log levels

$SPARK_HOME/bin/spark-submit \
  --driver-java-options -Dlog4j.configuration=file:"$($DX_CLUSTER_UTILS/get_log_properties.sh $cluster_master_log_level)" \
  --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:"$($DX_CLUSTER_UTILS/get_log_properties.sh $cluster_workers_log_level)" \
  /opt/pyspark_app.py ...

The available log levels are {"WARN", "INFO", "DEBUG", "TRACE"}. By default, the log level is WARN, but at times it may be desirable to see more detailed logs for debugging.
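
Independently of the log4j configuration shown above, the log level of the Spark context can also be changed from inside the PySpark script. This is standard Spark API rather than anything DNAnexus-specific; a minimal sketch:

import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

# Override the log level for this SparkContext; valid values include
# WARN, INFO, DEBUG, and TRACE.
spark.sparkContext.setLogLevel("DEBUG")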

Collecting cluster logs

To enable the collection of Spark cluster logs for later debugging, you must include the following in the output specification

{
  "outputSpec": [
    {
      "name": "cluster_runtime_logs_tarball",
      "label": "Cluster runtime logs tarball",
      "class": "file",
      "patterns": [ "*.tar.gz" ],
      "optional": true,
      "help": "The tarball of the cluster runtime logs"
    },
    "..."
  ],
  "...": "..."
}

and the following in the Bash script after the spark-submit call.

/cluster/log_collector.sh /home/dnanexus/out/cluster_runtime_logs_tarball

The event logs in the tarball could be passed to the Spark history server for debugging after the fact. How to do so is outside the scope of this documentation.

PySpark script

The following things should be included in the submitted PySpark script, described in the order in which they might be required.

Spark session and Hive support

Inside the submitted PySpark script (pyspark_app.py in the examples above), a Spark session must be instantiated, and it must have Hive support.

import pyspark
spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

Additional options may be specified, but they are outside the scope of this documentation.

Creating databases

To create DNAnexus databases, you can use the following lines.

db = "..."
spark.sql("CREATE DATABASE {} LOCATION 'dnax://'".format(db))

Note the location "dnax://": this is the custom scheme that must be used for all DNAnexus databases.
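
Once the database exists, tables can be created in it with Spark SQL or the DataFrame writer. A minimal sketch, using a hypothetical database name, table name, and toy rows:

import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

db = "my_database"  # hypothetical database name
spark.sql("CREATE DATABASE IF NOT EXISTS {} LOCATION 'dnax://'".format(db))

# "my_table" and the rows below are purely illustrative.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.write.saveAsTable("{}.my_table".format(db))
spark.sql("SELECT * FROM {}.my_table".format(db)).show()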

Interactive monitoring of the Spark UI and HDFS UI

While the Spark job is running, it is possible to interactively monitor the Spark UI and the HDFS Web UI (cf. Collecting cluster logs, which covers debugging after the job has finished). To enable interactive monitoring, do the following:

  1. Configure SSH for your user account
  2. Run the Spark app with SSH enabled

    dx run app-xxxx --allow-ssh
    

    Note the job id (e.g., job-xxxx)

  3. SSH to the job with port forwarding for the Spark UI

    dx ssh job-xxxx \
      --suppress-running-check \
      -o 'StrictHostKeyChecking no' \
      -L 4040:localhost:4040 \
      -L 50070:localhost:50070 -N
    
  4. Navigate to localhost:4040 to access the Spark UI, and to localhost:50070 for the HDFS Web UI

Example

The dxapp.json:

{
  "...": "...",
  "outputSpec": [
    {
      "name": "database",
      "label": "Output database",
      "class": "string",
      "patterns": { "class": "database" },
      "help": "The output database"
    },
    "..."
  ],
  "runSpec": {
    "file": "src/bash_app.sh"
  },
  "systemRequirements": {
    "*": {
      "...": "...",
      "clusterSpec": {
        "type": "spark",
        "version": "2.2.0",
        "initialInstanceCount": "<num_cluster_nodes>"
      }
    }
  },
	"access": {
		"project": "CONTRIBUTE",
		"allProjects": "VIEW",
		"network": [ "*" ]
	},
}

The bash_app.sh:

source /cluster/dx-cluster.environment

$HADOOP_HOME/bin/hadoop fs -mkdir -p /hdfs/path/to
$HADOOP_HOME/bin/hadoop fs -put /local/path/to/file /hdfs/path/to/file

spark_executor_memory=...  # Could be parameterized as an app input
spark_executor_cores=...   # Could be parameterized as an app input
$SPARK_HOME/bin/spark-submit \
  --executor-memory=$spark_executor_memory \
  --executor-cores=$spark_executor_cores \
  /opt/pyspark_app.py ...

The pyspark_app.py:

import pyspark
spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()
db = "..."
spark.sql("CREATE DATABASE {} LOCATION 'dnax://'".format(db))
