This blog post explains how to write a custom logic to incorporate Limit file Size incase of SLT replication in the SAP Data Intelligence Pipeline Modeler (Version: 3.1-2010) by extending a predefined Base Operator.
For a customer scenario, we were supposed to replicate data from Customer ECC using ABAP System Landscape Transformation (SLT) Connector to Azure Data Lake Storage (ADLS Gen 2) filesystem using SAP Data Intelligence Cloud 3.1-2010..
The Customer requirement was to have the multiple csv files divided based on particular timestamp (YYYYMMDD_HHMM) and size along with Header Information. Currently there is no direct provision in the SAP Data Intelligence SLT Operator to generate multiple csv files based on timestamp (YYYYMMDD_HHMM) and along with headers.
There is an existing Limit File Size operator in SAP Data Intelligence under scenario template which is in GoLang and generates counter for each portion of data coming from SLT.
The default SLT Connector operator supports 3 file formats:
However, the csv format doesn’t contain the actual column names coming from source instead it appends with generic column names like C0,C1 etc.
Hence, we created a python custom operator which incorporates both functionalities (split the files based on timestamp and size) and generates header with a better throughput.
ABAP SLT Connector
The SLT Connector operator establishes a connection between SAP Landscape Transformation Replication Server (SLT) and SAP Data Intelligence. You can then use SLT to replicate tables from a source system into SAP Data Intelligence.
The SLT Connector has different versions. In V0 and V1 the output type was *abap.
Recently, V2 version has been released – the output type is now *message.
For the below use case, we have used V2.
Transfer Mode selected for the use case : Replication (which does Initial load as well Replication)
Python Operator – ‘Limit File Size with Header’
We have built a custom python operator using the base python operator offered by SAP Data Intelligence to exploit the functionality of SLT V2 Operator.
Basically, In SLT V2 operator the message output has two sections:
In this operator, we have extracted the body and attributes of the input message separately and it works as below:
- The attributes of the input message basically contains the metadata information with the column names, data type etc which is used to generate header information for each file.
- We have used a ‘counter’ variable to generate a counter based on file size limit provided.
- Set the ‘maxizekb’ (limit size in kbs) in the porgram: upper limit of the file size that needs to be created in target filesystem.
- Counter will keep on increasing for each data block until the graph terminates/stops.
- Incase, the graphs terminated abnormally – the graphs will restart from 0 with different timestamp.
import sys import io from io import StringIO from io import BytesIO import csv import pandas as pd import json import numpy as np mysize = 0 mycounter = 0 mykblimit = 25000 counterp = 0 def on_input(inData): global counterp global mysize global mycounter global mykblimit counterp += 1 data = StringIO(inData.body) attr = inData.attributes ABAPKEY = attr['ABAP'] col=  for columnname in ABAPKEY['Fields']: col.append(columnname['Name']) if(data=='NULL'): return if(mykblimit == 0): mykblimit = 1024 a = str(inData.body) mysize += sys.getsizeof(a) if (counterp == 1 and mycounter == 0 and mysize < mykblimit * 1024): attr['cnt'] = str(mycounter) df = pd.read_csv(data, index_col=False, names=col, dtype = 'str') df_csv = df.to_csv(index=False, header = True) elif (counterp > 1 and mycounter == 0 and mysize < mykblimit * 1024): attr['cnt'] = str(mycounter) df = pd.read_csv(data, index_col=False, names=col, dtype = 'str') df_csv = df.to_csv(index=False, header = False) elif mysize >= mykblimit * 1024: mycounter += 1 mysize = sys.getsizeof(a) attr['cnt'] = str(mycounter) df = pd.read_csv(data, index_col=False, names=col, dtype = 'str') df_csv = df.to_csv(index=False, header = True) else: attr['cnt'] = str(mycounter) df = pd.read_csv(data, index_col=False, names=col, dtype = 'str') df_csv = df.to_csv(index=False, header = False) api.send("output", api.Message(attributes=attr, body=df_csv)) api.set_port_callback("input1", on_input)
Each operation uses a connection according to the configured Connection. And uses a path according to the configured Path mode.
Under Write file path, specify the counter created in python operator in <header: counter> pleaceholder as below:
Pathmode: Static with Placeholder
Join Batches: False
Customer Specific Implementation
** To keep the explaination clear and simple in this post, code snippet has been provided with the counter based on file size only. Some of the customer specific implemenations done are explained below.
- Generated the counter based on timestamp (Date:YYYYMMDD and time: HHMM) and file size.
- ‘cnt’ is in ‘<counter>_<YYYYMMDD>_<HHMM>’ format.
- A seperate folder will be created on each date.
- Counter will get reset every day and starts from 0.
- To simplify and make the code reusable, we created a custom operator extended from base Python operator – where the size limit is a configuration parameter (‘maxsizekb’)
- Refer the link, if you are interested in creating the custom operator: https://help.sap.com/viewer/1c1341f6911f4da5a35b191b40b426c8/Cloud/en-US/049d2f3cc69c4281a3f4570c0d2d066e.html?q=create%20operator
Files will be created as below (maxsizekb = 50000):
We have learned, how do we create the target files based on the file size provided, how do we get headers and what we can further do to enhance the pipelines specific to the user requiremnents.
Voila! Now, you don’t have any troubles with Limiting the size of the target files with headers in SLT replication 😃
If you are interested to understand how the enhancements were carried out or have ideas for the next blog post, please let me know in the comment section below.
For more information on SAP Data Intelligence, please see: