Spark SQL Runner

Overview

The Spark SQL Runner application brings up a Spark cluster and executes your provided list of SQL queries. This is especially useful if you need to perform a sequence repeatedly or if you need to run a complex set of queries. You can vary the size of your cluster to speed up your tasks.

How to Run Spark SQL Runner

Input:

  • sqlfile -> [Required] A .sql file which contains an ordered list of SQL queries.
  • paramfile -> [Optional] A json file which contains the input parameters like variable substitutions and export configurations.
  • user_config -> [Optional] User configuration json file, in case you want to set or override certain Spark configurations.

Other Options:

  • export -> [Optional] default false -- will export output files with results for the queries in the sqlfile
  • collect_logs -> [Optional] default false -- will collect executor, driver logs from all nodes of the cluster to a folder.
  • executor_memory -> [Optional] Amount of memory to use per executor process, in MiB unless otherwise specified. (e.g. 2g, 8g). This is passed as --executor-memory to Spark submit.
  • executor_cores -> [Optional] Number of cores to use per executor process. This is passed as --executor-cores to Spark submit.
  • driver_memory -> [Optional] Amount of memory to use for the driver process (e.g. 2g, 8g). This is passed as --driver-memory to Spark submit.
  • log_level -> [Optional] default INFO -- logging level for both driver and executors. [ALL, TRACE, DEBUG, INFO]

Output:

  • output_files -> Output files include report SQL file and query export files.

Basic Run

dx run spark-sql-runner \
  -i sqlfile=file-FQ4by2Q0Yy3pGp21F7vp8XGK \
  -i paramfile=file-FK7Qpj00GQ8Q7ybZ0pqYJj6G \
  -i export=true

Examples

sqlfile

SELECT * FROM ${srcdb}.${patient_table};
DROP DATABASE IF EXISTS ${dstdb} CASCADE;
CREATE DATABASE IF NOT EXISTS ${dstdb} LOCATION 'dnax://';
CREATE VIEW ${dstdb}.patient_view AS SELECT * FROM ${srcdb}.patient;
SELECT * FROM ${dstdb}.patient_view;

How the sqlfile is processed

1) The SQL runner extracts each command in sqlfile and runs them in sequential order. 2) Every SQL command needs to be separated with a semicolon ;.

SHOW DATABASES;
SELECT * FROM dbname.tablename1;
SELECT * FROM 
dbname.tablename2;
DESCRIBE DATABASE EXTENDED dbname;

3) Any command starting with -- is ignored (comments). Any comment within a command should be inside /*...*/ The following are examples of valid comments:

-- SHOW DATABASES;
-- SELECT * FROM dbname.tablename1;
SHOW /* this is valid comment */ TABLES;

Variable Substitution

Variable substitution can be done by specifying the variables to replace in paramfile.

{
  "substitutions" : {
    "srcdb": "sskrdemo1",
    "dstdb": "sskrtest201",
    "patient": "patient_new",
    "f2c":"patient_f2c",
    "derived":"patient_derived",
    "composed":"patient_composed",
    "complex":"patient_complex",
    "patient_view": "patient_newview",
    "brca": "brca_new",
    "patient_table":"patient",
    "cna": "cna_new"
  },

  "export":{
    "maxparts" : 2,
    "fileprefix":"demo",
    "header": true
  }
}
In the above example, each reference to srcdb in sqlfile within ${...} will be substituted with sskrdemo1. For example, select * from ${srcdb}.${patient_table};. The script adds the set command before executing any of the SQL commands in sqlfile. So select * from ${srcdb}.${patient_table}; would translate to:
set srcdb=sskrdemo1;
set patient_table=patient;
select * from ${srcdb}.${patient_table};

Export

If enabled, the results of the SQL commands will be exported to a CSV file. paramfile defines an export configuration.

{
  "export":
  {
    "maxparts" : 2,
    "fileprefix":"demo",
    "header": true
  }
}

1) maxparts -> default 1 -- specified to define the maximum part files you want to generate. This generally depends on how many executors you are running in the cluster as well as how many partitions of this file exist in the system. 2) fileprefix -> The filename prefix for every SQL output file. By default the output files will be prefixed with query_id which is the order in which the queries are listed in sqlfile (starting with 1). For example 1-out.csv. If we specify prefix, it will generate output files like <prefix>-1-out.csv. 3) header -> default true -- will add a header row to each exported file.

User Configuration

These values in spark-defaults.conf will override or add to the default Spark configuration.

{
  "spark-defaults.conf": [
    {
      "name": "spark.app.name",
      "value": "SparkAppName"
    },
    {
      "name": "spark.test.conf",
      "value": true
    }
  ]
}

Output Files

$ dx tree export
export
├── job-FFp7K2j0xppVXZ791fFxp2Bg-export.tar
├── job-FFp7K2j0xppVXZ791fFxp2Bg-debug.sql
There are two files generated in export folder: 1) <JobId>-export.tar : Contains all the query results. 2) <JobId>-outfile.sql : SQL debug file.

