Skip to Content
Technical Articles

Utilize Vora Avro Ingestor and Format Converter in SAP Data Hub 2.5

Exercises / Solutions
Yao James / SAP DBS EIM

 

Exercise overview

In this exercise, you will be working with two data sources.

  1. Product reviews that come from an online shop. This is usually referred to as big data.
  2. Product data coming from an ERP system. This is usually referred to as enterprise data.

The goal of this exercise is to test operator “Vora Avro Ingestor” and “Format Converter”. We will convert json message to csv then ingest into Vora tables, and convert csv into json message then ingest in to Vora table.

Some example files are from pipeline exercise DAT361, for more details please refer to https://jam4.sapjam.com/groups/WY5owLnS74xvWMTdOAZcHb/content?folder_id=MzG7juuHi5nBbLcoMiicn2

 

Exercise 1 – Extract product reviews and perform sentiment analysis on them

In this first exercise, you will build a pipeline that extracts product reviews from HDFS, performs sentiment analysis on the reviews using Python and load the results in an SAP Vora table.

The sample steps are in SAP Internal CAL environment, you can also use your own Data Hub tenant and user account.

Explanation / Screenshot

1.     Open Chrome and go the SAP Data Hub URL provided by the instructor.

2.     Set the Tenant name to default

3.     Set the Username to taXX where XX is your assigned group number.

4.     Set Password to Welcome01

5.     Click on the Modeler tile.

6.     Click on the + icon to create a new graph.

7.     Add the Read File operator to the graph by clicking it and dragging it onto the graph editor.

8.     Click on the Open Configuration icon.

9.     Set the Service to webhdfs.

10.  Open the Connection Editor.

11.  Set the Configuration Type to Configuration Manager.
12.  Set the Connection ID to WebHDFS and click Save.

13.  Set the Path to /DAT361/Product_Reviews.

14.  Set Recursive to true.

15.  Set the Pattern to .*txt.

Please note that this is a regular expression that will find all files ending with txt in the provided path.  It is not the typical wildcard search used in Windows which would be *.txt instead.

16.  Set Only Read on Change to true.

17.  Add the Wiretap operator to the graph.

18.  Connect the outFileName output port of the Read File operator to the Wiretap operator.

19.  Click on the Save icon in the toolbar.

20.  Set the name to teched.xx.sentiment_analysis where xx is your assigned group number.

21.  Click on the Run icon and execute the pipeline.

22.  Make sure your graph is running before going to the next step.

23.  Right click on the Wiretap operator.

24.  Click on Open UI

25.  You should see the file names of all the files being read.

26.  Stop the execution of your graph.

27.  Add the Python2Operator operator to the graph.

28.  Click on the Add Port icon.

To pass the contents of the HDFS files from the Read File operator to the Python2Operator, you must add an input port to the Python2Operator.

29.  Set the Name to input.

30.  Set the Type to message

31.  Click on the Input radio button.

32.  Click OK

33.  Connect the outFile port of the ReadFile operator to the input port of the Python2Operator operator.

Once the Read File operator has read all the files, it will notify the Python2Operator that there are no more files.

34.  Click on the Python2Operatory to see the icons.

35.  Click on the Script icon.

36.  Delete the existing content in the Python script editor

37.  Copy the contents of the SentimentAnalysisOnProductReviews.py file and paste them in the Python script editor.

The first part of the script contains code that would be written by a data scientist.  It parses the contents of the HDFS files and uses the textblob library to extract the sentiments being expressed in the text (positive, negative, neutral).

The second part of the code is specific to Data Hub as it takes the input from the upstream operator, calls functions in the script for processing and passes the output to the downstream operator.

It also listens for a notification from the upstream Read File operator to indicate all files have been read and notifies the downstream operator.

import hashlib
import json
import textblob

def force_unicode(m):
    try:
        return unicode(m)
    except UnicodeDecodeError:
        ascii = str(m).encode('string_escape')
        return unicode(ascii)

def parse_review_body(txt):
    """
    Parse reviews provided by textual body (one review per line).
    """
    lines = txt.split("\n")
    records = []
    for line in lines:
        # forcefully handle encoding issues
        line = force_unicode(line.strip())
        if line == "": continue
        records.append(parse_review(line))
    jsonout = json.dumps(records)
    return jsonout

def parse_review(line):
    """
    Parses a review of format: <PRODUCT-ID> Review: <REVIEW-TEXT>.
    Extracts the following attributes and textual features:
    
    Extracts product ID (ID), text length (LENGTH), text (TEXT), sentiment polarity (POLARITY), and
    sentiment subjectivity (SUBJECTIVITY). Returns information as a dictinary.
    """
    try:
        md5 = hashlib.md5(line).hexdigest()
        rid, text = line.split(": ", 1)
        tb = textblob.TextBlob(text)
        return { "ID": rid[:7],
                 "MD5": md5,
                 "LENGTH": len(text), 
                 "TEXT": force_unicode(line),
                 "POLARITY": tb.polarity,
                 "SUBJECTIVITY": tb.subjectivity }
    except ValueError, e:
        raise ValueError("Line does not match expceted format \"<PRODUCT-ID> Review: <REVIEW-TEXT>\"; LINE: \"%s\"; ERROR: %s" % (line, str(e)))
    except Exception, e:
        # just forward
        raise e

