Skip to Content

Abstract

A classic use case for Vora is to accelerate HDFS datasets by caching a subset of data in Vora’s Relational (In Memory) or Disk-based stores to allow external SQL access via ODBC/JDBC.  Previous versions of Vora could only allow ingest via the HDFS Adapter.  Vora 2.0 (in conjunction with Spark 2.1) now supplies Spark Extensions that permit writing of DataFrames directly into a Vora table.  This article discusses the improvements and walks you through writing data into a Vora Relational table from a Spark 2.1.2 DataFrame.
 

History

Vora’s HDFS Adapter

Prior to Vora 2.0, the only way for Vora to consume data was through it’s HDFS adapter.  Unfortunately this adapter was unable to read HDFS Partitioned datasets. Furthermore, given a flat HDFS dataset, it could consume neither a subset of columns nor a subset of columns rows – it could only consume the entire dataset, or one or more of the complete individual HDFS files that made up the dataset.

SAP Vora Spark Extensions – Vora 1.x

Accessing Vora and HANA tables via a Spark Datasource has been available since Vora 1.x.  However, previously Vora tables could only be read from, they could not be written to

SAP Vora Spark Extensions – Vora 2.x

The latest Vora Spark Extensions running within Spark 2.x* on top of Vora 2.0 now allow us to write to a Vora table from Spark, effectively pushing a Spark DataFrame into a Vora table.  Since Spark is capable of fully supporting HDFS Partitions via Hive, this now means that the HDFS limitation has been surpassed – we can now access an HDFS Partitioned table via Spark and write it directly into a Vora table.  Even better, we can project a subset of the rows and carve off unwanted rows from a dataset prior to writing them into the Vora table.  This is an incredibly powerful capability!
The goals of this whitepaper is to illustrate:

  1. We do not need to load all the rows from the original source or HDFS file (unlike the Vora HDFS adapter). In other words, it’s not “all or nothing”.  We can choose to filter off a subset of rows from the source and perform an initial load followed by subsequent incremental loads later on.
  2. We do not need to load all the columns from the original source (unlike the Vora HDFS adapter).  We can select a subset of the columns we’re interested in.
  3. We can perform filtering and projections (i.e transformations) between reading of the source hive table and writing the target vora table (unlike the Vora HDFS adapter)

The following steps walk you through writing data into a Vora Relational table from a Spark 2.1.2 DataFrame (with Spark/Hive integration enabled).   If you already have a partitioned table then you can skip to PART III, otherwise follow the instructions to download a tiny sample dataset, use Spark to create a partitioned HDFS dataset and record the table entry/table partitions within the Hive metastore.

*Be advised:  The PAM for Vora 2.0 (Page 4, section”Hadoop Platforms for Vora Spark Integration”), states that support is currently limited to Spark 1.6.x and Spark 2.1.x (neither Spark 2.0.x nor Spark 2.2.x).  The enhancements to the Vora Spark Extensions only apply to Spark 2.1.x.  Since Spark is a client tool (there’s no server component, except for the optional History Server), even if Spark 2 is not currently available on your cluster,  you can download and install a Spark 2.1.x client for Hadoop 2.7 on any node in your cluster to run these examples.

 

 

PART I – DOWNLOADING THE SAMPLE DATASET

Download the zip archive

We’ll be using a small dataset (only 20,000 rows – but enough records to create a date-partitioned dataset).  The zip archive contains three CSV files.  We’ll unzip them into a directory and then upload the entire directory into HDFS

$ wget http://archive.ics.uci.edu/ml/machine-learning-databases/00357/occupancy_data.zip
--2017-12-19 18:34:43--  http:|//archive.ics.uci.edu/ml/machine-learning-databases/00357/occupancy_data.zip
Length: 335713 (328K) [application/zip]
Saving to: ‘occupancy_data.zip’


100%[===================================================================================================>] 335,713      614KB/s   in 0.5s


2017-12-19 18:34:44 (614 KB/s) - ‘occupancy_data.zip’ saved [335713/335713]

The dataset is merely timeseries data for an ML exercise.  It contains metrics such as temperature and humidity of a room in order to attempt to identify whether the room was occupied by an individual or not.

Extract the zip into a dedicated folder

$ mkdir occupancy_data.d
$ unzip occupancy_data.zip -d occupancy_data.d

