Muniyappan Marasamy

Integrate SAP PO and KAFKA using Rest Proxy

Recently I worked on integrating SAP PO and Kafka by consuming the Confluent REST APIs. Two types of API are available: producer (publish) and consumer (subscribe). In our landscape, the Kafka REST APIs are exposed through Azure API Management.

This blog does not explain the SAP PO configuration in detail; it gives only an overview.

Since SAP PO has no built-in adapter for connecting to Kafka, we used the REST APIs instead.

To learn more about Kafka, check Intro to Apache Kafka: How Kafka Works

 

API

The Kafka team shared the API Postman collection with us. After testing and playing with the APIs, we got an idea of how to design the interface using the REST adapter in SAP PO.

 

Producer

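The producer API is used when you want to produce messages to a Kafka topic. Basically, you make an API call with the POST method to send the data to Kafka. I find producing messages through the API straightforward and easy. Note that the example below, taken from the Confluent documentation, creates the topic test1 through the v3 API; the messages themselves are then produced with a POST call to that topic.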

docker-compose exec rest-proxy curl -X POST \
     -H "Content-Type: application/json" \
     -d "{\"topic_name\":\"test1\",\"partitions_count\":6,\"replication_factor\":3,\"configs\":[]}" \
     "http://localhost:8082/v3/clusters/${KAFKA_CLUSTER_ID}/topics" | jq .

REST Proxy: Example for Apache Kafka® — Confluent Documentation
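
The curl example above creates a topic rather than producing a record. As a minimal sketch of the produce call itself, assuming the REST Proxy v2 API with the binary (base64) embedded format, the request could be built as shown here. The function name, the base URL and the sample payload are hypothetical, and the request is only built, not sent.

```python
import base64
import json
import urllib.request


def build_produce_request(base_url, topic, xml_payloads):
    """Build (but do not send) a REST Proxy v2 produce request.

    Assumption: binary embedded format, so each value is base64 encoded.
    """
    records = [
        {"value": base64.b64encode(p.encode("utf-8")).decode("ascii")}
        for p in xml_payloads
    ]
    body = json.dumps({"records": records}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/topics/{topic}",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/vnd.kafka.binary.v2+json",
            "Accept": "application/vnd.kafka.v2+json",
        },
    )


# Hypothetical usage: one XML document sent to the topic "test1".
request = build_produce_request("http://localhost:8082", "test1", ["<order/>"])
print(request.get_method(), request.full_url)
```

In SAP PO the same call would be configured in the REST receiver adapter; the sketch only makes the URL, headers and body shape explicit.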

Consumer

The consumer API is used to read data from a Kafka topic. Messages remain in the topic for a configured retention period, for example 2 days or one week. Consuming messages gets a little complicated, as we need to call almost 5 APIs to read the messages from a Kafka topic. I ended up creating more ICOs to orchestrate all the API calls. The consumer APIs have a lifecycle, which can be found in the documentation. The SAP PO interface has to be scheduled to poll the API, as Kafka does not push data to clients.

REST Proxy: Example for Apache Kafka® — Confluent Documentation

Once the consumer is created, the client can continue to poll the topic using the read records API. There is no need to recreate the consumer as long as the consumer instance is not destroyed. When the Kafka server is restarted, the consumer is destroyed and has to be created again.

The commit offset concept is used to track how far each client has read in a topic. It has two types, automatic and manual. When the automatic method is used, we don't have to make the commit offset API call; with the manual method, the client has to make it explicitly.

Apache Kafka Offset Management – Learning Journal

Kafka Consumer — Confluent Documentation

Five API calls are needed to consume messages from a Kafka topic.

 

  1. Create consumer instance
  2. Subscribe to topic
  3. Read records (poll)
  4. Commit records
  5. Delete consumer
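
As a rough sketch, the five calls map to the following methods and paths, assuming the Confluent REST Proxy v2 consumer endpoints. The group name "po_group" is hypothetical, and the function only lists the calls without sending anything.

```python
def consumer_lifecycle(group, instance):
    """List the five consumer calls as (step, HTTP method, path).

    Assumption: REST Proxy v2 consumer endpoints.
    """
    base = f"/consumers/{group}"
    inst = f"{base}/instances/{instance}"
    return [
        ("create consumer instance", "POST", base),
        ("subscribe to topic", "POST", f"{inst}/subscription"),
        ("read records (poll)", "GET", f"{inst}/records"),
        ("commit records", "POST", f"{inst}/offsets"),
        ("delete consumer", "DELETE", inst),
    ]


# Hypothetical group name; "ci1" is the instance name used in this blog.
for step, method, path in consumer_lifecycle("po_group", "ci1"):
    print(f"{method:6} {path}  ({step})")
```

Steps 1 and 2 run once, step 5 only on cleanup or error, and steps 3 and 4 repeat on every poll.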

 

I tried the automatic commit option, but it resulted in duplicate processing, so I had to choose the manual commit option. When creating the consumer, the parameter “auto.commit.enable” has to be set to false in order to use the consumer in manual commit mode.

Confluent REST Proxy API Reference — Confluent Documentation

{
    "name": "ci1",
    "format": "binary",
    "auto.offset.reset": "earliest",
    "auto.commit.enable": false
}

 

We can outline the steps as a flow diagram.

ICO design

There are many ways to consume messages from Kafka in SAP PO. You can handle everything in an adapter module or a Java mapping, or use separate ICOs to orchestrate the API calls.

 

ICO 1

This is scheduled to run once every 10 minutes, and the sender adapter reads a dummy file in test mode. The REST receiver adapter calls the read records API and sends the response to ICO 2 via a bridge.

The read records API produces 3 possible outputs in JSON format.

  1. The response contains data (records). Kafka returns the records in JSON format, and the response can hold 1 to n XML messages, each base64 encoded. The client has to do the splitting.
  2. The response contains no data (an empty JSON payload). When Kafka has no data for the consumer, it returns an empty payload.
  3. An error message, in case of any exception.
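
The three outcomes can be told apart with a few lines of logic. The sketch below assumes the read records call returns HTTP 200 with a JSON array (possibly empty) on success and a JSON error body otherwise, and that the "value" field holds the base64 encoded XML. The function names and the sample bodies are illustrative only.

```python
import base64
import json


def classify_read_response(status_code, body_text):
    """Return ("records" | "empty" | "error", payload) for a read records response."""
    if status_code != 200:
        try:
            detail = json.loads(body_text)
        except ValueError:
            # Non-JSON error body: keep the raw text.
            detail = {"message": body_text}
        return "error", detail
    records = json.loads(body_text) if body_text.strip() else []
    if not records:
        return "empty", []
    return "records", records


def decode_values(records):
    """Split the response into one decoded XML string per record."""
    return [base64.b64decode(r["value"]).decode("utf-8") for r in records]


# Illustrative sample bodies, not captured from a real system.
sample = json.dumps([
    {"topic": "test1", "key": None, "partition": 0, "offset": 7,
     "value": base64.b64encode(b"<order/>").decode("ascii")},
])
kind, payload = classify_read_response(200, sample)
print(kind, decode_values(payload))
print(classify_read_response(200, "[]"))
print(classify_read_response(404, '{"error_code": 40403, "message": "Consumer instance not found."}'))
```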

Using the response code, you can decide which exceptions you want to handle and which to ignore.

ICO 2

ICO 2 receives the data, which can take 3 possible forms. Based on the incoming payload type, it does the mapping and sends the message to one of 3 receivers. A receiver determination condition routes the input to the respective receiver.

  1. Empty: Creates an empty file in the receiver with the overwrite option. You can also ignore this case if you prefer.
  2. With records: I used two mappings. The first mapping does a lookup to commit the records in Kafka; once the commit succeeds, the second mapping splits the records and sends them to the receiver. Each XML file is base64 encoded and sent in one field from Kafka. SAP PO decodes it and creates an XML file.
  3. Exception: When an exception occurs, this receiver channel performs the consumer delete operation and then calls ICO 3 via a bridge. You could also add one more receiver interface to send an email notification, so that you can find out if the error keeps happening in a loop.
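
For the "with records" case, the commit lookup needs a body listing the offsets per topic and partition. Here is a minimal sketch, assuming the v2 commit body shape {"offsets": [{"topic", "partition", "offset"}]} and that the highest offset read per partition is what gets committed; whether the proxy expects that offset or the next one should be checked against the API reference. The function name is hypothetical, and this is not the author's actual mapping code.

```python
def build_commit_payload(records):
    """Build a commit body from the records returned by one read call.

    Assumption: commit the highest offset read for each (topic, partition).
    """
    highest = {}
    for record in records:
        key = (record["topic"], record["partition"])
        highest[key] = max(highest.get(key, -1), record["offset"])
    return {
        "offsets": [
            {"topic": topic, "partition": partition, "offset": offset}
            for (topic, partition), offset in sorted(highest.items())
        ]
    }


# Illustrative records spread over two partitions.
records = [
    {"topic": "test1", "partition": 0, "offset": 5, "value": "..."},
    {"topic": "test1", "partition": 0, "offset": 7, "value": "..."},
    {"topic": "test1", "partition": 1, "offset": 2, "value": "..."},
]
print(build_commit_payload(records))
```

Committing once per read call, before the split, is what avoids a separate commit call for each record.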

ICO 3

When ICO 1 runs for the first time, the read call returns an exception with a 404 code saying the consumer does not exist. Based on the input payload, ICO 2 determines the receiver, which deletes the consumer and calls ICO 3 to initiate consumer creation. The delete call will not succeed, as initially no consumer is present. You can ignore this failure with custom exception handling.

 

More complexities in my implementation.

 

In my case, I had to store the cookie value in a value mapping and use it for consecutive calls. In all API calls (except the create consumer instance API), the cookie has to be sent in the header. This is because the REST calls are handled by the Kafka REST Proxy on two nodes, and a consumer instance is always created on one particular node.

An incoming API call is routed to a node by a dispatcher. Without the cookie, the dispatcher does not know on which node (node 1 or node 2) the instance was created. Say the consumer is created on node 1: when an incoming API call is directed to node 2, it throws an error saying that the consumer does not exist.

But if we make the API call with the cookie set in the header, the dispatcher can route it to the correct node and we don't get the consumer does not exist error.

 

Because the cookie is valid for only 1 day, I had to refresh it once a day. That is why, when ICO 3 runs, it gets the cookie from the create consumer API and stores it in the value mapping. In ICO 1, we check the cookie date: if it equals the current day, we proceed with the read API call; otherwise we loop back to initiate the consumer creation process.
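
The cookie check in ICO 1 boils down to a small decision. The sketch below stands in for the value mapping lookup with plain function arguments; the function name, the cookie string and the dates are hypothetical.

```python
from datetime import date


def next_step(cookie_value, cookie_date, today):
    """Decide what ICO 1 should do on this run.

    Returns ("read", headers) when the stored cookie is from today,
    otherwise ("recreate", {}) to loop back to consumer creation.
    """
    if cookie_value and cookie_date == today:
        # Sending the cookie lets the dispatcher route to the node
        # that holds the consumer instance.
        return "read", {"Cookie": cookie_value}
    return "recreate", {}


# Hypothetical values standing in for the value mapping entries.
print(next_step("route=node1", date(2021, 3, 1), date(2021, 3, 1)))
print(next_step("route=node1", date(2021, 3, 1), date(2021, 3, 2)))
```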

 

We are in the process of moving to CPI; hopefully, we can make use of the CPI Kafka adapter.

 

Few points

    1. We could have used the sender REST adapter with the polling option, which comes with built-in split and duplicate handling features. To handle duplicates, we could use the offset value as the identifier, but the read records API returns multiple offsets, so the duplicate check would be difficult to use. There is also no custom error handling in the REST polling message protocol; in our case, if the REST call ends with an error, we have to initiate the create consumer API call. If we use the split, it becomes difficult to make the commit call for each record. There is no option to set headers dynamically in the sender polling option. We could use an adapter module, but since we are in the process of moving interfaces to CPI, we don't want to use modules.
    2. The current Kafka adapter for CPI, released by SAP, can only connect to a host that is publicly available, and our Kafka server is not. It also looks like the CPI adapter supports only automatic commit.
    3. I was wondering why Kafka can't simply push the data to the client. Why should a consumer take care of all the creating, reading and committing? This is a fundamental design constraint of Kafka. If you have thought about this, then welcome to the pub/sub world. Check Publish–subscribe pattern – Wikipedia and Cloud Pub/Sub Overview – ep. 1 – YouTube
    4. For this interface, Kafka was sending around 30k messages per day, and it has 10 consumers.

