Skip to Content

Introduction

With the latest release of SAP Cloud Platform Integration, we provide a new API to hook into the Message Processing Log and Adapter Trace feature. With this newly released API, adapter developers are also able to contribute the knowledge about the HTTP endpoints opened by the adapter and display them in the Manage Integration Content view of the Web application.

In this blog, I create an adapter to demonstrate the installation and usage of the new Adapter API.

Prerequisites

You must install the Adapter Development Kit for SAP Cloud Platform Integration. For detailed instructions refer to the documentation.

Basic Adapter

The initial step is to create a new Adapter Project, called BlogADKAdapter in Eclipse. In the Eclipse New Adapter Project wizard enter the details of the new adapter as shown in the next screenshot.

Clicking on Finish creates the adapter. The project contains the following files:

Component Description
BlogADKAdapterComponent.java Sample runtime component.
BlogADKAdapterConsumer.java Sample consumer.
BlogADKAdapterEndpoint.java Sample endpoint.
BlogADKAdapterProducer.java Sample producer.
metadata.xml Component configuration properties and other meta-data.
BlogADKAdapterComponentTest.java Sample JUnit test.
pom.xml Contains configuration details and dependencies.

Compiling the Adapter

To compile the adapter project, use standard Maven command mvn install or the Maven M2Eclipse plugin. The output of the compilation process is shown in the Terminal window or Console view of Eclipse, and the path to the generated adapter is shown at the end of the build command output.

Using the Adapter

We have now created a simple polling mechanism adapter. For this blog post, we modify the adapter as it would act like receiving HTTP messages.

To use the new adapter, you must deploy the adapter to the tenant and create an Integration Flow.

Deploy the Adapter

First, go to the Integration Operations Eclipse perspective and select the tenant from the Node Explorer view. Then open the context menu of your tenant and select Deploy Artifacts… From the appearing wizard select Integration Adapter to deploy your adapter by selecting the above generated BlogADKAdapter.esa file.

You can see the result of the deployment in the Component Status View Eclipse view.

Create an Integration Flow

Create a new Blog integration package and add a new Integration Flow called BlogADKAdpaterIFlow. For more information on creating Integration Flows refer to the documentation.

Within the Integration Flow, you need to connect the Sender and Receiver components and select the new BlogADKAdapter as the adapter type of the connection.

The Integration Flow should look like:

Configure the Sender Adapter with the values shown in the table below:

Parameter Value Description
First URI Part firstURIPartOfSender Specifies the HTTP path of the sender adapter
Greetings Message Hello from Sender Adapter Example of a Sender adapter parameter
Delay 6048000000 Delay for sending further messages. We set a very high value to trigger only an initial message.

Configure the Receiver Adapter with the values shown in the table below:

Parameter Value Description
First URI Part firstURIPartOfReceiver Specifies the path of the receiver adapter
Greetings Message Hello from Receiver Adapter Example of a Receiver adapter parameter

Afterward, deploy the Integration Flow.

Go to the Manage Integration Content view of the Web application, to view the state of your Integration Flow. You should see your deployed Integration Flow:

By clicking on Monitor Message Processing link, you can see the processed message of BlogADKAdapter Integration Flow.

Note: A redeployment or restart of the Integration Flow triggers a new message processing.

Note: To view the detailed Message Processing Log, you must execute the Integration Flow with, at least, Debug log level. For more information about log levels refer to the documentation.

We now have completed the initial setup of the adapter.

Adapter API

So far, we have developed a very basic adapter and an Integration Flow for its execution. If you want to contribute to the Message Processing Log, Tracing or Endpoint Information then you must use the new Adapter API.

Installation

