Disclaimer:

This tutorial is intended as a guide for the creation of demo/test data only. The scripts provided in this blog are not intended for use in a productive system.

Purpose:

This blog explains how to harvest historical Twitter data through GNIP. The pre-installed Python interpreter from the SAP HANA Client is used to execute Python scripts from the SAP HANA Studio. The scripts (discussed below) harvest historical Twitter data from GNIP and store the useful data in the Business Suite Foundation tables SOCIALDATA and SOCIALUSERINFO.

Prerequisites:

Make sure the following prerequisites are met before you start.

  • Installation of SAP HANA Studio and SAP HANA Client
    Install SAP HANA Studio and SAP HANA Client, and apply for a HANA user with read, write, and update authorization for the foundation database tables SOCIALDATA and SOCIALUSERINFO.
  • Create a GNIP account
  • Enable the Historical PowerTrack API in your GNIP account
    For harvesting historical data, the Historical PowerTrack API must be enabled for your account. If it is not already enabled, contact your account manager to enable it.

Setup:

For initial setup and configuration, refer to the blog http://scn.sap.com/community/crm/marketing/blog/2014/09/29/twitter-data-harvesting-adapter-using-python-script-for-gnip

Code:

Harvesting historical Twitter data from GNIP involves three basic steps:

1. Request Historical Job : You can request a Historical PowerTrack job by making an HTTP POST request to the API endpoint. You need to include a fromDate, toDate, rules, and some additional metadata in the JSON POST body.

To request a Historical PowerTrack job, send a request to the following URL with your user credentials and a POST body similar to the following example.

https://historical.gnip.com/accounts/<GNIP_ACCOUNT_NAME>/jobs.json
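A representative JSON POST body (the values mirror the placeholders used in create_job.py below; fill in your own PowerTrack rules):

```json
{
  "publisher": "twitter",
  "streamType": "track",
  "dataFormat": "activity-streams",
  "fromDate": "201410140630",
  "toDate": "201410140631",
  "title": "job30",
  "rules": [{"value": "", "tag": ""}]
}
```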

create_job.py


import urllib2
import base64
import json
UN = '' # YOUR GNIP ACCOUNT EMAIL ID
PWD = '' # YOUR GNIP ACCOUNT PASSWORD
account = '' # YOUR GNIP ACCOUNT USER NAME
def get_json(data):
    return json.loads(data.strip())
def post():
    url = 'https://historical.gnip.com/accounts/' + account + '/jobs.json'
    publisher = "twitter"
    streamType = "track"
    dataFormat = "activity-streams"
    fromDate = "201410140630"
    toDate = "201410140631"
    jobTitle = "job30"
    rules = '[{"value":"","tag":""}]'
    jobString = '{"publisher":"' + publisher + '","streamType":"' + streamType + '","dataFormat":"' + dataFormat + '","fromDate":"' + fromDate + '","toDate":"' + toDate + '","title":"' + jobTitle + '","rules":' + rules + '}'
    base64string = base64.encodestring('%s:%s' % (UN, PWD)).replace('\n', '')
    req = urllib2.Request(url=url, data=jobString)
    req.add_header('Content-type', 'application/json')
    req.add_header("Authorization", "Basic %s" % base64string)
   
    # Adjust (or remove) these proxy settings to match your network.
    proxy = urllib2.ProxyHandler({'http': 'http://proxy:8080', 'https': 'https://proxy:8080'})
    opener = urllib2.build_opener(proxy)
    urllib2.install_opener(opener)
    try:
        response = urllib2.urlopen(req)
        the_page = response.read()
        the_page = get_json(the_page)
        print 'Job has been created.'
        print 'Job UUID : ' + the_page['jobURL'].split("/")[-1].split(".")[0]
    except urllib2.HTTPError as e:
        print e.read()
       
if __name__=='__main__':
    post()

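As an aside, the hand-built jobString above is easy to get wrong if a rule value contains quotes. A minimal sketch of a safer alternative using json.dumps (build_job_body is my own name, not part of the GNIP API):

```python
import json

def build_job_body(from_date, to_date, title, rules,
                   publisher="twitter", stream_type="track",
                   data_format="activity-streams"):
    """Build the Historical PowerTrack job request body.

    `rules` is a list of dicts such as [{"value": "sap hana", "tag": "demo"}];
    json.dumps takes care of quoting and escaping that manual string
    concatenation would miss.
    """
    return json.dumps({
        "publisher": publisher,
        "streamType": stream_type,
        "dataFormat": data_format,
        "fromDate": from_date,
        "toDate": to_date,
        "title": title,
        "rules": rules,
    })

# A rule value containing quotes survives intact:
body = build_job_body("201410140630", "201410140631", "job30",
                      [{"value": 'say "hana"', "tag": "demo"}])
```

The returned string can be passed as the `data` argument of the request in place of jobString.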
The above code creates a job and receives, in the JSON response, a UUID for the created job along with other details.

The UUID is used to run the following scripts.

  • accept_reject.py
  • get_links.py
  • monitor_job.py

Note: Set the above UUID in the 'uuid' variable (which is used to build the 'url') in each of the above scripts before executing them.

2. Accept/Reject a Historical Job : After delivery of the estimate, you can accept or reject the previously requested job with an HTTP PUT request to the job URL endpoint. To accept or reject the estimate, send a request to the following URL with your user credentials and a PUT body (example below) that updates the status to accept (or reject).

https://historical.gnip.com/accounts/<GNIP_ACCOUNT_NAME>/publishers/twitter/historical/track/jobs/<uuid>.json

The following is an example of a valid PUT body:


{
"status": "accept"
}

accept_reject.py


import urllib2
import base64
import json
UN = '' # YOUR GNIP ACCOUNT EMAIL ID
PWD = '' # YOUR GNIP ACCOUNT PASSWORD
account = '' # YOUR GNIP ACCOUNT USER NAME
uuid = '' # UUID RETURNED BY create_job.py
url = 'https://historical.gnip.com:443/accounts/' + account + '/publishers/twitter/historical/track/jobs/' + uuid + '.json'
def get_json(data):
    return json.loads(data.strip())
def accept_reject():
   
    choice = 'accept' # Switch to 'reject' to reject the job.
    payload = '{"status":"' + choice + '"}'
   
    base64string = base64.encodestring('%s:%s' % (UN, PWD)).replace('\n', '')
    req = urllib2.Request(url=url, data=payload)
    req.add_header('Content-type', 'application/json')
    req.add_header("Authorization", "Basic %s" % base64string)
    req.get_method = lambda: 'PUT'
   
    # Adjust (or remove) these proxy settings to match your network.
    proxy = urllib2.ProxyHandler({'http': 'http://proxy:8080', 'https': 'https://proxy:8080'})
    opener = urllib2.build_opener(proxy)
    urllib2.install_opener(opener)
   
    try:
        response = urllib2.urlopen(req)
        the_page = response.read()
        the_page = get_json(the_page)
        uuid = the_page['jobURL'].split("/")[-1].split(".")[0]
        print 'Job has been accepted.'
        print 'UUID : ' + uuid
    except urllib2.HTTPError as e:
        print e.read()
   
if __name__=='__main__':
    accept_reject()

If the job has not been created successfully before running this script, you will get the following error message:

{"status":"error","reason":"Invalid state transition: Job cannot be accepted"}

After accepting the job, you can monitor its status by sending a request to the following URL with your user credentials.

https://historical.gnip.com/accounts/<GNIP_ACCOUNT_NAME>/publishers/twitter/historical/track/jobs/<uuid>.json

A request to the above URL returns a JSON document containing a "percentComplete" field along with other details about the running job. This field can be used to check whether the job has completed.

monitor_job.py


import urllib2
import base64
import json
UN = '' # YOUR GNIP ACCOUNT EMAIL ID
PWD = '' # YOUR GNIP ACCOUNT PASSWORD
account = '' # YOUR GNIP ACCOUNT USER NAME
uuid = '' # UUID RETURNED BY create_job.py
url = 'https://historical.gnip.com:443/accounts/'+ account +'/publishers/twitter/historical/track/jobs/' + uuid + '.json'
def get_json(data):
    return json.loads(data.strip())
class RequestWithMethod(urllib2.Request):
    def __init__(self, url, method, headers={}):
        self._method = method
        urllib2.Request.__init__(self, url, headers)
    def get_method(self):
        if self._method:
            return self._method
        else:
            return urllib2.Request.get_method(self)
def monitor():
    base64string = base64.encodestring('%s:%s' % (UN, PWD)).replace('\n', '')
   
    req = RequestWithMethod(url, 'GET')
    req.add_header('Content-type', 'application/json')
    req.add_header("Authorization", "Basic %s" % base64string)
   
    # Adjust (or remove) these proxy settings to match your network.
    proxy = urllib2.ProxyHandler({'http': 'http://proxy:8080', 'https': 'https://proxy:8080'})
    opener = urllib2.build_opener(proxy)
    urllib2.install_opener(opener)
   
    try:
        response = urllib2.urlopen(req)
        the_page = response.read()
        the_page = get_json(the_page)
        print the_page
        print 'Status Message : ' + str(the_page['statusMessage'])
        print 'Percent Complete : ' + str(the_page['percentComplete']) + ' %'
        if 'quote' in the_page:
            print 'Estimated File Size in Mb : ' + str(the_page['quote']['estimatedFileSizeMb'])
            print 'Expires At : ' + str(the_page['quote']['expiresAt'])     
    except urllib2.HTTPError as e:
        print e.read()
           
if __name__ == '__main__':
    print url
    monitor()

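The monitor response can be reduced to a simple completion check before moving on to step 3. A minimal sketch (job_progress and is_complete are my own helper names; the sample body contains only the fields monitor_job.py actually reads):

```python
import json

def job_progress(status_json):
    """Return (statusMessage, percentComplete) from a monitor response body."""
    status = json.loads(status_json)
    return status.get('statusMessage'), status.get('percentComplete')

def is_complete(percent):
    """A job is done once percentComplete reaches 100."""
    return percent is not None and percent >= 100.0

# Example response body (fields based on those printed by monitor_job.py):
sample = '{"statusMessage": "Job delivered", "percentComplete": 100.0}'
message, pct = job_progress(sample)
```

A polling loop would call the monitor endpoint, feed the body to job_progress, and sleep until is_complete returns True.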
3. Retrieve Historical Data : When the job has completed, GNIP provides a dataURL endpoint containing a list of file URLs that can be downloaded in parallel. To retrieve this list of data files, send an HTTP GET request to the following URL with your user credentials:

https://historical.gnip.com/accounts/<GNIP_ACCOUNT_NAME>/publishers/twitter/historical/track/jobs/<uuid>/results.json

get_links.py


import urllib2
import base64
import json
UN = '' # YOUR GNIP ACCOUNT EMAIL ID
PWD = '' # YOUR GNIP ACCOUNT PASSWORD
account = '' # YOUR GNIP ACCOUNT USER NAME
uuid = '' # UUID RETURNED BY create_job.py
url = 'https://historical.gnip.com/accounts/'+ account +'/publishers/twitter/historical/track/jobs/'+ uuid +'/results.json'
def get_json(data):
    return json.loads(data.strip())
def get_links():
    base64string = base64.encodestring('%s:%s' % (UN, PWD)).replace('\n', '')
    req = urllib2.Request(url)
    req.add_header('Content-type', 'application/json')
    req.add_header("Authorization", "Basic %s" % base64string)
   
    # Adjust (or remove) these proxy settings to match your network.
    proxy = urllib2.ProxyHandler({'http': 'http://proxy:8080', 'https': 'https://proxy:8080'})
    opener = urllib2.build_opener(proxy)
    urllib2.install_opener(opener)
    try:
        response = urllib2.urlopen(req)
        the_page = response.read()
        the_page = get_json(the_page)
        print 'Total URL count : ' + str(the_page['urlCount']) + '\n'
        for item in the_page['urlList']:
            print item
    except urllib2.HTTPError:
        print 'Job is not yet delivered.'
if __name__=='__main__':
    get_links()

The above code returns the list of URLs from which the data can be downloaded. For each URL returned by get_links.py, run get_data.py to fetch the data from that URL and store it in the SOCIALDATA and SOCIALUSERINFO tables.

Note: Copy the links returned by get_links.py (one at a time) into the 'url' variable before executing get_data.py.
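Each data file behind those links is gzip-compressed, newline-delimited JSON. The decompression step that get_data.py performs with a streaming decompressor can be sketched standalone for a single downloaded file (iter_records is my own helper name):

```python
import gzip
import io
import json
import zlib

NEWLINE = '\r\n'

def iter_records(gz_bytes):
    """Yield parsed JSON activities from one gzip'd Historical data file.

    16 + zlib.MAX_WBITS tells zlib to expect a gzip header, the same trick
    used in get_data.py; records are '\r\n'-delimited JSON objects.
    """
    raw = zlib.decompress(gz_bytes, 16 + zlib.MAX_WBITS).decode('utf-8')
    for line in raw.split(NEWLINE):
        line = line.strip()
        if line:
            yield json.loads(line)

# Usage with an in-memory gzip file standing in for a downloaded one:
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode='wb') as f:
    f.write(b'{"verb": "post"}\r\n{"verb": "share"}\r\n')
records = list(iter_records(buf.getvalue()))
```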

get_data.py


import urllib2
import zlib
import threading
from threading import Lock
import sys
import ssl
import json
from datetime import datetime
import calendar
import dbapi
from wsgiref.handlers import format_date_time
CHUNKSIZE = 4*1024
GNIPKEEPALIVE = 30
NEWLINE = '\r\n'
# GNIP DATA FILE URL (one of the links returned by get_links.py)
url = ''
HEADERS = { 'Accept': 'application/json',
            'Connection': 'Keep-Alive',
            'Accept-Encoding' : 'gzip' }
# SAP HANA CONNECTION DETAILS
server = ''
port = 30015 # YOUR SAP HANA SQL PORT (3<instance>15); adjust to your system
username_hana = ''
password_hana = ''
schema = ''
client = ''
socialmediachannel = 'TW'
print_lock = Lock()
err_lock = Lock()
hdb_target = dbapi.connect(server, port, username_hana, password_hana)
cursor_target = hdb_target.cursor()
class procEntry(threading.Thread):
    def __init__(self, buf):
        self.buf = buf
        threading.Thread.__init__(self)
    def unicodeToAscii(self, word):
        return word.encode('ascii', 'ignore')
   
    def run(self):
        for rec in [x.strip() for x in self.buf.split(NEWLINE) if x.strip() != '']:
            try:
                jrec = json.loads(rec.strip())
                with print_lock:
                    res = ''
                    if 'verb' in jrec:
                        verb = jrec['verb']
                        verb = self.unicodeToAscii(verb)
                        # SOCIALUSERINFO DETAILS
                        socialUser = jrec['actor']['id'].split(':')[2]
                        socialUser = self.unicodeToAscii(socialUser)
                        socialUserProfileLink = jrec['actor']['link']
                        socialUserProfileLink = self.unicodeToAscii(socialUserProfileLink)
                        socialUserAccount = jrec['actor']['preferredUsername']
                        socialUserAccount = self.unicodeToAscii(socialUserAccount)
                        friendsCount = jrec['actor']['friendsCount']
                        followersCount = jrec['actor']['followersCount']
                        postedTime = jrec['postedTime']
                        postedTime = self.unicodeToAscii(postedTime)
                        displayName = jrec['actor']['displayName']
                        displayName = self.unicodeToAscii(displayName)
                        image = jrec['actor']['image']
                        image = self.unicodeToAscii(image)
                       
                        # SOCIALDATA DETAILS
                        socialpost = jrec['id'].split(':')[2]
                        socialpost = self.unicodeToAscii(socialpost)
                        createdbyuser = socialUser
                        creationdatetime = postedTime
                        socialpostlink = jrec['link']
                        creationusername = displayName
                        socialpostsearchtermtext = jrec['gnip']['matching_rules'][0]['value']
                        socialpostsearchtermtext = self.unicodeToAscii(socialpostsearchtermtext)
                       
                        d = datetime.utcnow()
                        time = d.strftime("%Y%m%d%H%M%S")
                       
                        creationdatetime_utc = datetime.strptime(postedTime[:-5], "%Y-%m-%dT%H:%M:%S")
                        creationdatetime_utc = creationdatetime_utc.strftime(("%Y%m%d%H%M%S"))
                       
                        stamp = calendar.timegm(datetime.strptime(creationdatetime[:-5], "%Y-%m-%dT%H:%M:%S").timetuple())
                        creationdatetime = format_date_time(stamp)
                        creationdatetime = creationdatetime[:-4] + ' +0000'
                       
                        if verb == 'post':
                            socialdatauuid = jrec['object']['id'].split(':')[2]
                            socialdatauuid = self.unicodeToAscii(socialdatauuid)
                            socialposttext = jrec['object']['summary']
                            socialposttext = self.unicodeToAscii(socialposttext)
                            res = socialUser + '\t'  + socialUserAccount + '\t' + str(friendsCount) + '\t' + str(followersCount) + '\t' + postedTime + '\t' + displayName + '\t' + displayName.upper() + '\t' + socialUserProfileLink + '\t' +image
                        elif verb == 'share':
                            socialdatauuid = jrec['object']['object']['id'].split(':')[2]
                            socialdatauuid = self.unicodeToAscii(socialdatauuid)                           
                            socialposttext = jrec['object']['object']['summary']
                            socialposttext = self.unicodeToAscii(socialposttext)
                            res = socialposttext + '\t' +socialUser + '\t'  + socialUserAccount + '\t' + str(friendsCount) + '\t' + str(followersCount) + '\t' + postedTime + '\t' + displayName + '\t' + displayName.upper() + '\t' + socialUserProfileLink + '\t' +image
                        print(res)
                        sql = 'upsert ' + schema + '.SOCIALUSERINFO(CLIENT, SOCIALMEDIACHANNEL, SOCIALUSER, SOCIALUSERPROFILELINK, SOCIALUSERACCOUNT, NUMBEROFSOCIALUSERCONTACTS, SOCIALUSERINFLUENCESCOREVALUE, CREATIONDATETIME, SOCIALUSERNAME, SOCIALUSERNAME_UC, SOCIALUSERIMAGELINK, CREATEDAT) values(?,?,?,?,?,?,?,?,?,?,?,?) with primary key'
                        cursor_target.execute(sql, (client, socialmediachannel, socialUser, socialUserProfileLink, socialUserAccount, friendsCount, followersCount, creationdatetime, displayName, displayName.upper(), image, time))
                        hdb_target.commit()
                           
                        sql = 'upsert ' + schema + '.SOCIALDATA(CLIENT, SOCIALDATAUUID, SOCIALPOST, SOCIALMEDIACHANNEL, CREATEDBYUSER, CREATIONDATETIME, SOCIALPOSTLINK, CREATIONUSERNAME, SOCIALPOSTSEARCHTERMTEXT, SOCIALPOSTTEXT, CREATEDAT, CREATIONDATETIME_UTC) VALUES(?,?,?,?,?,?,?,?,?,?,?,?) WITH PRIMARY KEY'                   
                        cursor_target.execute(sql, (client, socialdatauuid, socialpost, socialmediachannel, createdbyuser, creationdatetime, socialpostlink, creationusername, socialpostsearchtermtext, socialposttext, time, creationdatetime_utc))
                        hdb_target.commit()
            except ValueError, e:
                with err_lock:
                    sys.stderr.write("Error processing JSON: %s (%s)\n"%(str(e), rec))
def getStream():
    # Adjust (or remove) these proxy settings to match your network.
    proxy = urllib2.ProxyHandler({'http': 'http://proxy:8080', 'https': 'https://proxy:8080'})
    opener = urllib2.build_opener(proxy)
    urllib2.install_opener(opener)
    req = urllib2.Request(url, headers=HEADERS)
    response = urllib2.urlopen(req, timeout=(1+GNIPKEEPALIVE))
    decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
    remainder = ''
    while True:
        tmp = decompressor.decompress(response.read(CHUNKSIZE))
        if tmp == '':
            return
        [records, remainder] = ''.join([remainder, tmp]).rsplit(NEWLINE,1)
        procEntry(records).start()
if __name__ == "__main__":
    print('Started...')
    try:
        getStream()
    except ssl.SSLError, e:
        with err_lock:
            sys.stderr.write("Connection failed: %s\n"%(str(e)))

When you run the above script, the data file at the location specified by the 'url' variable is downloaded and its contents are stored in the SOCIALDATA and SOCIALUSERINFO tables.
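The timestamp handling in get_data.py derives two representations from the activity's postedTime, and both conversions are easy to get wrong. Here is that logic isolated as a standalone sketch (convert_posted_time is my own name):

```python
import calendar
from datetime import datetime
from wsgiref.handlers import format_date_time

def convert_posted_time(posted_time):
    """Convert GNIP's postedTime (e.g. '2014-10-14T06:30:15.000Z') into the
    two formats get_data.py stores:
      - CREATIONDATETIME_UTC: compact 'YYYYMMDDHHMMSS'
      - CREATIONDATETIME: RFC-1123 style with an explicit '+0000' offset
    """
    # Strip the trailing '.000Z' (5 characters) before parsing.
    parsed = datetime.strptime(posted_time[:-5], "%Y-%m-%dT%H:%M:%S")
    utc_compact = parsed.strftime("%Y%m%d%H%M%S")
    # timegm treats the struct_time as UTC, matching the 'Z' suffix.
    stamp = calendar.timegm(parsed.timetuple())
    rfc = format_date_time(stamp)   # e.g. 'Tue, 14 Oct 2014 06:30:15 GMT'
    rfc = rfc[:-4] + ' +0000'       # replace the trailing ' GMT' suffix
    return utc_compact, rfc

utc_compact, rfc = convert_posted_time("2014-10-14T06:30:15.000Z")
```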

References:

1. Gnip Support

2. Harvesting real time Tweets from GNIP into Social Intelligence tables using a Python Script: http://scn.sap.com/community/crm/marketing/blog/2014/09/29/twitter-data-harvesting-adapter-using-python-script-for-gnip

3. Demo Social and Sentiment data generation using Python script: http://scn.sap.com/community/crm/marketing/blog/2015/01/12/demo-social-and-sentiment-data-generation-using-python-script

4. Harvesting Tweets into Social Intelligence tables using a Python Script: http://scn.sap.com/docs/DOC-53824