Export files

Extracting the export tar file will look like:

├── demo-0
│   ├── demo-0-out.csv
│   │   ├── _SUCCESS
│   │   ├── part-00000-1e2c301e-6b28-47de-b261-c74249cc6724-c000.csv
│   │   └── part-00001-1e2c301e-6b28-47de-b261-c74249cc6724-c000.csv
│   └── demo-0.sql
├── demo-1
│   ├── demo-1-out.csv
│   │   ├── _SUCCESS
│   │   └── part-00000-b21522da-0e5f-42ba-8197-e475841ba9c3-c000.csv
│   └── demo-1.sql
├── demo-2
│   ├── demo-2-out.csv
│   │   ├── _SUCCESS
│   │   ├── part-00000-e61c6eff-5448-4c39-8c72-546279d8ce6f-c000.csv
│   │   └── part-00001-e61c6eff-5448-4c39-8c72-546279d8ce6f-c000.csv
│   └── demo-3.sql
├── demo-3
│   ├── demo-3-out.csv
│   │   ├── _SUCCESS
│   │   └── part-00000-5a48ba0f-d761-4aa5-bdfa-b184ca7948b5-c000.csv
│   └── demo-3.sql
In the above example, demo is the fileprefix used. We have one folder for each query, and each folder has a .sql file containing the query executed and a .csv folder containing the result csv.

SQL Report file

Every sql run execution generates a sql runner debug report file. This is a .sql file.

-- [SQL Runner Report] --;
-- [SUCCESS][TimeTaken: 1.90734863281e-06 secs ] set f2c=patient_f2c;
-- [SUCCESS][TimeTaken: 1.90734863281e-06 secs ] set srcdb=sskrdemosrcdb1_13;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set patient=patient_new;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set derived=patient_derived;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set composed=patient_composed;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set patient_table=patient;
-- [SUCCESS][TimeTaken: 1.19209289551e-06 secs ] set complex=patient_complex;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set patient_view=patient_newview;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set cna=cna_new;
-- [SUCCESS][TimeTaken: 0.0 secs ] set brca=brca_new;
-- [SUCCESS][TimeTaken: 2.14576721191e-06 secs ] set dstdb=sskrdemodstdb1_13;
-- [SUCCESS][OutputFile: demo-0-out.csv, TimeTaken: 8.83630990982 secs] SHOW DATABASES;
-- [SUCCESS][OutputFile: demo-1-out.csv, TimeTaken: 3.85295510292 secs] create database sskrdemo2 location 'dnax://';
-- [SUCCESS][OutputFile: demo-2-out.csv, TimeTaken: 4.8106200695 secs] use sskrdemo2;
-- [SUCCESS][OutputFile: demo-3-out.csv , TimeTaken: 1.00737595558 secs] create table patient (first_name string, last_name string, age int, glucose int, temperature int, dob string, temp_metric string) stored as parquet;
It lists all the queries executed and status of the execution (Success or Fail). It also lists the name of the output file for that command and the time taken. If there are any failures, it will report the query and stop executing subsequent commands.

How to handle SQL errors?

While executing the series of SQL commands, one of the commands could fail (error, syntax, etc). In that case the app will quit and upload a SQL debug file to the project:

-- [SQL Runner Report] --;
-- [SUCCESS][TimeTaken: 1.90734863281e-06 secs ] set f2c=patient_f2c;
-- [SUCCESS][TimeTaken: 1.90734863281e-06 secs ] set srcdb=sskrdemosrcdb1_13;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set patient=patient_new;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set derived=patient_derived;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set composed=patient_composed;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set patient_table=patient;
-- [SUCCESS][TimeTaken: 1.19209289551e-06 secs ] set complex=patient_complex;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set patient_view=patient_newview;
-- [SUCCESS][TimeTaken: 9.53674316406e-07 secs ] set cna=cna_new;
-- [SUCCESS][TimeTaken: 0.0 secs ] set brca=brca_new;
-- [SUCCESS][TimeTaken: 2.14576721191e-06 secs ] set dstdb=sskrdemodstdb1_13;
-- [SUCCESS][OutputFile: demo-0-out.csv, TimeTaken: 8.83630990982 secs] select * from ${srcdb}.${patient_table};
-- [FAIL] SQL ERROR while below command [ Reason: u"\nextraneous input '`' expecting <EOF>(line 1, pos 45)\n\n== SQL ==\ndrop database if exists sskrtest2011 cascade `\n---------------------------------------------^^^\n"];
drop database if exists ${dstdb} cascade `;
create database if not exists ${dstdb} location 'dnax://';
create view ${dstdb}.patient_view as select * from ${srcdb}.patient;
select * from ${dstdb}.patient_view;
As you can see, it identifies the line with the SQL error and its response.
drop database if exists ${dstdb} cascade `;
Now we can fix the query in the .sql file and even use this report file as an input for a subsequent run -- picking up where it left off.

Last edited by Peter Murray, 2019-01-17 16:22:54

 Feedback