Execute the following steps to install the Adapter APIs in your Adapter project:

  1. Download the Adapter API and Generic API from https://tools.hana.ondemand.com/#cloudintegration
  2. Note the file version of both JAR files and rename the files to generic-api.jar and adapter-api.jar
  3. Move both files to the BlogADKAdapter\src\main\resources directory and refresh your Eclipse project. The Eclipse project should look similar to:
  4. To make the JAR files available in your project, add the following snippet to the end of the dependencies section of the pom.xml file, located in your project root directory:
  5. <!-- replace XXX with the file version -->
    <dependency>
         <groupId>com.sap.it.public</groupId>
         <artifactId>adapter.api</artifactId>
         <version>XXX</version>
         <scope>system</scope>
         <systemPath>${project.basedir}/src/main/resources/adapter-api.jar</systemPath>
     </dependency>
     <dependency>
         <groupId>com.sap.it.public</groupId>
         <artifactId>generic.api</artifactId>
         <version>XXX</version>
         <scope>system</scope>
         <systemPath>${project.basedir}/src/main/resources/generic-api.jar</systemPath>
     </dependency>​
  6. Add com.sap.it.public to the excludeGroupIds section in the configuration settings of the maven-dependency-plugin section.
    <excludeGroupIds>com.sap.cloud.adk,org.apache.camel,org.slf4j,log4j,com.sap.it.public</excludeGroupIds>​

Verify that everything was applied correctly by expanding the Maven Dependencies section of your Eclipse project and reviewing the referenced JAR files.

Note: If you edit the pom.xml file outside of Eclipse, then you must update the Maven project definition in Eclipse, to apply the changes to the Eclipse project settings.

Integrate into Message Processing Log

If your adapter shall add information to the Message Processing Log, then you use the classes of the com.sap.it.api.msglog.adapter package delivered with the Adapter API.

To add to the Message Log of the sender adapter you modify the existing poll() method of the sender adapter that is implemented in the BlogADKAdapterConsumer class. Following listing shows the updated poll() method of the BlogADKAdapterConsumer class:

@Override
protected int poll() throws Exception {
	Exchange exchange = endpoint.createExchange();

	// get adapter message log factory
	AdapterMessageLogFactory msgLogFactory = ITApiFactory.getService(AdapterMessageLogFactory.class, null);

	 // create a message body
	String greetingsMessage = endpoint.getGreetingsMessage();
	Date now = new Date();
	if(greetingsMessage == null || greetingsMessage.isEmpty()){
		LOG.error("The message is empty! Default one will be used");
		greetingsMessage = "Hello There!! ";
	}
	StringBuilder builder = new StringBuilder(greetingsMessage);
	builder.append(" Now it is ");
	builder.append(now.toString());

	// add content to the MPL
	AdapterMessageLogWithStatus msgLog = null;
	try {
		msgLog = msgLogFactory.getMessageLogWithStatus(exchange, "BlogADKAdapterConsumer Inbound Log Text ",
				"BlogADKAdapter-ID", "SomeRandomString" + System.currentTimeMillis() + "-IN");

		msgLog.putAdapterAttribute("GreetingsMessage", greetingsMessage);
	} finally {
		if (msgLog != null) {
			msgLog.close();
		}
	}

	exchange.getIn().setBody(builder.toString());

	try {
		// send message to next processor in the route
		getProcessor().process(exchange);
		return 1; // number of messages polled
	} finally {
		// log exception if an exception occurred and was not handled
		if (exchange.getException() != null) {
			getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
		}
	}
}

The implementation of the AdapterMessageLogFactory is provided by the adapter runtime, which is not available for unit-tests. So the factory has to be either mocked for the unit-test execution or has to be skipped for the maven build via the skipTests Maven option.

After compiling the adapter, you must deploy the adapter.

Then set the log level to Debug and restart your Integration Flow.

The following screenshot shows the new step in the Message Processing Log:

Going to the step details of the highlighted step shows the new header content:

Integrate into Tracing