Archive:  occupancy_data.zip
inflating: occupancy_data.d/datatest.txt
inflating: occupancy_data.d/datatest2.txt
inflating: occupancy_data.d/datatraining.txt

 

Upload the entire folder to HDFS

If I supply a relative path, it will be uploaded to my HDFS user’s home directory (/user/<username>)

$ hdfs dfs -put occupancy_data.d
$ hdfs dfs -ls occupancy_data.d

Found 3 items
-rw-r--r--   3 vora hdfs     200766 2017-12-18 occupancy_data.d/datatest.txt
-rw-r--r--   3 vora hdfs     699664 2017-12-18 occupancy_data.d/datatest2.txt
-rw-r--r--   3 vora hdfs     596674 2017-12-18 occupancy_data.d/datatraining.txt

 

Confirm Spark and Hadoop Environment correct set for your system

Check your environment.  $SPARK_HOME should be pointing to a Spark 2.1.x installation and the $SPARK_HOME should contain the appropriate hive-site.xml 

$ env|grep -E "(SPARK|HADOOP)"

SPARK_DAEMON_MEMORY=1024m
SPARK_HOME=/home/vora/spark-2.1.2-bin-hadoop2.7
HADOOP_HOME=/usr/hdp/current/hadoop-client
SPARK_LOG_DIR=/var/log/spark2
HADOOP_CONF_DIR=/usr/hdp/current/hadoop-client/conf
SPARK_PID_DIR=/var/run/spark2

Determine whether your spark-defaults.conf file contains the necessary vora configuration.
Perform the following test. Is this thre result?

 grep vora $SPARK_HOME/conf/spark-defaults.conf
spark.sap.vora.host k8s1111.internal.sap.corp
spark.sap.vora.port 31932

…or is this the result?

 grep vora $SPARK_HOME/conf/spark-defaults.conf
(Nothing)
  • If your spark-defaults.conf file contains the two settings for the Vora Transaction co-ordinator’s host and port, you can “connect automatically” using the sap.spark.vora.PublicVoraClientUtils helper package that retrieves the host and port from your spark-defaults.conf file automatically.
  • If your spark-defaults.conf file does not contain the two settings for the Vora Transaction co-ordinator’s host and port, or you choose to explicity create a connection specifying them, you can “connect manually” use the sap.client.vora.PublicVoraClient package where you explicitly specify the host and port of the Vora Transaction co-ordinator.

Locate the SAP Vora Spark extensions on your node

While running the install.sh script of the SAP Vora Spark extensions installation, it would have reported something similar to the following:


SAP Vora 2.0 Spark Integration Installer
Installing SAP Vora 2.0 Spark Integration
Reading host file at /tmp/SAPVora-SparkIntegration/config/hosts.txt
Parsed 6 hostnames from hostfile

---------------INSTALLATION SUMMARY---------------
--------------------------------------------------
Ambari UI address                       : http://hadoopnode1.internal.sap.corp:8080
Ambari User ID                          : admin
Ambari cluster name                     : ciep
Ambari password                         : ****
Authentication                          : Disabled
Catalog host                            : k8s1111.internal.sap.corp
Catalog port                            : 31281
Cluster manager                         : ambari
HDFS installation folder                : /user/vora/lib
HDFS user                               : vora
Hortonworks version                     : 2.5.6.0-40
Host file                               : /tmp/SAPVora-SparkIntegration/config/hosts.txt
Install Spark 1.6.x support             : True
Install Spark 2.x support               : True
Installation folder                     : /opt/vora-spark
Path to datanucleus api-jdo             : /usr/hdp/current/spark2-client/jars/datanucleus-api-jdo-3.2.6.jar
Path to datanucleus core                : /usr/hdp/current/spark2-client/jars/datanucleus-core-3.2.10.jar
Path to datanucleus rdbms               : /usr/hdp/current/spark2-client/jars/datanucleus-rdbms-3.2.9.jar
Put Spark extensions jar to HDFS        : True
Ssh password                            : ****
Ssh username                            : root
Transaction coordinator host            : k8s1111.internal.sap.corp
Transaction coordinator port            : 31932
--------------------------------------------------

The “Installation folder” (in my case:  /opt/vora-spark) will point to the location of the SAP Vora Spark extensions on your node
 

