Supply Chain Management Blogs by SAP
Expand your SAP SCM knowledge and stay informed about supply chain management technology and solutions with blog posts by SAP. Follow and stay connected.
cancel
Showing results for 
Search instead for 
Did you mean: 
This being my first blog post, I would like to share my experience working on the coldstore - downloader application and also the ways to use the application to achieve the file download.

 

Background


Basically, the timeseries data once received from the sensor device is stored into SAP HANA and the warm store (Cassandra). After the retention period of the respective tenant, the data moves to the coldstore (AWS S3, Azure Blob Store). Hence, the data in the coldstore is considerably old. You can re-use this data for further processing and analysis according to your business needs.

The request to export the coldstore timeseries data can be created using the coldstore-export application. The application generates a request ID and processes the data into a readable format and bundles them into a zip file. Later, you can download the zip file using the coldstore-downloader application against the request ID generated by the coldstore-export application. This blog post is to introduce the coldstore-downloader application and explain how it works.

Please refer to this help portal for further information: Coldstore-Export-Doc.

The tutorial on Bulk Export publicized here can be further helpful for the application hands-on. 

 

Now coming to the file download:


The Cloud Foundry platform closes the HTTP request after 15 minutes and any large file cannot be downloaded to its fullest. Hence, the application is enabled with byte-range based download support.

You can download the file by placing a set of sequential requests for consecutive byte-ranges or a set of parallel requests running on different threads for different byte-ranges. The sample codes for the same are provided below:

 
System Env Required to run the below programs:
DOWNLOAD_API=<APPLICATION_BASE_URL>
ID=<DEFAULT_REQUEST_ID>
AUTH_TOKEN_API=<AUTH_TOKEN_FETCH_API>
CLIENT_ID=<CLIENT_ID>
CLIENT_SECRET=<CLIENT_SECRET>
FILE_DOWNLOAD_PATH=<LOCAL_FILE_DOWNLOAD_ABSOLUTE_PATH>


 

Sample Env:
DOWNLOAD_API=coldstore-downloader-sap.cfapps.eu20.hana.ondemand.com
ID=CC4558843FE44A6F8B3271E593BF5700
AUTH_TOKEN_API=sampleTenant.authentication.eu20.hana.ondemand.com
CLIENT_ID=sb-sample-client-id-1234|iotae_service!a230
CLIENT_SECRET=AuBhB3PJTgfYupWkajZPutMKnXo=
FILE_DOWNLOAD_PATH=C:\Users\Downloads\

 

Sequential File Download


package com.sap.coldstore.downloader;

import org.json.JSONObject;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
import java.util.Scanner;