If your adapter transforms the payload or adds headers before sending or receiving a message, then those changes will not be reflected in the regular trace. However, it might be very useful information in troubleshooting to see the payload and headers exactly how they were received or transmitted. So if this is the case for your adapter, you can provide that information to the tracing mechanism.

The following changes to the BlogADKAdapterConsumer class set the payload and define header entries in the trace information.

package blogadkadapter;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sap.it.api.ITApiFactory;
import com.sap.it.api.msglog.adapter.AdapterMessageLogFactory;
import com.sap.it.api.msglog.adapter.AdapterMessageLogWithStatus;
import com.sap.it.api.msglog.adapter.AdapterTraceMessage;
import com.sap.it.api.msglog.adapter.AdapterTraceMessageType;

/**
 * The Sample.com consumer.
 */
public class BlogADKAdapterConsumer extends ScheduledPollConsumer {
    private Logger LOG = LoggerFactory.getLogger(BlogADKAdapterConsumer.class);
    private final BlogADKAdapterEndpoint endpoint;
    private Map<String, String> headerElements = new HashMap<>();

    public BlogADKAdapterConsumer(final BlogADKAdapterEndpoint endpoint, final Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;

        // initialize a map for storing a sample header for demo purposes only. In typical scenarios
        // e.g. providing own HTTP endpoints with your adapter, this is not required and will be automatically defined
        this.headerElements.put("protocolType", "foo");
        this.headerElements.put("protocolVersion", "1");
        this.headerElements.put("applicationMessageFormat", "bar");
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
    }

    @Override
    protected int poll() throws Exception {
        DefaultExchange exchange = (DefaultExchange) endpoint.createExchange();

        // as we just created our camel exchange we need to assign the header values
        // for this example we will add all non protocol specific headers
        for (String key : headerElements.keySet()) {
            if (!key.startsWith("protocol")) {
                exchange.getIn().setHeader(key, headerElements.get(key));
            }
        }

        // get adapter message log factory
        AdapterMessageLogFactory msgLogFactory = ITApiFactory.getService(AdapterMessageLogFactory.class, null);

        // create a message body
        String greetingsMessage = endpoint.getGreetingsMessage();
        Date now = new Date();
        if (greetingsMessage == null || greetingsMessage.isEmpty()) {
            LOG.error("The message is empty! Default one will be used");
            greetingsMessage = "Hello There!! ";
        }
        StringBuilder builder = new StringBuilder(greetingsMessage);
        builder.append(" Now it is ");
        builder.append(now.toString());

        AdapterMessageLogWithStatus msgLog = null;
        try {
            // add content to the MPL
            msgLog = msgLogFactory.getMessageLogWithStatus(exchange, "BlogADKAdapterConsumer Inbound Log Text ", "BlogADKAdapter-ID",
                    "SomeRandomString" + System.currentTimeMillis() + "-IN");

            msgLog.putAdapterAttribute("GreetingsMessage", greetingsMessage);

            // set the body of the in message
            exchange.getIn().setBody(builder.toString());

            // if trace is active, write the sender inbound content to the trace log
            if (msgLog.isTraceActive()) {
                writeTraceMessage(exchange, headerElements, msgLog, AdapterTraceMessageType.SENDER_INBOUND);
            }

            // send message to next processor in the route
            getProcessor().process(exchange);
            return 1; // number of messages polled
        } finally {
            // if trace is active, write the sender outbound content to the trace log
            if (msgLog.isTraceActive()) {
                Map<String, String> outboundHeaderElements = new HashMap<>();
                // set the outbound headers to the once originally retrieved in the message
                outboundHeaderElements.putAll(convertMap(exchange.getIn().getHeaders()));
                // for demo purposes add an additional header entries to the outbound header list
                outboundHeaderElements.put("isProcessed", "true");

                writeTraceMessage(exchange, outboundHeaderElements, msgLog, AdapterTraceMessageType.SENDER_OUTBOUND);
            }

            if (msgLog != null) {
                msgLog.close();
            }

            // log exception if an exception occurred and was not handled
            if (exchange.getException() != null) {
                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
            }
        }
    }