PART II – CREATING A PARTITIONED DATASET

Start the Spark Shell including the Vora/Spark Extensions package

Using the Vora-supplied script will automatically include the spark-sap-datasources-spark2.jar under the SAP Vora Spark extensions installation folder (in my case, /opt/vora-spark/lib/)

 

$ /opt/vora-spark/bin/start-spark-shell.sh

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://99.99.99.99:4040
Spark context available as 'sc' (master = local[*], app id = local-1513709993631).
Spark session available as 'spark'.
Welcome to
   ____              __
  / __/__  ___ _____/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 2.1.2
    /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Ensure you’re using one of the supported versions of Spark for Vora i.e. Spark 2.1.x

 

Create a Spark DataFrame from the three occupancy csv files

Import the necessary data types

scala> import org.apache.spark.sql.types.{StructType,StructField,IntegerType,StringType,FloatType,TimestampType,DateType}
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, FloatType, TimestampType, DateType}

Create the schema for the CSV ingest

scala> val occupancySchema = StructType(Array(
          StructField("row_id", IntegerType, false),
          StructField("obs_ts", TimestampType, false),
          StructField("temp_c", FloatType, false),
          StructField("humidity", FloatType, false),
          StructField("light", FloatType, false),
          StructField("co2", FloatType, false),
          StructField("humidityRatio", FloatType, false),
          StructField("occupancy", IntegerType, false)
       ))
occupancySchema: org.apache.spark.sql.types.StructType = StructType(StructField(row_id,IntegerType,false), StructField(obs_ts,TimestampType,false), StructField(temp_c,FloatType,false), StructField(humidity,FloatType,false), StructField(light,FloatType,false), StructField(co2,FloatType,false), StructField(humidityRatio,FloatType,false), StructField(occupancy,IntegerType,false))

Read the contents of the directory (the three CSV files) into a Spark dataFrame

scala> val occupancyIngest_DF = spark.read.
                                       format("csv").
                                       schema(occupancySchema).
                                       option("header", "true").
                                       option("mode", "FAILFAST").
                                       load("occupancy_data.d")
occupancyIngest_DF: org.apache.spark.sql.DataFrame = [row_id: int, obs_ts: timestamp ... 6 more fields]

Perform a simple row count (should amount to 20,560 rows)

scala> occupancyIngest_DF.count()
res0: Long = 20560

scala> occupancyIngest_DF.select("obs_ts", "temp_c", "occupancy").show(5)
+--------------------+-------+---------+
|              obs_ts| temp_c|occupancy|
+--------------------+-------+---------+
|2015-02-11 14:48:...|  21.76|        1|
|2015-02-11 14:49:...|  21.79|        1|
|2015-02-11 14:50:...|21.7675|        1|
|2015-02-11 14:51:...|21.7675|        1|
|2015-02-11 14:51:...|  21.79|        1|
+--------------------+-------+---------+
only showing top 5 rows

 

Add a date column (from the Timestamp) to partition the table

scala> val occpyHive_DF = occupancyIngest_DF.
                       withColumn("obs_date", occupancyIngest_DF.col("obs_ts").
                       cast(DateType))
res4: org.apache.spark.sql.DataFrame = [row_id: int, obs_ts: timestamp ... 7 more fields]

scala> occpyHive_DF.select("obs_ts", "temp_c", "occupancy", "obs_date").show(5)

+--------------------+-------+---------+----------+
|              obs_ts| temp_c|occupancy|  obs_date|
+--------------------+-------+---------+----------+
|2015-02-11 14:48:...|  21.76|        1|2015-02-11|
|2015-02-11 14:49:...|  21.79|        1|2015-02-11|
|2015-02-11 14:50:...|21.7675|        1|2015-02-11|
|2015-02-11 14:51:...|21.7675|        1|2015-02-11|
|2015-02-11 14:51:...|  21.79|        1|2015-02-11|
+--------------------+-------+---------+----------+
only showing top 5 rows

 

Write the Partitioned HDFS dataset and record it in the Hive Metastore

(Optional) Specify your choice of ORC, Parquet or CSV

scala> occpyHive_DF.write.
                     format("orc").
                     partitionBy("obs_date").
                     saveAsTable("occupancyPrtnd_orc")

