
SAP Data Intelligence – Amazon Rekognition

As you may know, SAP Data Intelligence has the capability of creating, deploying and running Machine Learning projects, allowing you to bring your own machine learning expertise using technologies such as TensorFlow to solve a particular business problem (see this excellent post by Frank Schuler).

But what if you do not have the in-house expertise? What if you would like to leverage externally developed and hosted Machine Learning services?

 

This blog post shows you how to leverage the AWS Rekognition image processing and analysis services to classify faces by gender, emotional expression and age range. We will go through the requirements and the steps taken to implement a simple pipeline using custom Python operators and custom Python code, without any time spent on developing and testing ML models.

SAP Data Intelligence also allows you to orchestrate processes and leverage various cloud-based services such as the AWS Rekognition ML classification and image analysis services.

 

In our test scenario we are harvesting images tweeted under a specific hashtag on Twitter; the images contain faces of individuals or groups of people.

 


Requirements

You need an AWS account with Console access; for more detail see the Rekognition documentation. You also need an SAP Data Intelligence instance or trial edition; see this blog post by Dimitri Vorobiev.

We will be using an existing AWS account and credentials within our pipeline in order to access S3 and Rekognition services. To obtain your AWS account credentials see the AWS documentation.
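The operators later in this post read the access key pair from a small credentials CSV. A minimal sketch of that parsing step, assuming a two-column file containing the key ID and secret (the in-memory file below stands in for the real credentials.csv, and the key values are made up):

```python
import csv
import io

def read_aws_credentials(fileobj):
    """Return (access_key_id, secret_access_key) from a two-column CSV."""
    access_key_id = secret_access_key = None
    for line in csv.reader(fileobj):
        access_key_id, secret_access_key = line[0], line[1]
    return access_key_id, secret_access_key

# In-memory file standing in for credentials.csv; values are illustrative only
sample = io.StringIO("AKIAEXAMPLE,wJalrXUtnFEMI/EXAMPLEKEY\n")
key_id, secret = read_aws_credentials(sample)
print(key_id)  # AKIAEXAMPLE
```

In the pipeline itself the same two values are then passed to `boto3.client(...)` as `aws_access_key_id` and `aws_secret_access_key`, as shown in the operator code further down.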

When it comes to storing and managing the results from our pipeline, we will be using SAP HANA Cloud. To start an SAP HANA Cloud trial you can click here.

If you are not familiar with SAP HANA Cloud, please look at the excellent blog post Getting Started with SAP HANA Cloud II | Basics by Denys van Kempen.

Next we will need an SAP HANA Cloud connection configured in the Data Intelligence Connection Manager. This connection will be used by the SAP HANA Client operator to write the pipeline results to a table we will be creating.

We can see our SAP HANA Cloud connection definition below:

 

We will also need a custom Dockerfile to satisfy our Python requirements, specifically the boto3 AWS library. We build the Docker image via the Data Intelligence Modeler; it will take some time to build:

FROM $com.sap.sles.base

RUN python3.6 -m pip install --user tweepy
RUN python3.6 -m pip install --user pandas
RUN python3.6 -m pip install --user boto3
RUN python3.6 -m pip install --user tornado==5.0.2
RUN python3.6 -m pip install --user wget

With the requirements fulfilled we can move on to the next step: exploring the AWS Rekognition services via the AWS Console.

 

Exploring AWS Rekognition

Rekognition provides many capabilities, such as text analysis; for our scenario we will look specifically at Facial Analysis.

The console allows you to explore the attributes that are returned in JSON format for easy processing. A sample image is provided by AWS, but you can also upload your own and look at the full facial analysis attribute result set.

To get to the required service, simply search for the “Rekognition” service once you have logged into your AWS Console; there are many other services that AWS provides.

 

I took my own profile picture as an example; on the right-hand side we can see various facial attributes extracted via Rekognition ML:

 

We can also look at the JSON-formatted response and view the request to the API. This is a sample of the results that we will be processing in our data pipeline; the key interests for us are the gender confidence and the facial emotional expression confidence levels.
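To make the shape of that response concrete, here is a small sketch that pulls gender, age range and the highest-confidence emotion out of a hand-written FaceDetails fragment. The field names match the Rekognition DetectFaces response structure, but the confidence values below are made up for illustration:

```python
# Hand-written fragment mimicking a Rekognition DetectFaces response;
# the numbers are illustrative, not real API output.
response = {
    "FaceDetails": [
        {
            "Gender": {"Value": "Male", "Confidence": 99.1},
            "AgeRange": {"Low": 32, "High": 48},
            "Emotions": [
                {"Type": "CALM", "Confidence": 87.4},
                {"Type": "HAPPY", "Confidence": 9.2},
            ],
        }
    ]
}

for face in response["FaceDetails"]:
    gender = face["Gender"]["Value"]
    gender_conf = face["Gender"]["Confidence"]
    # Emotions is a list scored per emotion; take the strongest one
    top_emotion = max(face["Emotions"], key=lambda e: e["Confidence"])
    age = "{Low}-{High}".format(**face["AgeRange"])
    print(gender, gender_conf, top_emotion["Type"], age)
```

The classification operator later in this post walks the same `FaceDetails` list, but keeps every emotion above a confidence threshold rather than only the top one.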

 

 

Having explored the Rekognition Facial Analysis service, we can now focus on the Data Intelligence pipeline: we have a better understanding of the result set and can formulate ideas on how best to process it in our data pipeline.

SAP Data Intelligence Pipeline

We have chosen Twitter as our image “delivery” platform, but this could be any other platform, or even cloud-based storage such as S3 buckets or Azure Data Lake, depending on the use case and scenario.

Our tweeted images are harvested from the Twitter hashtag #digitalfactorydatatest using a custom Python operator tagged with the custom Docker image specified in the requirements. We will be using this tag/image on all subsequent Python operators in this pipeline to exchange images and data to and from S3 and the Rekognition ML services (via the boto3 library).

The data pipeline consists of four primary Python operators, each with a function specific to the task it must execute:

  1. Harvest Tweeted image(s) using the Tweepy streaming Python library.
  2. Copy downloaded images from local temporary file system to an S3 bucket (boto3 client).
  3. Image classification – submit S3 bucket based images to AWS Rekognition facial analysis ML services for classification. We also deal with the result set here and prepare the results in CSV format. The CSV formatted results will be sent downstream to the HANA Client operator.
  4. SAP HANA Client operator uses the connection to SAP HANA Cloud defined in the Data Intelligence configuration manager.
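The four steps above can be sketched as a chain of functions. Everything here is a stub (the real operators below carry the actual logic, and the CSV row is invented for illustration), but it shows how the tweet ID produced in step 1 threads through the pipeline:

```python
def harvest_tweet():
    """Step 1 (stub): return the tweet ID of a harvested image."""
    return "1288097597010444295"

def upload_to_s3(id_str):
    """Step 2 (stub): copy local images for this tweet to an S3 bucket."""
    return id_str  # the tweet ID is passed downstream unchanged

def classify_images(id_str):
    """Step 3 (stub): call Rekognition and return semicolon-separated CSV rows."""
    return "99.1;CALM;32-48;{}/img.jpg;Male;99.1\n".format(id_str)

def write_to_hana(csv_rows):
    """Step 4 (stub): hand the CSV rows to the SAP HANA Client operator."""
    return len(csv_rows.splitlines())

rows_written = write_to_hana(classify_images(upload_to_s3(harvest_tweet())))
print(rows_written)  # 1
```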

 

Tagging the Docker image to the Python operator.

Each of the Python operator’s code is listed below:

Twitter Image harvest operator

We will be using Tweepy, an easy-to-use library that simplifies the Twitter streaming API by handling authentication, connection, creating and destroying the session, and reading incoming messages. Our hashtag in this test is #digitalfactorydatatest; our operator will download the tweeted images and their associated metadata.

import sys
import os
import shutil
import pathlib
import string
import time
import json
import csv
import wget
from datetime import datetime
from tweepy import Stream
from tweepy.streaming import StreamListener
from tweepy import API
from tweepy import OAuthHandler
# from twitter_client import get_twitter_auth

# export PYTHONIOENCODING=UTF-8 avoid UnicodeEncodeError: 'ascii' codec can't encode character errors

def get_twitter_auth():
    """Setup Twitter authentication.
    Return: tweepy.OAuthHandler object"""
    try:
        
        
        consumer_key = 'USE YOUR OWN'
        consumer_secret = 'USE YOUR OWN'
        access_token = 'USE YOUR OWN'
        access_secret = 'USE YOUR OWN'
        
        
    except KeyError:
        sys.stderr.write("TWITTER_* environment variables not set\n")
        sys.exit(1)
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    return auth


def get_twitter_client():
    """Setup Twitter API client.

    Return: tweepy.API object"""
    auth = get_twitter_auth()
    client = API(auth,wait_on_rate_limit=True)
    return client

api.send("debug", str("Before class"))


class CustomListener(StreamListener):
    """Custom StreamListener for streaming Twitter data."""

    def on_data(self, data):
        try:
            all_data = json.loads(data)
            
            api.send("debug", str("Before if statement"))
    

            # If image is tweeted then entities is > 4 and download image
            
            if len(all_data["entities"]) > 4:
                
                # l = len(all_data["entities"]
                
                api.send("debug", "Inside if")
                

                # extract meta-data we need

                id_str = all_data["id_str"]
                date = all_data["created_at"]
                tweet = all_data["text"]
                user_id = all_data["user"]["id_str"]
                username = all_data["user"]["screen_name"]
                location = all_data["user"]["location"]
                followers_count = all_data["user"]["followers_count"]
                friends_count = all_data["user"]["friends_count"]
                listed_count = all_data["user"]["listed_count"]
                
                # api.send("output", str(id_str))
                
                


                output_folder = '/vrep/vflow/scripts/twitter/twitter_pictures/' + id_str
                
                api.send("debug", str("Saved output_folder"))

                # Save Tweet meta-data to json file for further processing later
                file = pathlib.Path(output_folder)
                
                if file.exists ():
                    print("Recreate folder")
                    shutil.rmtree(output_folder)
                    os.makedirs(output_folder)

                else:
                    print("Create folder")
                    os.makedirs(output_folder)

                for media_ in all_data["entities"]["media"]:
                    download_ = media_["media_url"]
                    # filter_ = media_["description"]
                    
                    try:
                        
                        filter_ = media_["description"]
                        
                    except:
                        
                        filter_ = None
                        # print("pass", filter_)

                    # print(id_str, date, username, filter_, download_, location)

                    filename = download_.split('media/')[1]
                    file_datetime = datetime.strftime(datetime.strptime(date, '%a %b %d %H:%M:%S +0000 %Y'), '%Y-%m-%d_%H:%M:%S')

                    wget.download(download_, out=output_folder+'/'+ file_datetime + "_" + filename)
                    
                    
                    
                    # api.send("output", str(id_str))
                    
                    
                # Save tweet meta-data
                with open(output_folder + '/all_data.json', 'w') as outfile:
                    json.dump(all_data, outfile)
                    
                    api.send("debug", str("Downloaded image"))
                
                api.send("output", str(id_str))
                
            else:
                    
                api.send("output1", str("No image..."))


        except BaseException as e:
            sys.stderr.write("Error on_data: {}\n".format(e))
            time.sleep(5)
        return True

    def on_error(self, status):
        
        api.send("debug", str(status))
        
        if status == 420:
            sys.stderr.write("Rate limit exceeded\n")
            return False
        else:
            sys.stderr.write("Error {}\n".format(status))
            return True
            
        


if __name__ == '__main__':
    # query = sys.argv[1:]  # list of CLI arguments
    query = ['#digitalfactorydatatest']
    query_fname = ' '.join(query)  # string
    auth = get_twitter_auth()
    twitter_stream = Stream(auth, CustomListener(query_fname))
    # twitter_stream.filter(track=query, encoding = 'unicode-escape')
    twitter_stream.filter(track=query)
    

 

Managing image storage via S3

In order to manage image storage we will be using an AWS S3 bucket for image uploads; we need our AWS account credentials (as mentioned in the requirements) to facilitate this.

Amazon Rekognition can analyse images that are stored in an Amazon S3 bucket or images that are supplied as image bytes.
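Because Rekognition accepts either an S3 object reference or raw image bytes, the `detect_faces` call can be parameterised either way. The small helper below only builds the `Image` argument (so it runs without AWS access); the bucket name and key are placeholders, and the commented lines show where the real boto3 call would go:

```python
def rekognition_image_arg(bucket=None, key=None, image_bytes=None):
    """Build the Image parameter for Rekognition detect_faces:
    either an S3 object reference or inline image bytes."""
    if image_bytes is not None:
        return {"Bytes": image_bytes}
    return {"S3Object": {"Bucket": bucket, "Name": key}}

# S3-based image, as used in the classification operator below
s3_arg = rekognition_image_arg(bucket="my-example-bucket", key="1234/img.jpg")
print(s3_arg)

# Byte-based alternative, e.g. read from the local file system:
# with open("photo.jpg", "rb") as f:
#     bytes_arg = rekognition_image_arg(image_bytes=f.read())
# response = facesclient.detect_faces(Image=bytes_arg, Attributes=['ALL'])
```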

import time
import shutil
import os
# import urllib.request
import boto3
import csv

def on_input(data):
    id_str = data
    
    aws_credentials = '/vrep/vflow/scripts/twitter/credentials.csv'

    # Setup AWS permissions
    with open(aws_credentials, 'r') as input:
        reader = csv.reader(input)
    
        for line in reader:
            access_key_id = line[0]
            secret_access_key = line[1]
    
    # Setup Boto client
    s3client = boto3.client('s3',
    aws_access_key_id = access_key_id, 
    aws_secret_access_key = secret_access_key,
    region_name = 'eu-west-1')
    
    
    # will be dynamic from upstream twitter operator
    # id_str = '1288097597010444295'
    bucket = 'USE-YOUR-OWN-BUCKET-HERE'
    output_folder = '/vrep/vflow/scripts/twitter/twitter_pictures/' + id_str
   
    
    os.chdir(output_folder)
    
    uploads = os.listdir(output_folder)
    
    imgs = list(uploads)
    print(imgs)
        
    for files in imgs:
        file = id_str + '/' + files
        s3client.upload_file(files, bucket,file)
    
    api.send("output", str(id_str))

api.set_port_callback("input", on_input)

 

Image classification operator

In this operator we leverage the AWS Rekognition ML services, pushing the uploaded images to the Rekognition facial analysis service for classification.

We then process the results and finally send results downstream to HANA Client operator to persist data in SAP HANA Cloud.

 

import os.path
import json
import time
import pandas as pd
import boto3
import csv



def on_input(data):
    
  
    
    # Tweet metadata
    id_str = data
    
    
   
    raw_data = '/vrep/vflow/scripts/twitter/twitter_pictures/' + id_str + '/all_data.json'
    data = json.load(open(raw_data))
    
    # Tweet Data
    tweet_created_at = data['created_at']
    tweet_id_str = data['id_str']
    tweet_text = data['text']
    tweet_geo = data['geo']
    tweet_coordinates = data['coordinates']
    tweet_place = data['place']
    
    # User data
    user_id = data['user']['id_str']
    user_name = data['user']['name']
    screen_name = data['user']['screen_name']
    location = data['user']['location']
    followers_count = data['user']['followers_count']
    friends_count = data['user']['friends_count']
    created_at = data['user']['created_at']
    time_zone = data['user']['time_zone']
    geo_enabled = data['user']['geo_enabled']
    lang = data['user']['lang']
    following = data['user']['following']
    
    
    aws_credentials = '/vrep/vflow/scripts/twitter/credentials.csv'

    # Setup AWS permissions
    with open(aws_credentials, 'r') as input:
        reader = csv.reader(input)
    
        for line in reader:
            access_key_id = line[0]
            secret_access_key = line[1]
    
    # setup boto aws S3 client
    s3client = boto3.client('s3',
    aws_access_key_id = access_key_id, 
    aws_secret_access_key = secret_access_key,
    region_name = 'eu-west-1')
    
    # setup boto Rekognition aws client
    facesclient = boto3.client('rekognition',
    aws_access_key_id = access_key_id, 
    aws_secret_access_key = secret_access_key,
    region_name = 'eu-west-1')
    
    # Reading files
    image_data = s3client.list_objects(
        Bucket='USE-YOUR-OWN-BUCKET-HERE',
        Prefix= id_str + '/20'
    )
    
    # list of images uploaded to the bucket
    images = []

    # accumulator rows: one entry per face/emotion classification
    list_all = []
    
    
    
    df_results = pd.DataFrame()
    df_tweet = pd.DataFrame()  
    
 
    
    # check if 'media' key exists (only created when image is uploaded)
    try:
        
        api.send("debug", str("Image - AWS processing required..."))
        
        media_id = data['entities']['media'][0]['id_str']   
        media_media_url = data['entities']['media'][0]['media_url']
    
        tweet_data = str(tweet_created_at) + ";" + str(tweet_id_str) + ";" + str(tweet_text).replace(";", " ").replace("\n", " ") + ";" + str(tweet_geo) + ";" + str(tweet_coordinates) + ";" + str(tweet_place)
        user_data = str(user_id) + ";" + str(user_name) + ";" + str(screen_name) + ";" + str(location) + ";" + str(followers_count) + ";" + str(friends_count) + ";" + str(created_at) + ";" + str(time_zone) + ";" + str(geo_enabled) + ";" + str(lang) + ";" + str(following)
        media_data = str(media_id) + ";" + str(media_media_url)
    
        all_data = tweet_data + ";" + user_data + ";" + media_data
        cols = ['tweet_created_at', 'tweet_id_str', 'tweet_text', 'tweet_geo', 'tweet_coordinates', 'tweet_place', 'user_id', 'user_name', 'screen_name', 'location', 'followers_count', 'friends_count', 'created_at', 'time_zone','geo_enabled', 'lang', 'following', 'media_id', 'media_media_url']
    
        df_tweet = pd.DataFrame([x.split(';') for x in all_data.split('\n')], columns=cols)
    
    
        # Append images uploaded to bucket to list (data returns dict)
        for key, value in image_data.items():
            if key == 'Contents':
                for obj in value:
                    images.append(obj['Key'])
    
    
        # print(images)
        # 
        # Main loop, upload each image in bucket to Rekognition for classification
        #    
        for image in images:
            photo = image
    
            # Submit photo, get all image attributes from classification 
            response = facesclient.detect_faces(
                Image={
                'S3Object': {
                    'Bucket': 'USE-YOUR-OWN-BUCKET-HERE',
                    'Name': photo,
                },
            },
             Attributes=['ALL']
             )
    
            for key, value in response.items():
                if key == 'FaceDetails':
                    for people_attr in value:
                        gender_value = people_attr['Gender']
                        box_pos = people_attr['BoundingBox']
                        age_range = people_attr['AgeRange']
                        confidence = gender_value['Confidence']
                        gender = gender_value['Value']
                        emotion_value = people_attr['Emotions']
                        age_low = str(age_range['Low'])
                        age_high = str(age_range['High'])
    
                        box_width = box_pos['Width']
                        box_height = box_pos['Height']
                        box_left = box_pos['Left']
                        box_top = box_pos['Top']
                        age_ranges = age_low + "-" + age_high
    
    
    
                        for ec in emotion_value:
                            if ec['Confidence'] >= 20 and confidence >= 20:
                                ec_data = ec['Confidence'], ec['Type'], age_ranges, photo, gender, confidence,box_width, box_height,box_left, box_top 
                                list_all.append(ec_data)
    
    
        cols = ['emotion_confidence', 'emotion', 'age_ranges', 'shortcode', 'gender', 'gender_confidence', 'box_width', 'box_height','box_left', 'box_top']
        
        df_data = pd.DataFrame(list_all)
        df_data.columns = cols
    
    
        split_df = df_data["shortcode"].str.split('/', expand=True)
        split_df.columns = [f"shortcode_{id_}" for id_ in range(len(split_df.columns))]
        df_data = pd.merge(df_data, split_df, how="left", left_index=True, right_index=True)
        df_data = df_data.rename(columns={'shortcode_0': 'tweet_id_str'})
        df_results = pd.merge(df_data, df_tweet, how='inner', on=['tweet_id_str'])
        # df_results
        
        to_sac = df_results.to_csv(sep = ";", index=False, header=False)
    

        api.send("output", to_sac)
        api.send("debug", to_sac)
        


    except Exception as e:
        api.send('debug', str(e))

        
api.set_port_callback("input", on_input)    



 

Persisting results in SAP HANA Cloud

We create a table in our schema with the following structure:

 

CREATE COLUMN TABLE "DBADMIN"."TWITTER"(
	"EMOTIONAL_CONFIDENCE" DOUBLE,
	"EMOTION" NVARCHAR(20),
	"AGE_RANGE" NVARCHAR(10),
	"SHORTCODE" NVARCHAR(200),
	"GENDER" NVARCHAR(10),
	"GENDER_CONFIDENCE" DOUBLE,
	"BOX_WIDTH" NVARCHAR(100),
	"BOX_HEIGHT" NVARCHAR(100),
	"BOX_LEFT" NVARCHAR(100),
	"BOX_TOP" NVARCHAR(100),
	"TWEET_ID_STR" NVARCHAR(100),
	"SHORTCODE_1" NVARCHAR(100),
	"TWEET_CREATED_AT" NVARCHAR(100),
	"TWEET_TEXT" NVARCHAR(500),
	"TWEET_GEO" NVARCHAR(100),
	"TWEET_COORDINATES" NVARCHAR(100),
	"TWEET_PLACE" NVARCHAR(100),
	"USER_ID" NVARCHAR(100),
	"USER_NAME" NVARCHAR(100),
	"SCREEN_NAME" NVARCHAR(100),
	"LOCATION" NVARCHAR(100),
	"FOLLOWERS_COUNT" NVARCHAR(100),
	"FRIENDS_COUNT" NVARCHAR(100),
	"CREATED_AT" NVARCHAR(100),
	"TIME_ZONE" NVARCHAR(100),
	"GEO_ENABLED" NVARCHAR(100),
	"LANG" NVARCHAR(100),
	"FOLLOWING" NVARCHAR(100),
	"MEDIA_ID" NVARCHAR(100),
	"MEDIA_MEDIA_URL" NVARCHAR(100)
)
UNLOAD PRIORITY 5 AUTO MERGE;
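In our pipeline the SAP HANA Client operator performs the insert, but if you wanted to load this table directly from Python, a parameterised INSERT could be built from the column list like this. Only a subset of the table's columns is shown (extend `COLUMNS` to the full list above), and the hdbcli connection details in the comments are placeholders:

```python
# A few of the table's columns, for illustration; extend to the full list above.
COLUMNS = ["EMOTIONAL_CONFIDENCE", "EMOTION", "AGE_RANGE",
           "SHORTCODE", "GENDER", "GENDER_CONFIDENCE"]

def build_insert(table, columns):
    """Build a parameterised INSERT statement for the given table/columns."""
    col_list = ", ".join('"{}"'.format(c) for c in columns)
    params = ", ".join("?" for _ in columns)
    return 'INSERT INTO {} ({}) VALUES ({})'.format(table, col_list, params)

stmt = build_insert('"DBADMIN"."TWITTER"', COLUMNS)
print(stmt)

# With the SAP HANA Python driver (placeholder host/credentials):
# from hdbcli import dbapi
# conn = dbapi.connect(address="<your-hana-host>", port=443,
#                      user="DBADMIN", password="<your-password>")
# cursor = conn.cursor()
# cursor.executemany(stmt, rows)  # rows: list of tuples matching COLUMNS
```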

 

 

Putting it all together – Demonstration

First we start the pipeline; once it is running we tweet an image using our target hashtag:

 

 

You can tweet any image to test the pipeline; in this case I am tweeting my profile picture using #digitalfactorydatatest. Once tweeted, the pipeline will harvest the image within a second or two and submit it to AWS Rekognition for classification.

 

 

We will then query our SAP HANA Cloud resident table to view the results using SAP HANA Database Explorer:

Data is now persisted at the real-time frequency of tweets streaming in via our data pipeline, and can be further processed as needed by reporting or applications.

This now concludes our test.

 

Summary

You have used SAP Data Intelligence and the AWS Rekognition image recognition and analysis machine learning services to classify and identify gender, age range and emotional expression from tweeted images, all without any time spent developing, training and testing machine learning models!

For example, businesses can leverage SAP Data Intelligence, AWS Rekognition and SAP HANA Cloud to quickly build, deploy and manage machine learning services to measure brand coverage, discover content syndication and more.

 

 

I hope you have found this article useful. For any further product-related information please feel free to look at SAP Data Intelligence, SAP HANA Cloud and AWS Rekognition, or use the comments below.

 