    /**
     * Convert a Map<String, Object> to a new Map<String, String>
     *
     * @param inputMap
     */
    private Map<String, String> convertMap(Map<String, Object> inputMap) {
        Map<String, String> result = new HashMap<String, String>();
        for (String key : inputMap.keySet()) {
            result.put(key, inputMap.get(key).toString());
        }
        return result;
    }

    /**
     * Write a trace message.
     *
     * @param msgLog
     *            message log
     * @param type
     *            message type
     */
    private void writeTraceMessage(final Exchange exchange, Map<String, String> headers, final AdapterMessageLogWithStatus msgLog,
            final AdapterTraceMessageType type) {

        // creates a trace message to write to the log
        // in this example we write our generated payload to the trace
        // typically you want to do this only if you changed the payload by your adapter
        Object payload = exchange.getIn().getBody();
        byte[] payloadBytes = new byte[0];
        if (payload != null) {
            try {
                payloadBytes = payload.toString().getBytes("UTF-8");
            } catch (UnsupportedEncodingException uee) {
                getExceptionHandler().handleException("Error reading payload", exchange, uee);
            }
        }

        AdapterTraceMessage traceMsg = msgLog.createTraceMessage(type, payloadBytes, false);
        traceMsg.setHeaders(headers);

        msgLog.writeTrace(traceMsg);
    }
}

If you activate Trace log level and restart the Integration Flow, the trace information is written and shown in the step details view of the Message Monitor, as shown below.

Show Endpoint Information

If your adapter opens HTTP endpoints, that can be called from outside, the URL of the endpoints depends on the tenant on which the adapter is deployed and used. Depending on the way the URL’s path is built, it can be hard for the user to get to the correct URL to be called. In order to simplify this, there is the option to display the URLs as part of the Integration Flow details of the Monitor Integration Content view so that the user can copy the URL and use it to configure a client calling that endpoint for instance.

To expose the URLs of your adapter, you need to expose the AdapterEndpointInformationService. In this service, your adapter can return for each Integration Flow the path of the endpoints opened relative to the root of the CPI worker node. The root path will be added automatically when displaying the endpoint information.

To implement this, we require the following implementation tasks:

  • Create a Registry for storing a list of endpoints for an Integration Flow
  • Add and remove endpoints from the Registry as endpoints are started or stopped
  • Implement the AdapterEndpointInformationService service definition to provide a list of endpoints to the backend

Preparation

We need to store the path of the endpoint to make it accessible to the other classes in our adapter implementtion.

package blogadkadapter;

import java.net.URISyntaxException;

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultPollingEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Represents a www.Sample.com Camel endpoint.
 */
@UriEndpoint(scheme = "sap-sample", syntax = "", title = "")
public class BlogADKAdapterEndpoint extends DefaultPollingEndpoint {
    private BlogADKAdapterComponent component;

    private transient Logger logger = LoggerFactory.getLogger(BlogADKAdapterEndpoint.class);

    // used to store our First URI Part adapter parameter, per default it
    // is not part of the adapter properties
    private String path;

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    @UriParam
    private String greetingsMessage;

	public String getGreetingsMessage() {
		return greetingsMessage;
	}

	public void setGreetingsMessage(String greetingsMessage) {
		this.greetingsMessage = greetingsMessage;
	}

	public BlogADKAdapterEndpoint() {
    }

    public BlogADKAdapterEndpoint(final String endpointUri, final BlogADKAdapterComponent component) throws URISyntaxException {
        super(endpointUri, component);
        this.component = component;
        this.path = "/";
    }

    public BlogADKAdapterEndpoint(final String uri, final String remaining, final BlogADKAdapterComponent component) throws URISyntaxException {
        this(uri, component);
        // the First URI Part adapter parameter is set in the remaining method parameter
        this.path = remaining;
    }