The ‘saveAsTable’ method will write the dataset under Hive’s HDFS area

  • (HDP) /apps/hive/warehouse/occupancyPrtnd_orc
  • (Cloudera) /user/hive/warehouse/occupancyPrtnd_orc

 

Now we’ve persisted the data within Hive, it’s safe to drop out of the Spark shell for a moment. Under HDFS you should see 18 items;  17 HDFS partition directories and a _SUCCESS file

$ hdfs dfs -ls /apps/hive/warehouse/occupancyprtnd_orc|sed 's/^.* \//\//'

Found 18 items
/apps/hive/warehouse/occupancyprtnd_orc/_SUCCESS
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-02
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-03
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-04
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-05
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-06
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-07
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-08
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-09
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-10
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-11
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-12
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-13
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-14
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-15
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-16
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-17
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-18

If necessary, restart the Vora Spark shell:

$ /opt/vora-spark/bin/start-spark-shell.sh

Back in the Vora Spark shell, we can now use Spark’s Hive Context to query the table

scala> spark.sql("""
           SELECT obs_date, temp_c, occupancy 
           FROM occupancyprtnd_orc 
           LIMIT 5
        """).show
+----------+------+---------+
|  obs_date|temp_c|occupancy|
+----------+------+---------+
|2015-02-12|  20.6|        0|
|2015-02-12| 20.55|        0|
|2015-02-12|  20.6|        0|
|2015-02-12|  20.6|        0|
|2015-02-12|  20.6|        0|
+----------+------+---------+

And a DESCRIBE of the table should reveal the partition information

scala> spark.sql("DESCRIBE occupancyprtnd_orc").show
+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|              row_id|      int|   null|
|              obs_ts|timestamp|   null|
|              temp_c|    float|   null|
|            humidity|    float|   null|
|               light|    float|   null|
|                 co2|    float|   null|
|       humidityRatio|    float|   null|
|           occupancy|      int|   null|
|            obs_date|     date|   null|
|# Partition Infor...|         |       |
|          # col_name|data_type|comment|
|            obs_date|     date|   null|
+--------------------+---------+-------+

 

 

PART III – QUERYING A PARTITIONED DATASET AND LOADING A SUBSET INTO VORA

Establish Vora session

We’ll create a Vora ‘Client’ in order to issue the table DDL and to perform Vora table queries.  There are two ways of doing creating one, depending upon whether your spark-defaults.conf file contains the necessary vora configuration

  • If your spark-defaults.conf file contains the two settings for the Vora Transaction co-ordinator’s host and port, you can “connect automatically” using the sap.spark.vora.PublicVoraClientUtils helper package that retrieves the host and port from your spark-defaults.conf file automatically.
  • If your spark-defaults.conf file does not contain the two settings for the Vora Transaction co-ordinator’s host and port, or you choose to explicity create a connection specifying them, you can “connect manually” use the sap.client.vora.PublicVoraClient package where you explicitly specify the host and port of the Vora Transaction co-ordinator.

a) Connect Automatically

scala> import sap.spark.vora.PublicVoraClientUtils
import sap.spark.vora.PublicVoraClientUtils 

scala> val vclient = PublicVoraClientUtils.createClient( spark )
vclient: sap.client.vora.PublicVoraClient = sap.client.vora.PublicVoraClient@6a7db16e

b) Connect Manually

scala> import sap.client.vora.PublicVoraClient
import sap.client.vora.PublicVoraClient

scala> val vclient = PublicVoraClient("k8s1111.internal.sap.corp", 31932)
vclient: sap.client.vora.PublicVoraClient = sap.client.vora.PublicVoraClient@6a7db16e 

 

The plan

We’ve created the source hive table.

  • It has nine columns (two integers, a Timestamp and five Float columns)
  • It is partitioned by date
  • It contains approximately 20,000 rows

Next we’ll create the target Vora table. It will have a matching number of columns and types as the Spark DataFrame

  • obsDate (of type date), isOcc (of type String) and tempF (of type Double).

Finally we’ll create an intermediate Spark DataFrame from the hive table. But it will have only three fields, and comprise of only approximately 3,000 rows

  • obs_date (of type date), occ_str (of type String) and t2 (of type Double).

 

