$ wget http://archive.ics.uci.edu/ml/machine-learning-databases/00357/occupancy_data.zip
--2017-12-19 18:34:43-- http://archive.ics.uci.edu/ml/machine-learning-databases/00357/occupancy_data.zip
Length: 335713 (328K) [application/zip]
Saving to: ‘occupancy_data.zip’
100%[===================================================================================================>] 335,713 614KB/s in 0.5s
2017-12-19 18:34:44 (614 KB/s) - ‘occupancy_data.zip’ saved [335713/335713]
$ mkdir occupancy_data.d
$ unzip occupancy_data.zip -d occupancy_data.d
Archive: occupancy_data.zip
inflating: occupancy_data.d/datatest.txt
inflating: occupancy_data.d/datatest2.txt
inflating: occupancy_data.d/datatraining.txt
$ hdfs dfs -put occupancy_data.d
$ hdfs dfs -ls occupancy_data.d
Found 3 items
-rw-r--r-- 3 vora hdfs 200766 2017-12-18 occupancy_data.d/datatest.txt
-rw-r--r-- 3 vora hdfs 699664 2017-12-18 occupancy_data.d/datatest2.txt
-rw-r--r-- 3 vora hdfs 596674 2017-12-18 occupancy_data.d/datatraining.txt
$ env|grep -E "(SPARK|HADOOP)"
SPARK_DAEMON_MEMORY=1024m
SPARK_HOME=/home/vora/spark-2.1.2-bin-hadoop2.7
HADOOP_HOME=/usr/hdp/current/hadoop-client
SPARK_LOG_DIR=/var/log/spark2
HADOOP_CONF_DIR=/usr/hdp/current/hadoop-client/conf
SPARK_PID_DIR=/var/run/spark2
$ grep vora $SPARK_HOME/conf/spark-defaults.conf
spark.sap.vora.host k8s1111.internal.sap.corp
spark.sap.vora.port 31932
$ grep vora $SPARK_HOME/conf/spark-defaults.conf
(Nothing)
SAP Vora 2.0 Spark Integration Installer
Installing SAP Vora 2.0 Spark Integration
Reading host file at /tmp/SAPVora-SparkIntegration/config/hosts.txt
Parsed 6 hostnames from hostfile
---------------INSTALLATION SUMMARY---------------
--------------------------------------------------
Ambari UI address : http://hadoopnode1.internal.sap.corp:8080
Ambari User ID : admin
Ambari cluster name : ciep
Ambari password : ****
Authentication : Disabled
Catalog host : k8s1111.internal.sap.corp
Catalog port : 31281
Cluster manager : ambari
HDFS installation folder : /user/vora/lib
HDFS user : vora
Hortonworks version : 2.5.6.0-40
Host file : /tmp/SAPVora-SparkIntegration/config/hosts.txt
Install Spark 1.6.x support : True
Install Spark 2.x support : True
Installation folder : /opt/vora-spark
Path to datanucleus api-jdo : /usr/hdp/current/spark2-client/jars/datanucleus-api-jdo-3.2.6.jar
Path to datanucleus core : /usr/hdp/current/spark2-client/jars/datanucleus-core-3.2.10.jar
Path to datanucleus rdbms : /usr/hdp/current/spark2-client/jars/datanucleus-rdbms-3.2.9.jar
Put Spark extensions jar to HDFS : True
Ssh password : ****
Ssh username : root
Transaction coordinator host : k8s1111.internal.sap.corp
Transaction coordinator port : 31932
--------------------------------------------------
$ /opt/vora-spark/bin/start-spark-shell.sh
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://99.99.99.99:4040
Spark context available as 'sc' (master = local[*], app id = local-1513709993631).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.2
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
scala> import org.apache.spark.sql.types.{StructType,StructField,IntegerType,StringType,FloatType,TimestampType,DateType}
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, FloatType, TimestampType, DateType}
scala> val occupancySchema = StructType(Array(
StructField("row_id", IntegerType, false),
StructField("obs_ts", TimestampType, false),
StructField("temp_c", FloatType, false),
StructField("humidity", FloatType, false),
StructField("light", FloatType, false),
StructField("co2", FloatType, false),
StructField("humidityRatio", FloatType, false),
StructField("occupancy", IntegerType, false)
))
occupancySchema: org.apache.spark.sql.types.StructType = StructType(StructField(row_id,IntegerType,false), StructField(obs_ts,TimestampType,false), StructField(temp_c,FloatType,false), StructField(humidity,FloatType,false), StructField(light,FloatType,false), StructField(co2,FloatType,false), StructField(humidityRatio,FloatType,false), StructField(occupancy,IntegerType,false))
scala> val occupancyIngest_DF = spark.read.
format("csv").
schema(occupancySchema).
option("header", "true").
option("mode", "FAILFAST").
load("occupancy_data.d")
occupancyIngest_DF: org.apache.spark.sql.DataFrame = [row_id: int, obs_ts: timestamp ... 6 more fields]
scala> occupancyIngest_DF.count()
res0: Long = 20560
scala> occupancyIngest_DF.select("obs_ts", "temp_c", "occupancy").show(5)
+--------------------+-------+---------+
| obs_ts| temp_c|occupancy|
+--------------------+-------+---------+
|2015-02-11 14:48:...| 21.76| 1|
|2015-02-11 14:49:...| 21.79| 1|
|2015-02-11 14:50:...|21.7675| 1|
|2015-02-11 14:51:...|21.7675| 1|
|2015-02-11 14:51:...| 21.79| 1|
+--------------------+-------+---------+
only showing top 5 rows
scala> val occpyHive_DF = occupancyIngest_DF.
withColumn("obs_date", occupancyIngest_DF.col("obs_ts").
cast(DateType))
occpyHive_DF: org.apache.spark.sql.DataFrame = [row_id: int, obs_ts: timestamp ... 7 more fields]
scala> occpyHive_DF.select("obs_ts", "temp_c", "occupancy", "obs_date").show(5)
+--------------------+-------+---------+----------+
| obs_ts| temp_c|occupancy| obs_date|
+--------------------+-------+---------+----------+
|2015-02-11 14:48:...| 21.76| 1|2015-02-11|
|2015-02-11 14:49:...| 21.79| 1|2015-02-11|
|2015-02-11 14:50:...|21.7675| 1|2015-02-11|
|2015-02-11 14:51:...|21.7675| 1|2015-02-11|
|2015-02-11 14:51:...| 21.79| 1|2015-02-11|
+--------------------+-------+---------+----------+
only showing top 5 rows
scala> occpyHive_DF.write.
format("orc").
partitionBy("obs_date").
saveAsTable("occupancyPrtnd_orc")
$ hdfs dfs -ls /apps/hive/warehouse/occupancyprtnd_orc|sed 's/^.* \//\//'
Found 18 items
/apps/hive/warehouse/occupancyprtnd_orc/_SUCCESS
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-02
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-03
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-04
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-05
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-06
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-07
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-08
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-09
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-10
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-11
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-12
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-13
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-14
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-15
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-16
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-17
/apps/hive/warehouse/occupancyprtnd_orc/obs_date=2015-02-18
$ /opt/vora-spark/bin/start-spark-shell.sh
scala> spark.sql("""
SELECT obs_date, temp_c, occupancy
FROM occupancyprtnd_orc
LIMIT 5
""").show
+----------+------+---------+
| obs_date|temp_c|occupancy|
+----------+------+---------+
|2015-02-12| 20.6| 0|
|2015-02-12| 20.55| 0|
|2015-02-12| 20.6| 0|
|2015-02-12| 20.6| 0|
|2015-02-12| 20.6| 0|
+----------+------+---------+
scala> spark.sql("DESCRIBE occupancyprtnd_orc").show
+--------------------+---------+-------+
| col_name|data_type|comment|
+--------------------+---------+-------+
| row_id| int| null|
| obs_ts|timestamp| null|
| temp_c| float| null|
| humidity| float| null|
| light| float| null|
| co2| float| null|
| humidityRatio| float| null|
| occupancy| int| null|
| obs_date| date| null|
|# Partition Infor...| | |
| # col_name|data_type|comment|
| obs_date| date| null|
+--------------------+---------+-------+
scala> import sap.spark.vora.PublicVoraClientUtils
import sap.spark.vora.PublicVoraClientUtils
scala> val vclient = PublicVoraClientUtils.createClient( spark )
vclient: sap.client.vora.PublicVoraClient = sap.client.vora.PublicVoraClient@6a7db16e
scala> import sap.client.vora.PublicVoraClient
import sap.client.vora.PublicVoraClient
scala> val vclient = PublicVoraClient("k8s1111.internal.sap.corp", 31932)
vclient: sap.client.vora.PublicVoraClient = sap.client.vora.PublicVoraClient@6a7db16e
scala> vclient.execute("""
CREATE TABLE "occupancyVora"
("obsDate" DATE, "isOcc" VARCHAR(1), "tempF" FLOAT)
STORE IN MEMORY
""")
scala> vclient.query("""
SELECT * FROM SYS.TABLE_COLUMNS
WHERE TABLE_NAME = 'occupancyVora'
""").foreach(println)
VoraRow(Some(VORA), Some(occupancyVora), Some(obsDate), Some(DATE))
VoraRow(Some(VORA), Some(occupancyVora), Some(isOcc), Some(CHARACTER VARYING(1,EN_US)))
VoraRow(Some(VORA), Some(occupancyVora), Some(tempF), Some(DOUBLE))
scala> spark.sql("""
SELECT obs_date,
SUBSTRING(IF(occupancy = 0,'No','Yes'),0,1) AS occ_str,
CAST(round((temp_c*9/5)+32,2) AS DOUBLE) AS t2
FROM occupancyprtnd_orc
WHERE occupancy=0
LIMIT 3
""").show
+----------+-------+-----+
| obs_date|occ_str| t2|
+----------+-------+-----+
|2015-02-12| N|69.08|
|2015-02-12| N|68.99|
|2015-02-12| N|69.08|
+----------+-------+-----+
scala> val occupancyVoraInitial_DF = spark.sql("""
SELECT
obs_date,
IF(occupancy = 0,'N','Y') AS isOcc,
CAST(round((temp_c*9/5)+32,2) AS DOUBLE) AS t2
FROM occupancyprtnd_orc
WHERE obs_date BETWEEN '2015-02-02' AND '2015-02-04'
""")
occupancyVoraInitial_DF: org.apache.spark.sql.DataFrame = [obs_date: date, isOcc: string ... 1 more field]
scala> occupancyVoraInitial_DF.count()
res33: Long = 3034
scala> vclient.query("""
SELECT COUNT(*)
FROM "occupancyVora"
""").foreach(println)
VoraRow(Some(0))
scala> vclient.query("""
SELECT COUNT(*)
FROM occupancyVora
""").foreach(println)
com.sap.vora.jdbc.VoraException: HL(9): Runtime error. (sql_error:1:23-1:36: error: Table not found
SELECT COUNT(*) FROM occupancyVora
^^^^^^^^^^^^^
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> occupancyVoraInitial_DF.write.
format( "sap.spark.vora" ).
option( "table", "occupancyVora" ).
mode( SaveMode.Append ).
save()
scala> vclient.query("""
SELECT COUNT(*)
FROM "occupancyVora"
""").foreach(println)
VoraRow(Some(3034))
scala> val occupancyVoraNextDay_DF = spark.sql("""
SELECT
obs_date,
IF(occupancy = 0,'N','Y') AS isOcc,
CAST(round((temp_c*9/5)+32,2) AS DOUBLE) AS t2
FROM occupancyprtnd_orc WHERE obs_date = '2015-02-05'
""")
occupancyVoraNextDay_DF: org.apache.spark.sql.DataFrame = [obs_date: date, isOcc: string ... 1 more field]
scala> occupancyVoraNextDay_DF.count()
res12: Long = 1440
scala> occupancyVoraNextDay_DF.write.
format( "sap.spark.vora" ).
option( "table", "occupancyVora" ).
mode( SaveMode.Append ).
save()
scala> vclient.query("""
SELECT COUNT(*)
FROM "occupancyVora"
""").foreach(println)
VoraRow(Some(4474))
scala> val occupancyVoraNextDay_DF = spark.sql("""
SELECT
obs_date,
IF(occupancy = 0,'N','Y') AS isOcc,
CAST(round((temp_c*9/5)+32,2) AS DOUBLE) AS t2
FROM occupancyprtnd_orc
WHERE obs_date = '2015-02-06'
""")
occupancyVoraNextDay_DF: org.apache.spark.sql.DataFrame = [obs_date: date, isOcc: string ... 1 more field]
scala> occupancyVoraNextDay_DF.write.
format( "sap.spark.vora" ).
option( "table", "occupancyVora" ).
mode( SaveMode.Append ).
save()
scala> vclient.query("""
SELECT COUNT(*)
FROM "occupancyVora"
""").foreach(println)
VoraRow(Some(5914))
scala> vclient.execute("""DROP TABLE "occupancyVora" """)
scala> spark.sql("DROP TABLE occupancyprtnd_orc")
$ hdfs dfs -rm -r -skipTrash occupancy_data.d
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
24 | |
10 | |
8 | |
7 | |
7 | |
7 | |
6 | |
6 | |
6 | |
6 |