    public Producer createProducer() throws Exception {
        return new BlogADKAdapterProducer(this);
    }

    public Consumer createConsumer(Processor processor) throws Exception {
        final BlogADKAdapterConsumer consumer = new BlogADKAdapterConsumer(this, processor);
        configureConsumer(consumer);
        return consumer;
    }

    public boolean isSingleton() {
        return true;
    }
}

Implementation of a Registry

The new endpoint service, that we will create next, must return the endpoints for each Integration Flow. So we have to implement a Registry for storing this information. The following class is used to store the endpoints of an Integration Flow:

package blogadkadapter;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Registry for storing an Integration Flow together with its endpoints.
 */
public class EndpointRegistry {
    private static final Map<String, List<BlogADKAdapterEndpoint>> registry = new HashMap<>();

    /**
     * Adds an endpoint for an Integration Flow
     *
     * @param iflow
     *            the Integration Flow
     * @param endpoint
     *            the endpoint
     * @see BlogADKAdapterEndpoint
     */
    public static void add(String iflow, BlogADKAdapterEndpoint endpoint) {
        if (registry.containsKey(iflow)) {
            List<BlogADKAdapterEndpoint> endpoints = registry.get(iflow);
            endpoints.add(endpoint);
        } else {
            List<BlogADKAdapterEndpoint> endpoints = new LinkedList<>();
            endpoints.add(endpoint);
            registry.put(iflow, endpoints);
        }
    }

    /**
     * Returns all endpoints for all iflows.
     *
     * @return List of iflows and their endpoints
     * @see BlogADKAdapterEndpoint
     */
    public static Map<String, List<BlogADKAdapterEndpoint>> getEndpoints() {
        return registry;
    }

    /**
     * Returns the endpoints for an Integration Flow
     *
     * @param iflow
     *            the Integration Flow
     * @return List of Endpoints
     * @see BlogADKAdapterEndpoint
     */
    public static List<BlogADKAdapterEndpoint> getEndpoints(String iflow) {
        if (registry.containsKey(iflow)) {
            return registry.get(iflow);
        }
        return new LinkedList<>();
    }

    /**
     * Removes an endpoint from an Integration Flow
     *
     * @param iflow
     *            the integration flow
     * @param endpoint
     *            the endpoint to remove
     * @see BlogADKAdapterEndpoint
     */
    public static void remove(String iflow, BlogADKAdapterEndpoint endpoint) {
        if (registry.containsKey(iflow)) {
            List<BlogADKAdapterEndpoint> endpoints = registry.get(iflow);
            endpoints.remove(endpoint);
            if (endpoints.size() == 0) {
                registry.remove(iflow);
            }
        }
    }
}

Using the EndpointRegistry

We need to add and remove endpoints in the Registry when a new Sender endpoint is started or stopped. To implement this, we must change the BlogADKAdapterConsumer class to modify the existing doStart() method and also overwrite and change the doStop() method:

@Override
protected void doStart() throws Exception {
	super.doStart();
	EndpointRegistry.add(endpoint.getCamelContext().getName(), endpoint);
}

@Override
protected void doStop() throws Exception {
	super.doStop();
	EndpointRegistry.remove(endpoint.getCamelContext().getName(), endpoint);
}

Implementation of AdapterEndpointInformationService

Next, we implement the AdapterEndpointInformationService service for our adapter. The service implementation is done like the BlogADKAdapterEndpointInformationService class shown below.

package blogadkadapter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.camel.ServiceStatus;

import com.sap.it.api.adapter.monitoring.AdapterEndpointInformation;
import com.sap.it.api.adapter.monitoring.AdapterEndpointInformationService;
import com.sap.it.api.adapter.monitoring.AdapterEndpointInstance;
import com.sap.it.api.adapter.monitoring.EndpointCategory;

/**
 * Expose the endpoints used by this adapter.
 */