I’m trying to illustrate several things here:

  1. We do not need to load all the rows from the original source or HDFS file (unlike the Vora HDFS adapter). In other words, it’s not “all or nothing”.  We can choose to filter off a subset of rows from the source and perform an initial load followed by subsequent incremental loads later on.
  2. We do not need to load all the columns from the original source (unlike the Vora HDFS adapter).  We can select a subset of the columns we’re interested in.
  3. We can perform filtering and projections (i.e transformations) between reading of the source hive table and writing the target vora table (unlike the Vora HDFS adapter)
  4. When creating tables within Vora you must use Vora’s datatypes (e.g. VARCHAR), not Spark’s datatypes (e.g. STRING).
  5. The intermediate Spark DataFrame’s column names need not match the Vora column names.  However, the types must match (or at least be compatible)

 

Create the Vora table within the Relational (In Memory) Engine

  • The “STORE IN MEMORY” DDL directive creates the table within the Relational Engine.  Alternative Engines can be specified (e.g. “STORE IN DISK” to create the table in the Disk Engine)
  • We use the “.execute” method for DDL statements (we’ll use the “.query” method to perform queries that return a result)
scala> vclient.execute("""
           CREATE TABLE "occupancyVora" 
               ("obsDate" DATE, "isOcc" VARCHAR(1), "tempF" FLOAT) 
           STORE IN MEMORY
        """)

Confirm the column types by querying Vora’s metatdata.  Note that I refer to a string in a WHERE clause using single quotes, not double quotes.

scala> vclient.query(""" 
           SELECT * FROM SYS.TABLE_COLUMNS 
           WHERE TABLE_NAME = 'occupancyVora'
        """).foreach(println)
VoraRow(Some(VORA), Some(occupancyVora), Some(obsDate), Some(DATE))
VoraRow(Some(VORA), Some(occupancyVora), Some(isOcc), Some(CHARACTER VARYING(1,EN_US)))
VoraRow(Some(VORA), Some(occupancyVora), Some(tempF), Some(DOUBLE))

Note how my request for the “tempF” column to be a FLOAT has been overridden. It has been stored as a DOUBLE instead.  I’ll ensure my dataFrame types match accordingly…

 

Test some SQL projection logic (test occupancy for 0 or 1)

I’m demonstrating I can project new columns that differ from the original HDFS/Hive dataset. Again, the Spark DataFrame column names (obs_date, occS_str, t2) need not match the Vora column names (obsDate, isOcc, tempF). But the types must match, hence:

  • The second column of my Vora table is a VARCHAR(1).  So I must ensure that it does not return more than one character otherwise an overflow error will occur (You could rightly ask why I’m using a SUBSTRING when I could have formed the IF() statement to return only a single character – but I’m deliberately using the substring to labor the point!)
  • The third column of my Vora table is a DOUBLE. So I’m casting my third column appropriately.

I’m also demonstrating how I can perform transformations (converting an integer into a Y/N flag, and converting Celsius in Farenheit) prior to loading into Vora

scala> spark.sql("""
           SELECT obs_date,
           SUBSTRING(IF(occupancy = 0,'No','Yes'),0,1) AS occ_str,
           CAST(round((temp_c*9/5)+32,2) AS DOUBLE) AS t2
           FROM occupancyprtnd_orc
           WHERE occupancy=0 
           LIMIT 3
       """).show
+----------+-------+-----+
|  obs_date|occ_str|   t2|
+----------+-------+-----+
|2015-02-12|      N|69.08|
|2015-02-12|      N|68.99|
|2015-02-12|      N|69.08|
+----------+-------+-----+

 

Prepare a DataFrame for the initial upload into Vora

In addition to projecting my new columns I’m also filtering out only the rows I’m interested in caching into Vora.

I want to demonstrate incremental loads later on, so let’s not load the entire dataset in just yet – let’s filter out everything except for a few day’s worth of data.

scala> val occupancyVoraInitial_DF = spark.sql("""
                SELECT 
                  obs_date, 
                  IF(occupancy = 0,'N','Y') AS occ_str, 
                  CAST(round((temp_c*9/5)+32,2) AS DOUBLE) AS t2 
                FROM occupancyprtnd_orc 
                WHERE obs_date BETWEEN '2015-02-02' AND '2015-02-04'
             """)
occupancyVoraInitial_DF: org.apache.spark.sql.DataFrame = [obs_date: date, isOcc: string ... 1 more field]

 