public class SequentialDownload {
public static void main(String[] args) throws Exception {
Scanner scan = new Scanner(System.in);
System.out.println("Enter request Id ");
String id = scan.nextLine();
id = (id == null || id.isEmpty())? System.getenv("ID"): id;
String downloadUrl = "https://"
+ System.getenv("DOWNLOAD_API")
+ "/v1/DownloadData('"+ id +"')";
String token = new StringBuilder()
.append("Bearer ")
.append(generateToken())
.toString();
HttpURLConnection connection = null;
Long max = 1L;
String ifMatch = null;
byte buffer[]=new byte[20480];
int read,count = 0 ;
File file = new File(System.getenv("FILE_DOWNLOAD_PATH") + "Sequential_" + id + ".zip");
FileOutputStream fileOutputStream = null;
LocalDateTime now = LocalDateTime.now();
LocalDateTime later = null;
int i = 1;
long before = 0;
int prevDownloaded = 0;
try {
fileOutputStream = new FileOutputStream(file);
System.out.println("Download Request Part "+ i +" started now : " + now);

URL url = new URL(downloadUrl);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.setRequestProperty("Accept", "application/octet-stream");
connection.setRequestProperty("Authorization", token);
connection.setDoOutput(true);
//fileOutputStream.

InputStream cis = connection.getInputStream();
max = Long.parseLong(connection.getHeaderField("Content-Length"));
ifMatch = connection.getHeaderField("Etag");

while ((read = cis.read(buffer)) != -1) {
count += read;
fileOutputStream.write(buffer, 0, read);
long after = System.currentTimeMillis();
if((after - before) > 10000) {
calculatePercentageOfCompletion(before, count, prevDownloaded, max);
prevDownloaded = count;
before =System.currentTimeMillis();
}
}
later = LocalDateTime.now();
if(count < max)
System.out.println("Download of the first part completed : "+later);
} catch (Exception e) {
e.printStackTrace();
}
while(count < max){
try {
i++;
System.out.println("Download Request Part "+ i +" started now : " + LocalDateTime.now());
URL url2 = new URL(downloadUrl);
connection = (HttpURLConnection) url2.openConnection();
connection.setRequestMethod("GET");
connection.setRequestProperty("Authorization", token);
connection.setRequestProperty("Range", "bytes=" + count + "-"+ max);
connection.setRequestProperty("If-Match", ifMatch);
connection.setDoOutput(true);

if(connection.getResponseCode() == 401) {
token = new StringBuilder("Bearer ").append(generateToken()).toString();
i--;
continue;
}
InputStream cis2 = connection.getInputStream();
while ((read = cis2.read(buffer)) != -1) {
fileOutputStream.write(buffer, 0, read);
count += read;
long after = System.currentTimeMillis();
if((after - before) > 10000) {
calculatePercentageOfCompletion(before, count, prevDownloaded, max);
prevDownloaded = count;
before =System.currentTimeMillis();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("Total Time taken for the file download for request id " + id +
" in minutes: " + ChronoUnit.MINUTES.between(now, later));
fileOutputStream.close();
connection.disconnect();
}

public static void calculatePercentageOfCompletion(long before, long downloaded, long prevDownloaded, long targetLength){
long after = System.currentTimeMillis();
double speed = ((downloaded - prevDownloaded)/1024)/ ((after - before)/ 1000); // speed in byte per second
double percentage = ((double) downloaded / (double) targetLength) * 100;
System.out.println("Downloaded " + String.format("%.3f",percentage) +"% Speed is "+speed+" KB per second");
}

public static String generateToken() throws Exception {
String authURL = "https://" + System.getenv("AUTH_TOKEN_API") + "/oauth/token?response_type=token&grant_type=client_credentials";
String clientID = System.getenv("CLIENT_ID");
String clientSecret = System.getenv("CLIENT_SECRET");
URL url = new URL(authURL);
HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
urlConn.setRequestMethod("POST");
urlConn.setRequestProperty("Authorization", "Basic " + Base64.getEncoder().encodeToString((clientID+":"+clientSecret).getBytes()));
urlConn.setDoOutput(true);
if(urlConn.getResponseCode() != 200)
throw new Exception("Exception occured while fetching authentication token");
BufferedReader in = new BufferedReader(
new InputStreamReader(urlConn.getInputStream()));
String inputLine;
StringBuffer response = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
}
in.close();
JSONObject json = new JSONObject(response.toString());
return json.getString("access_token");
}
}

 

Parallel File Download


package com.sap.coldstore.downloader;

import org.json.JSONObject;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

public class ParallelDownload {
protected static final int BUFFER_SIZE = 4096;
static String id = System.getenv("ID");
String appUrl = "https://"
+ System.getenv("DOWNLOAD_API");

public static void main(String args[]) throws Exception {
Scanner scanner = new Scanner(System.in);
System.out.println("Enter request Id ");
String val = scanner.nextLine();
id = (val == null || val.isEmpty())? id: val;

System.out.println("Enter the number of parallel requests to be triggered");
int numberOfThreads = scanner.nextInt();
String outputFile = System.getenv("FILE_DOWNLOAD_PATH") + "Parallel_"+id+".zip";
File myObj = new File(outputFile);
if(myObj.exists()) myObj.delete();
myObj.createNewFile();
ParallelDownload test = new ParallelDownload();
long startTime = System.currentTimeMillis();
Map<String,String> intialDetails = test.execute(id,outputFile,0,0,true,null,null,0,startTime);
long totalSize = Long.parseLong(intialDetails.get("Content-Length"));
String ifMatch = intialDetails.get("If-Match");
long initalDownload = Integer.parseInt(intialDetails.get("InitalDownload"));
long remaining = totalSize - initalDownload;
long part = (long) (remaining / numberOfThreads);
System.out.println("Each part of "+part+" bytes");
long start = 0 ,end = initalDownload;
for(int i =0; i< numberOfThreads ;i++){
Thread.sleep(1000);
start = end + 1;
end = start + part;
if(i == (numberOfThreads - 1)){
end = totalSize;
}
long finalEnd = end;
int finalI = i;
long finalStart = start;
new Thread(() -> {
try {
test.execute(id,outputFile, finalStart, finalEnd,false,ifMatch, finalI,finalEnd - finalStart, startTime);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
long endTime = System.currentTimeMillis();
}

public Map<String,String> execute(String id, String outputFile, long startByte, long endByte, boolean isInital, String ifMatch, Integer threadId,long targetLength,long startTime) throws Exception {
int downloaded = 0;
String byteRange = "";
BufferedInputStream in = null;
RandomAccessFile raf = null;
Map<String,String> rObj = new HashMap<>();
if(!isInital) {
if(endByte == 0){
byteRange = startByte + "-";
}
else {
byteRange = startByte + "-" + endByte;
}
}
if(threadId != null) {
System.out.println("Thread: "+threadId+" for " + byteRange);
}
else{
threadId = 99999;
System.out.println("Initial Request");
}
String downloadUrl = appUrl+"/v1/DownloadData('"+id+"')";
String jwt = generateToken();
String token = new StringBuilder()
.append("Bearer ")
.append(jwt)
.toString();
HttpURLConnection connection = null;
Long max = 0L;
byte buffer[]=new byte[20480];
int read,count = 0 ;
try {
URL url = new URL(downloadUrl);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.setRequestProperty("Accept", "application/octet-stream");
connection.setRequestProperty("Authorization", token);
if(!isInital) {
connection.setRequestProperty("Range", "bytes=" + byteRange);
if(ifMatch != null) {
connection.setRequestProperty("If-Match", ifMatch);
}
}
connection.setDoOutput(true);

// get the input stream
in = new BufferedInputStream(connection.getInputStream());
if(isInital){
Thread.sleep(4000);
}
// open the output file and seek to the start location
raf = new RandomAccessFile(outputFile, "rw");
raf.seek(startByte);

byte data[] = new byte[BUFFER_SIZE];
int numRead;
long before = 0;
int prevDownloaded = 0;
while(((numRead = in.read(data,0,BUFFER_SIZE)) != -1))
{
// write to buffer
raf.write(data,0,numRead);
// increase the startByte for resume later
startByte += numRead;
downloaded += numRead;
long after = System.currentTimeMillis();
if((after - before) > 10000) {
double speed = ((downloaded - prevDownloaded)/1024)/ ((after - before)/ 1000); // speed in byte per second
prevDownloaded = downloaded;
double percentage = ((double) downloaded / (double) targetLength) * 100;
System.out.println("[Thread " +threadId+"] Downloded "+percentage+"% Speed is "+speed+" KB per second");
before = System.currentTimeMillis();
}
if(isInital && startByte>10000){
max = Long.parseLong(connection.getHeaderField("Content-Length"));
rObj.put("Content-Length",max+"");
ifMatch = connection.getHeaderField("Etag");
rObj.put("If-Match",ifMatch);
rObj.put("InitalDownload",startByte+"");
connection.disconnect();
}
}
System.out.println("[Thread" +threadId+"] Content-Range " + connection.getHeaderField("Content-Range"));
System.out.println("[Thread " +threadId+"]" +" Downloaded for byte range " + byteRange);
} catch (Exception e) {
System.out.println("Response Msg: "+connection.getResponseMessage());
System.out.println("Content "+connection.getContent().toString());
e.printStackTrace();
}
finally {
max = Long.parseLong(connection.getHeaderField("Content-Length"));
if(!rObj.containsKey("Content-Length")) rObj.put("Content-Length",max+"");
ifMatch = ifMatch == null ? connection.getHeaderField("Etag") : ifMatch;
if(!rObj.containsKey("If-Match")) rObj.put("If-Match",ifMatch);
if(!rObj.containsKey("InitalDownload")) rObj.put("InitalDownload",startByte+"");
long endTime = System.currentTimeMillis();
long diff = (endTime - startTime)/1000;
System.out.println("[Thread " +threadId+"]" +" Downloaded for bytes: " +downloaded+ " in "+diff+" seconds");
if (raf != null) {
try {
raf.close();
} catch (IOException e) {}
}

if (in != null) {
try {
in.close();
} catch (IOException e) {}
}
if(endByte == 0){
downloaded++;
}
System.out.println("[Thread " +threadId+"] Target: " + targetLength+"KB / Downloaded: "+downloaded+ " in "+diff+" seconds");
if(!isInital && (targetLength != (downloaded - 1)) && (startByte != endByte)){
long finalStartByte = startByte;
String finalIfMatch = ifMatch;
Integer finalThreadId = threadId;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
try {
new ParallelDownload().execute(id,outputFile, finalStartByte, endByte,false, finalIfMatch, finalThreadId + 1000,endByte-finalStartByte,startTime);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
return rObj;
}

public static String generateToken() throws Exception {
String authURL = "https://" + System.getenv("AUTH_TOKEN_API") + "/oauth/token?response_type=token&grant_type=client_credentials";
String clientID = System.getenv("CLIENT_ID");
String clientSecret = System.getenv("CLIENT_SECRET");
URL url = new URL(authURL);
HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
urlConn.setRequestMethod("POST");
urlConn.setRequestProperty("Authorization", "Basic " + Base64.getEncoder().encodeToString((clientID+":"+clientSecret).getBytes()));
urlConn.setDoOutput(true);
if(urlConn.getResponseCode() != 200)
throw new Exception("Exception occured while fetching authentication token");
BufferedReader in = new BufferedReader(
new InputStreamReader(urlConn.getInputStream()));
String inputLine;
StringBuffer response = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
}
in.close();
JSONObject json = new JSONObject(response.toString());
return json.getString("access_token");
}
}

 

For further insights into this feature, you can check out Coldstore-Export-Doc help portal and try out for better understanding.

Feel free to point out and improvise the download feature.

 

Thank you.