public class BlogADKAdapterEndpointInformationService implements AdapterEndpointInformationService {

    /**
     * Returns the list of all endpoints of all IntegrationFlows used by the
     * adapter.
     *
     * @return the list of endpoints
     */
    @Override
    public List<AdapterEndpointInformation> getAdapterEndpointInformation() {
        List<AdapterEndpointInformation> endpointInfos = new ArrayList<>();
        Map<String, List<BlogADKAdapterEndpoint>> knownEndpoints = EndpointRegistry.getEndpoints();
        for (String iflow : knownEndpoints.keySet()) {
            List<AdapterEndpointInformation> iflowEndpoints = this.getAdapterEndpointInformationByIFlow(iflow);
            endpointInfos.addAll(iflowEndpoints);
        }
        return endpointInfos;
    }

    /**
     * Returns the list of endpoints for a given IntegrationFlow. If the given
     * Integration Flows does not use the adapter, it has to return an empty
     * instance list.
     *
     * @param integrationFlowId
     *            the id (bundle symbolic name) of the Integration Flow
     * @return the endpoint information for the given Integration Flow
     */
    @Override
    public List<AdapterEndpointInformation> getAdapterEndpointInformationByIFlow(String iflow) {
        List<AdapterEndpointInformation> endpointInfos = new ArrayList<>();
        List<BlogADKAdapterEndpoint> endpoints = EndpointRegistry.getEndpoints(iflow);

        if (endpoints != null && endpoints.size() > 0) {
            for (BlogADKAdapterEndpoint endpoint : endpoints) {
                String camelEndpoint = endpoint.getPath();
                if (endpoint.getStatus() == ServiceStatus.Started) {
                    // in this example the endpoint should be available via: <host>/adk/<first uri part>
                    // note that a channel can have one entry endpoint and multiple definition endpoints
                    // our BlogADKAdapter uses only the entry endpoint and not the definition endpoints
                    AdapterEndpointInstance mainEndpointInstance = new AdapterEndpointInstance(EndpointCategory.ENTRY_POINT,
                            "/adk/" + camelEndpoint, null);
                    AdapterEndpointInformation adapterEndpointInformation = new AdapterEndpointInformation();
                    adapterEndpointInformation.setIntegrationFlowId(iflow);
                    adapterEndpointInformation.setAdapterEndpointInstances(Arrays.asList(mainEndpointInstance));
                    endpointInfos.add(adapterEndpointInformation);
                }
            }
        }
        return endpointInfos;
    }
}

Exposing the BlogADKAdapterEndpointInformationService Service

To expose our new BlogADKAdapterEndpointInformationService OSGi service we need to create the beans.xml file in the BlogADKAdapter\src\main\resources\OSGI-INF\blueprint directory, to make our service available to the runtime.

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
	<bean id="iAdapterEndpointInformationService" class="blogadkadapter.BlogADKAdapterEndpointInformationService"
		activation="eager"></bean>
	<service id="IAdapterEndpointInformationService" ref="iAdapterEndpointInformationService"
		interface="com.sap.it.api.adapter.monitoring.AdapterEndpointInformationService">
	</service>
</blueprint>

After implementing all changes above, compile and deploy the adapter. Then restart your Integration Flow to see the endpoint information in the Manage Integration Content view of Web application.

Summary

We have seen how to use the Adapter API to hook into the Message Processing Log, Adapter Tracing and endpoint information. I hope you liked the post. If you have any questions or feedback, don’t hesitate to comment on this blog post.

References

Develop adapters

Java API

Creating Integration Flows

Managing Integration Content

Setting Log Levels

Tracing

Message Processing Log – Adapter Tracing

Monitoring Message Processing

Blog: Enhancements to Message Processing Log Viewer in the Web Application

 

To report this post you need to login first.

Be the first to leave a comment

You must be Logged on to comment or reply to a post.

Leave a Reply