Three day’s worth (2nd, 3rd and 4th of Feb) amounts to relatively small subset of rows (~3,000) to begin with…

scala> occupancyVoraInitial_DF.count()
res33: Long = 3034

 

Here’s the empty table in Vora

Look, there’s nothing up my sleeves…

scala> vclient.query("""
              SELECT COUNT(*) 
              FROM "occupancyVora"
           """).foreach(println)
VoraRow(Some(0))
  • Note: When referring to my Vora table, I need to wrap it in quotes (double or single) in order to preserve case, otherwise the Vora parser will assume it’s upper case i.e.
scala> vclient.query("""
          SELECT COUNT(*) 
          FROM occupancyVora
       """).foreach(println)
com.sap.vora.jdbc.VoraException: HL(9): Runtime error. (sql_error:1:23-1:36: error: Table not found
SELECT COUNT(*) FROM occupancyVora
                     ^^^^^^^^^^^^^

Because I didn’t wrap the table name in quotes, Vora assumes the table name is OCCUPANCYVORA…and there is no table of that name. The table’s name is: occupancyVora.

 

And here comes the initial load into Vora

First import the ‘savemode’ package – we’ll be appending data in the Vora table.

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

And then the write

scala> occupancyVoraInitial_DF.write.
                                format( "sap.spark.vora" ).
                                option( "table", "occupancyVora" ).
                                mode( SaveMode.Append ).
                                save()
  • Note: the “host” and “port” of the Vora Transaction co-ordinator can also be fed in here, each as a .option()

 

Let’s take a peek at my Vora table…

scala> vclient.query("""
           SELECT COUNT(*) 
           FROM "occupancyVora" 
        """).foreach(println)
VoraRow(Some(3034))

3,034 rows – that’s the same size as my initial Spark DataFrame.

 

Let’s pretend it’s tomorrow

Let’s emulate an incremental load by grabbing the next day’s worth of data…

scala> val occupancyVoraNextDay_DF = spark.sql("""
            SELECT 
               obs_date, 
               IF(occupancy = 0,'N','Y') AS isOcc, 
               CAST(round((temp_c*9/5)+32,2) AS DOUBLE) AS t2 
            FROM occupancyprtnd_orc WHERE obs_date = '2015-02-05' 
         """)
occupancyVoraNextDay_DF: org.apache.spark.sql.DataFrame = [obs_date: date, isOcc: string ... 1 more field]

scala>  occupancyVoraNextDay_DF.count()
res12: Long = 1440

So let’s append those 1,440 rows onto my Vora table…

scala> occupancyVoraNextDay_DF.write.
                                format( "sap.spark.vora" ).
                                option( "table", "occupancyVora" ).
                                mode( SaveMode.Append ).
                                save()

scala> vclient.query(""" 
               SELECT COUNT(*) 
               FROM "occupancyVora"
            """).foreach(println)
VoraRow(Some(4474))

– Yep, 3034 rows + 1440 rows = 4474 rows!

And so on…

scala> val occupancyVoraNextDay_DF = spark.sql("""
               SELECT 
                 obs_date, 
                 IF(occupancy = 0,'N','Y') AS isOcc, 
                 CAST(round((temp_c*9/5)+32,2) AS DOUBLE) AS t2 
               FROM occupancyprtnd_orc 
               WHERE obs_date = '2015-02-06' 
            """)
occupancyVoraNextDay_DF: org.apache.spark.sql.DataFrame = [obs_date: date, isOcc: string ... 1 more field]

scala> occupancyVoraNextDay_DF.write.
                                format( "sap.spark.vora" ).
                                option( "table", "occupancyVora" ).
                                mode( SaveMode.Append ).
                                save()

scala> vclient.query(""" 
           SELECT COUNT(*) 
           FROM "occupancyVora" 
        """).foreach(println)
VoraRow(Some(5914))

Cleanup/Reset

Begin by dropping my Vora table

scala> vclient.execute("""DROP TABLE "occupancyVora" """)

Drop my table/data from the Hive Metastore

scala> spark.sql("DROP TABLE occupancyprtnd_orc")

Delete my HDFS directory containing the CSV Files

$ hdfs dfs -rm -r -skipTrash occupancy_data.d
To report this post you need to login first.

Be the first to leave a comment

You must be Logged on to comment or reply to a post.

Leave a Reply