Developer Tutorials/Parallelize Your App

When you write your app (or applet) for the DNAnexus Platform, it can run on multiple worker machines in the cloud simultaneously. To enable this feature, you will need to break your code into separate callable functions, which we call entry points.

Calling an entry point creates a new subjob to run it. This child job runs on a separate machine in the cloud and has access to the same temporary workspace as its parent job. Thus, if your "main" entry point creates a GenomicTable (GTable) and passes the table's ID to a child job, the child job can perform some computation and add rows to the open GTable directly. A third entry point may be responsible for closing the GTable once all the other jobs have finished processing.

Adding Entry Points to Your Code

The input and output hashes that are provided to (and received from) subjobs are just like the inputs and outputs you use when running a separate app or applet. However, unlike apps and applets, subjobs never declare their input and output specs.

Using bash

To create multiple entry points for an app or applet that uses the bash interpreter (anything other than Python), simply create bash functions in your script whose names match your entry points. The following code snippet demonstrates when each part of your script will be run.

# Anything outside the function declarations is always run

myfunc() {
        # myfunc only gets called when invoked by main (or by another
        # entry point)

        echo $myinput
}

main() {
        # main gets run when you run the app/applet

        # The following line creates a new job running "myfunc" which
        # will receive an input variable $myinput set to "hello world"

        dx-jobutil-new-job myfunc -imyinput='hello world'
}

Using Python

To designate entry points in your Python script, simply decorate the functions with @dxpy.entry_point("entry_point_name"). The following code snippet demonstrates when each part of your script will be run.

import dxpy

@dxpy.entry_point("myfunc")
def myfunc(myinput):
    # myfunc only gets called when invoked by main (or by another entry
    # point)

    print(myinput)

@dxpy.entry_point("main")
def main():
    # main gets run when you run the app/applet

    # The following line creates a new job running "myfunc" which
    # will receive an input variable myinput set to "hello world"

    dxpy.new_dxjob(fn_input={ "myinput": "hello world" }, fn_name="myfunc")

# The following line will call the appropriate entry point.
dxpy.run()

Specifying Dependencies And Passing Data Between Entry Points

In the DNAnexus Platform, rather than explicitly scheduling tasks to run in a certain order, you can specify your data dependencies directly using job-based object references, which allow you to pass data objects and other parameters between subjobs.

Using Job-Based Object References (JBORs)

A Job-Based Object Reference (JBOR) is a way of referring to a specific output field of another job or subjob. When specifying a job's input or output, you can supply a JBOR instead of an actual object ID or value. When the Platform encounters such references, it waits for the referenced job to finish; only then does it run the downstream subjobs that require that value. This provides a natural way to coordinate the operation of multiple (possibly parallel) subjobs in your app or applet.
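
Concretely, a JBOR is a small hash that names a job and one of its output fields; see the API documentation referenced under Further Reading for the exact forms the Platform accepts. The following sketch uses a placeholder job ID and illustrative field names:

# A JBOR names a job and one of its output fields. ("job-xxxx" and the
# field names below are placeholders.)
jbor = {"job": "job-xxxx", "field": "partial_count"}

# It can stand in wherever a concrete value would normally go: in the
# input hash of a new subjob, or in the output hash returned by an
# entry point, e.g.
#     dxpy.new_dxjob(fn_input={"partial_counts": [jbor]}, fn_name="postprocess")
#     return {"count": jbor}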

Below is a simple example of using JBORs in an app that runs in parallel across multiple workers. This app takes a single required input, input_gtable, and counts how many reads in the table consist exclusively of "A"s. It spawns multiple subjobs to scan through the table in parallel. The general idea is as follows:

  • main: takes a single required input input_gtable and produces a single output count. When run, it splits the GTable into multiple chunks and schedules the process and postprocess subjobs.

  • process (invoked multiple times): takes as input a GTable and a row range, and outputs the number of matching rows in that row range.

  • postprocess: takes as input an array of integers (representing the outputs of the process stages) and adds them to produce the final desired answer.

When the app runs: the main subjob starts, schedules all the other subjobs, and exits; the process subjobs all run; when the last process subjob has finished, the postprocess subjob runs. When the postprocess subjob has finished, then the result of the job as a whole (the "count" output) is available.

In the example below, JBORs are used in two slightly different ways. We pass JBORs containing the output of process as input to the postprocess task. We also supply JBORs containing the output of postprocess as the output of the main job.

import re, dxpy

@dxpy.entry_point("main")
def main(input_gtable, num_subjobs=10):
    input_gtable = dxpy.DXGTable(input_gtable)

    num_rows = input_gtable.describe()["length"]

    # partial_counts[i] will be a JBOR pointing to the "partial_count" output
    # of the i'th 'process' subjob.
    partial_counts = []
    for i in range(num_subjobs):
        start_row = i * num_rows // num_subjobs
        end_row = (i + 1) * num_rows // num_subjobs
        subjob_input = {"input_gtable_id": input_gtable.get_id(), "start_row": start_row, "end_row": end_row}
        subjob = dxpy.new_dxjob(subjob_input, "process")
        partial_counts.append(subjob.get_output_ref("partial_count"))

    postprocess_subjob = dxpy.new_dxjob(fn_input={"partial_counts": partial_counts}, fn_name="postprocess")
    # Return a JBOR pointing to the output of the 'postprocess' subjob.
    return {
        "count": postprocess_subjob.get_output_ref("count")
    }

@dxpy.entry_point("process")
def process(input_gtable_id, start_row, end_row):
    """
    Input:
      input_gtable_id: (string) ID of the GTable (of type Mappings) to read
      start_row: (int) row index to start scanning (inclusive)
      end_row: (int) row index to stop scanning (exclusive)
    Output:
      partial_count: (int) number of reads in the row range consisting only of A's
    """
    partial_count = 0
    for row in dxpy.DXGTable(input_gtable_id).iterate_rows(start_row, end_row, want_dict=True):
        if re.match("^A+$", row["sequence"]):
            partial_count += 1
    return {"partial_count": partial_count}

@dxpy.entry_point("postprocess")
def postprocess(partial_counts):
    """
    Input:
      partial_counts: (array:int) partial counts from each process subjob
    Output:
      count: (int) total count for the entire GTable
    """
    return {"count": sum(partial_counts)}

dxpy.run()

Note that we use JBORs here to allow main to defer to subjobs that have not yet finished! The main entry point effectively says, "when the postprocess subjob is finished, supply its output as the output of my app as a whole". Because the Platform takes care of doing this propagation of the output (outside of the workers processing your jobs), this technique allows the main subjob to exit immediately, rather than waste resources explicitly waiting for the postprocess job to complete. In general, if you have an entry point that spawns another job and then waits for it to complete (using dx wait, for instance), you can rewrite it (and save money by lowering your compute usage!) by splitting it up into two or more entry points that are coordinated via JBORs.
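
As a minimal sketch of that rewrite (the "do_work" entry point and field names here are made up for illustration), compare an entry point that blocks on its subjob with one that simply hands back a JBOR:

import dxpy

@dxpy.entry_point("do_work")
def do_work():
    return {"result": 42}

@dxpy.entry_point("main")
def main(defer=True):
    subjob = dxpy.new_dxjob(fn_input={}, fn_name="do_work")

    if not defer:
        # Wasteful: this worker sits idle (and accrues compute time)
        # until the subjob finishes.
        subjob.wait_on_done()
        return {"result": subjob.describe()["output"]["result"]}

    # Better: return a JBOR and exit immediately; the Platform fills in
    # the output once "do_work" has finished.
    return {"result": subjob.get_output_ref("result")}

dxpy.run()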

In bash, a similar pattern might be (partially) implemented as follows. You can use the dx-jobutil-new-job command to create a new subjob, with argument-passing syntax similar to that of dx run.

main() {

  for ...
  do
    # Divide up the inputs however you wish here.
    # (Decide what args to pass to each process subjob based on the index.)
    additional_process_args=...

    process_job=$(dx-jobutil-new-job process -iinput_gtable_id=$input_gtable $additional_process_args)

    # Now $process_job contains the job ID of the subjob we just
    # created. Create a JBOR pointing to its 'partial_count' output.
    postprocess_args="$postprocess_args -ipartial_counts=$process_job:partial_count"
  done

  # Supply the JBORs we created as input to the new postprocess subjob
  postprocess_job=$(dx-jobutil-new-job postprocess $postprocess_args)

  # Supply a JBOR that defers to the output of the postprocess subjob.
  dx-jobutil-add-output count "$postprocess_job":count --class=jobref

}

Declaring Job Dependencies Directly

In some cases it might not be natural to represent your dependency as a data dependency. For example, suppose you have an app that populates data into a GTable, where the process subjobs add rows to the GTable and the postprocess subjob simply closes it. In this case, unlike the example above, no data needs to be passed from process to postprocess: the only thing that postprocess needs to begin is the knowledge that all process subjobs have finished. You can schedule the jobs by writing something like the following in your main entry point:

@dxpy.entry_point("main")
def main(input_gtable, num_subjobs=10):
    # (Do any work here to initialize your output GTable)
    output_gtable = dxpy.new_dxgtable(...)

    process_jobs = []
    for i in range(num_subjobs):
        # {index: i} is a stand-in for whatever state is actually needed to
        # communicate to the subjob which part of the input it should work on
        subjob = dxpy.new_dxjob({"output_gtable_id": output_gtable.get_id(), "index": i}, "process")

        # This time, just collect the job ID instead of making a JBOR
        process_jobs.append(subjob.get_id())

    # 'postprocess' does not begin until all 'process' jobs have finished
    postprocess_subjob = dxpy.new_dxjob(fn_input={"output_gtable_id": output_gtable.get_id()},
                                        depends_on=process_jobs,
                                        fn_name="postprocess")

    return {"output": dxpy.dxlink(output_gtable)}

@dxpy.entry_point("process")
def process(output_gtable_id, index):
    output_gtable = dxpy.DXGTable(output_gtable_id)
    for i in range(1000):
        output_gtable.add_rows([...])
    return {}

@dxpy.entry_point("postprocess")
def postprocess(output_gtable_id):
    dxpy.DXGTable(output_gtable_id).close()
    return {}

In bash, you might start with the following pattern. Here, to set up the postprocess job, rather than passing JBORs, we use dx-jobutil-new-job --depends-on to set up the job dependencies.

main() {

  output_gtable_id=...

  for i in {1..10}
  do
    # -iindex=$i is a stand-in for whatever state is actually needed to
    # communicate to the subjob which part of the input it should work
    # on
    process_jobs[$i]=$(dx-jobutil-new-job process -ioutput_gtable_id=$output_gtable_id -iindex=$i)
  done

  # Use --depends-on to make postprocess depend on all process subjobs.
  postprocess_job=$(dx-jobutil-new-job postprocess -ioutput_gtable_id=$output_gtable_id --depends-on ${process_jobs[@]})

  dx-jobutil-add-output output "$output_gtable_id" --class=gtable

}

Downloading and Uploading Data

When you spawn a new subjob, it runs on a new worker instance that is freshly initialized and isolated from other workers. Also, note that when you pass a file or GTable to a subjob, you are just passing a reference to that data object (basically, a string containing its ID).

Therefore, if you need to download data (file or GTable) to the worker's local filesystem to process it, you need to do so in each subjob that needs the data. Similarly, any output produced by that subjob needs to be uploaded back to a file or GTable before the subjob finishes running.

Conversely, if you don't need data in a subjob, there's no need to download it in that subjob, and you can still pass a reference to that data to future subjobs. For example, the main entry point is frequently responsible for partitioning the input object into chunks to be processed by other entry points. In some cases you can do this simply by looking at the object's metadata (to find the byte size of a file, or the number of rows in a GTable, for example) without downloading the object in its entirety.
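
As a minimal sketch of that pattern for a file input (the input names, chunking scheme, and the body of process are placeholders, not a prescribed layout): main splits the work using only the file's metadata, and each process subjob downloads the file and uploads its own output.

import dxpy

@dxpy.entry_point("main")
def main(input_file, num_subjobs=10):
    input_file = dxpy.DXFile(input_file)

    # Splitting the work only requires metadata; main never downloads the file.
    file_size = input_file.describe()["size"]
    chunk_size = -(-file_size // num_subjobs)  # ceiling division

    results = []
    for i in range(num_subjobs):
        subjob = dxpy.new_dxjob(
            {"input_file_id": input_file.get_id(),
             "start_byte": min(i * chunk_size, file_size),
             "end_byte": min((i + 1) * chunk_size, file_size)},
            "process")
        results.append(subjob.get_output_ref("result"))

    return {"results": results}

@dxpy.entry_point("process")
def process(input_file_id, start_byte, end_byte):
    # Each subjob runs on a fresh worker, so it must download the data it
    # needs before using it (here, the whole file, for simplicity)...
    dxpy.download_dxfile(input_file_id, "input.dat")

    # ...do some work on bytes [start_byte, end_byte) of input.dat, writing
    # the result to a local file...
    with open("result.dat", "w") as fh:
        fh.write("placeholder result for bytes %d-%d\n" % (start_byte, end_byte))

    # ...and upload that result back to the Platform before returning.
    result_file = dxpy.upload_local_file("result.dat")
    return {"result": dxpy.dxlink(result_file)}

dxpy.run()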

Parallel App Templates

An easy way to get the hang of using multiple entry points is to run dx-app-wizard and create an app with either the parallelized execution pattern or its more advanced version, scatter-process-gather. (If you have created an app before, you probably selected the basic pattern.) From there, you can experiment with the entry points and how they are called. The different patterns are described in more detail below.

Basic

As used in the Intro to Building Apps tutorial, the basic pattern is a single entry point that runs on a single machine from beginning to end. Input is given to the main entry point, and it produces output.

Parallelized

The parallelized template has three named entry points: main, process, and postprocess, and is similar to the example apps above.

In this pattern, the main job converts the input into multiple pieces, each of which is given to a child job running the process entry point. The main job also calls the postprocess entry point, which takes as input all the outputs from the process stages; the postprocess stage is responsible for preparing the final output for the main entry point.

Scatter-Process-Gather

The scatter-process-gather execution pattern is similar to the parallelized pattern, but it formally breaks out the scatter phase as a separate black-box entry point that splits up the app's input into arbitrarily many pieces. (As a side effect, this requires a map entry point to call process on each of the results from the scatter phase.)
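
The template generated by dx-app-wizard has more detail than can be shown here, but the following rough skeleton (the entry point and field names are illustrative and may not match the generated template exactly) shows how the pieces might fit together:

import dxpy

@dxpy.entry_point("main")
def main(app_input):
    # 'scatter' is a black box that splits the input into pieces.
    scatter_job = dxpy.new_dxjob({"app_input": app_input}, "scatter")

    # 'map' receives the pieces (as a JBOR) once 'scatter' finishes and
    # spawns one 'process' subjob per piece.
    map_job = dxpy.new_dxjob({"pieces": scatter_job.get_output_ref("pieces")}, "map")

    # Defer the app's output to whatever the gather step eventually produces.
    return {"output": map_job.get_output_ref("output")}

@dxpy.entry_point("scatter")
def scatter(app_input):
    pieces = []  # split app_input into independent pieces here
    return {"pieces": pieces}

@dxpy.entry_point("map")
def map_pieces(pieces):
    partial_outputs = []
    for piece in pieces:
        process_job = dxpy.new_dxjob({"piece": piece}, "process")
        partial_outputs.append(process_job.get_output_ref("partial_output"))

    # 'gather' combines the per-piece outputs into the final result.
    gather_job = dxpy.new_dxjob({"partial_outputs": partial_outputs}, "gather")
    return {"output": gather_job.get_output_ref("output")}

@dxpy.entry_point("process")
def process(piece):
    return {"partial_output": piece}  # do the real per-piece work here

@dxpy.entry_point("gather")
def gather(partial_outputs):
    return {"output": partial_outputs}  # combine the partial outputs here

dxpy.run()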

TIP: You can always use the same pattern but replace an entry point with an API call to run a separate app or applet. For example, you could replace the call to run the scatter function with a call to run an external app, after which the rest of the entry points in your application act on the output of that app.
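
For instance, continuing the hypothetical skeleton above (and reusing its map entry point), main could launch an existing applet in place of the local scatter entry point; the applet ID and field names here are placeholders:

import dxpy

@dxpy.entry_point("main")
def main(app_input):
    # Run a separate applet ("applet-xxxx" is a placeholder) instead of a
    # local 'scatter' entry point. Its outputs can be referenced with JBORs
    # just like a subjob's.
    scatter_job = dxpy.DXApplet("applet-xxxx").run({"input": app_input})

    map_job = dxpy.new_dxjob({"pieces": scatter_job.get_output_ref("pieces")}, "map")
    return {"output": map_job.get_output_ref("output")}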

Further Reading

See the API Documentation for JBORs for more information about how to supply JBORs in the API directly and exactly how JBORs are interpreted by the system.