# ////////////////////////////////////////////////////////////
# Wrap parser in python operator
# ////////////////////////////////////////////////////////////

def on_input(msg):
    # inform downstream operators about last file:
    # set message.commit.token = 1 for last file
    commit_token = "0"
    if msg.attributes["storage.endOfSequence"]:
        commit_token = "1"
    
    # parse the line-based input    
    parsed_as_json = parse_review_body(msg.body)
    
    output_message = api.Message(parsed_as_json, {"message.commit.token": commit_token})
    api.send("output", output_message)

api.set_port_callback("input", on_input)

38.  Close the script window and go back to the graph.

To pass the contents of the Python2Operator to the next operator, we must add an output port to the Python2Operator.

39.  Click on the Add Port icon.

40.  Set the Name to output.

41.  Set the Type to message.

42.  Click on the Output radio button.

43.  Click OK.

44.  Right click on the Python2Operator and click on Group.

By adding the Python2Operator in a group, we can further define its runtime environment.

The script in the Python2Operator is using the textblob Python library so we need to make sure that it’s available in the runtime environment.

45.  Click on Group to see the icons.

46.  Click on the Configuration icon.

47.  Add a new tag.

48.  Set the tag property to textblob

49.  Set the version to 0.12.0.

50.  Connect the output port of the Python2Operator operator to the in port of the Vora Avro Ingestor operator to store the results of the Python processing in SAP Vora.

51.  Click on the Vora Avro Ingestor operator to see the icons.

52.  Click on the Configuration icon.

The Vora Avro Ingestor can ingest data in Avro, JSON or CSV format.  We’ll be using JSON because that is what the Python script is producing.

53.  Set the format to json.

The defaultAvroSchema property is a JSON string used to define the structure of the table where the data will be stored in SAP Vora.  If the table doesn’t exist, the operator will create it.

54.  In DH2.5 this configuration is changed to a more user-friendly UI, click on edit pen.

55.  Set Avro Schema. Follow the schema in AvroSchema.txt

The JSON mode is also available, you can click the button on upright corner of the window

{    "type": "record",   "namespace": "",   "name": "PRODUCT_REVIEWS",   "fields": [      { "name": "ID", "type": "string", "maxLength": 7 },     { "name": "MD5", "type": "string", "maxLength": 32, "primaryKey": true },     { "name": "LENGTH", "type": "int" },     {"name": "TEXT", "type": "string", "maxLength": 1024 },     { "name": "POLARITY", "type": "float" },     {"name": "SUBJECTIVITY", "type": "float" }   ] }

56.  Open the Connection editor.

57.  Set the Configuration Type to Configuration Manager.

58.  Set the Connection ID to VORA.

59.  Click on Save.

60.  Set the databaseSchema to “default\taXX”, where XX is your assigned group number.

61.  Set the ingestionMode to UPSERT.

62.  In DAT361 exercise, we directly ingest the json created by python operator with “Vora Avro Ingestor”.

Now we want to test the ingestor to consume .csv format. By adding a “Format Converter”

63.  The “Format Converter” can only accept blob as input and output parameters.

We need to add a “ToBlob Converter” at the inbound and a “ToMessage Converter” at the outbound.

64.  Go to configuration of “Format Converter”, set Target Format to CSV.

In Fields, put “POLARITY,TEXT,LENGTH,SUBJECTIVITY,ID,MD5”, this would be the header line of the record fields.

Set CSV Header included to True.

65.  Change the format of Vora Avro Ingestor to CSV, because we are now testing for CSV ingestion.

66.  Scroll down find csvHeaderIncluded*, we need to change this to “True” to match with Format Converter.

67.  Connect the operators we just created, put Format Converters in between Python2 and Vora Avro Ingerstor.

68.  Add a Wiretap operator to the graph.

69.  Run the pipeline, when it is status running, open Wiretap UI to check the output.
You can see all 10 files are processed and loaded into Vora table.

70.  However, the message event counter is missing in these messages, because they were lost during conversion.
If we want to add a terminator to this graph, Message Stop Event will throw an error.
MessageGraph failure: b’operator.com.sap.message.stop_event_handler:messagestopeventhandler1′: Error while executing callback registered on port(s) [‘input’]: u’message.commit.token’ [line 4]|

71.  Let’s change Vora Avro Ingestor back to “json” format

72.  Connect Python operator directly with Vora Avro Ingetsor

73.  Click on the Save icon to save the latest changes.

74.  Click on the Run icon and execute the pipeline.

75.  Wait until the pipeline has executed successfully.  Now it’s time to see the results.

76.  From the SAP Data Hub launchpad, click on the Vora Tools tile.

77.  Double click on the default\taXX schema where XX is your assigned group number.

78.  Right click on PRODUCT_REVIEWS_1_FROMCSV  and click on Data Preview.

79.  Congratulations!  You’ve successfully completed the exercise.

To recap, you created a pipeline that reads product review files from HDFS, uses Python to extract sentiments from those product reviews and load them into SAP Vora.

 

Thanks for reading. Hope this is useful.

Be the first to leave a comment
You must be Logged on to comment or reply to a post.