Here I want to build a pipeline that retrieves economic indicators from the internet, processes them, and then loads them into the SAP Vora engine, all within SAP Data Hub.

The pipeline components I will use are:

  • HTTP Client – Download CSV file with Exchange Rates
  • JavaScript – Remove 5 Header Rows
  • Multiplexer – Split the pipeline to provide multiple outputs
  • Wiretap – View the pipeline data on the screen
  • Vora Avro Ingestor – Load the data into Vora
  • HDFS Producer – Persist the data into HDFS
  • Graph Terminator – Stop the graph from continuously running

The European Central Bank (ECB) provides the Statistical Data Warehouse that we will use in our pipeline.

We can then take the desired feed; in my case I've chosen the daily EUR-USD exchange rate:

http://sdw.ecb.europa.eu/quickviewexport.do?SERIES_KEY=120.EXR.D.USD.EUR.SP00.A&type=csv
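Before building the pipeline, we can preview what the feed returns with a quick check outside Data Hub (a small Node.js sketch, not part of the pipeline; the URL is exactly the one above):

// Quick look at the first few lines of the ECB CSV feed (run with Node.js)
var http = require('http');
var url = 'http://sdw.ecb.europa.eu/quickviewexport.do?SERIES_KEY=120.EXR.D.USD.EUR.SP00.A&type=csv';

http.get(url, function (res) {
    var body = '';
    res.on('data', function (chunk) { body += chunk; });
    res.on('end', function () {
        // Print the first 8 lines: the 5 header rows plus a few data rows
        console.log(body.split('\n').slice(0, 8).join('\n'));
    });
}).on('error', console.error);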

Using the Data Hub Pipeline HTTP Client operator, we can paste that URL into the "getUrl" field:

getUrl: http://sdw.ecb.europa.eu/quickviewexport.do?SERIES_KEY=120.EXR.D.USD.EUR.SP00.A&type=csv

I have decreased the polling frequency to once every 30 seconds by setting getPeriodInMs to 30000 ms.
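Taken together, the HTTP Client configuration used here is:

getUrl: http://sdw.ecb.europa.eu/quickviewexport.do?SERIES_KEY=120.EXR.D.USD.EUR.SP00.A&type=csv
getPeriodInMs: 30000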

We can now test this with the Wiretap, which accepts any input type.

After saving our pipeline, we can now run it.

Creating the file name as above, prefixed with ".", will automatically create the desired repository folder structure.

We can press Run, and we should see it running at the bottom of the screen.

With the pipeline running, we can open the UI for the Wiretap.

We can see our data being returned to the screen.

The pipeline will continue to run forever, so you should stop it.

Now let's extend the pipeline with a Multiplexer and an HDFS Producer.

The HDFS directory structure is automatically created if it does not already exist. <counter>, <date> and <time> are built-in variables that can be used to create file names. The only configuration required on the Data Hub Technical Academy (TA) image is:

hadoopNamenode: spark-hdfs-adapter:8020
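For illustration, a path configuration along the following lines (the directory and file name here are my own example, and I'm assuming the operator's path parameter) would write a new, uniquely named file on each poll:

path: /exchange_rates/ECB_EUR_USD_<date>_<time>_<counter>.csv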

Re-running the pipeline will save the CSV file into HDFS.

We can browse the output in HDFS with the Data Hub Discovery User Interface.

We can see that there are a number of header rows that should be handled.

A simple piece of JavaScript will allow us to do this.

Much of the code below is the standard framework for the JavaScript Operator 2 (the port callback and the byte-array check that coerces the incoming message body to a string); we just need the inbody lines to actually strip out the header.

$.setPortCallback("input",onInput);

function isByteArray(data) {
    return (typeof data === 'object' && Array.isArray(data) 
        && data.length > 0 && typeof data[0] === 'number')
}

function onInput(ctx,s) {
    var msg = {};
    var inbody = s.Body;

    if (isByteArray(inbody)) {
        inbody = String.fromCharCode.apply(null, inbody);
    }
    
    // remove first 5 lines
    // break the textblock into an array of lines
    inbody = inbody.split('\n');
    // remove 5 lines, starting at the first position
    inbody.splice(0,5);
    // join the array back into a single string
    inbody = inbody.join('\n');
    
    msg.Body = inbody;
    $.output(msg);
}
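If you want to sanity-check the stripping logic outside Data Hub, the same few lines run in any JavaScript engine (a quick sketch using made-up sample rows):

// Standalone check of the header-stripping logic, using invented sample data
var sample = "header1\nheader2\nheader3\nheader4\nheader5\n" +
             "2018-04-26,1.2095\n2018-04-27,1.2102";
var lines = sample.split('\n');
lines.splice(0,5);              // drop the 5 header rows
console.log(lines.join('\n'));  // only the two data rows remain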

We can save and re-run our job, and check the output with either the Wiretap or Discovery.

We can now extend the pipeline further, by loading into SAP Vora using the Vora Avro Ingestor.
Despite what the name suggests, this operator actually works with JSON, CSV and Avro file formats.

The Vora Avro Ingestor expects an Avro schema; the schema tells Vora the table name, columns, data types and specification. We need to supply this, which is shown below. Note that each field is declared as a union of "null" and the actual type, which makes the columns nullable.

DefaultAvroSchema:

{
  "name": "ECB_EXCHANGE_RATE",
  "type": "record",
  "fields": [
    {
      "name": "ER_DATE",
      "type": [
        "null",
        "date"
      ]
    },
    {
      "name": "EXCHANGE_RATE",
      "type": [
        "null",
        "double"
      ]
    }
  ]
}

The other parameters for the Vora Avro Ingestor with the TA image would look like the following.

dsn: v2://dqp:50000/?binary=true
engineType: Disk
tableType: empty (not Streaming)
databaseSchema: WEBDATA
csvHeaderIncluded: false
user: admin
password: I can't tell you that

Rerunning this pipeline now produces an error, as we attempt to insert a "-" into a numeric field.

We can add the following additional line of JavaScript to correct this.

// Replace "-" characters with Nulls in Exchange Rate data
inbody = inbody.replace(/,-/g,",");

If we rerun our pipeline now, we can check Vora Tools to see that the schema and table have been created and the data has also been loaded.

However, the pipeline will continue to run, re-downloading the same data and inserting it into our Vora table continuously. To change this behavior, we need to supply a commit token within our existing JavaScript operator. The Vora Avro Ingestor will then pass this on to the Graph Terminator.

msg.Attributes = {};    
// Add Commit Token to DataHub message header to stop pipeline from running continuously
msg.Attributes["message.commit.token"] = "stop-token";
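Putting both additions together, the modified onInput function would then look roughly like this (the "-" replacement operates on the rejoined string, and the attributes are set just before the message is output):

function onInput(ctx,s) {
    var msg = {};
    var inbody = s.Body;

    if (isByteArray(inbody)) {
        inbody = String.fromCharCode.apply(null, inbody);
    }

    // remove the first 5 header lines
    inbody = inbody.split('\n');
    inbody.splice(0,5);
    inbody = inbody.join('\n');

    // Replace "-" characters with Nulls in Exchange Rate data
    inbody = inbody.replace(/,-/g,",");

    msg.Body = inbody;

    // Add Commit Token to DataHub message header to stop pipeline from running continuously
    msg.Attributes = {};
    msg.Attributes["message.commit.token"] = "stop-token";

    $.output(msg);
}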

The completed pipeline would then look like this.

Rerunning it now, we can see that the pipeline completes and all is good 🙂

I have now written a second blog where I use Python instead of JavaScript, which can be found here:

Using the SAP Data Hub Pipeline Python Operator

