maxi1555
Contributor
Hi experts,

I want to share the easiest way to publish and consume events to/from SAP Event Mesh using Java applications. Let's publish 1 million events to a topic and later consume them from a queue 🙂


The only prerequisite is to use the libraries explained here.

1-Publish events to a topic:


public static void publish(String client_id, String client_secret, String token_url, String service_url,
        String topic_name) throws Exception {

    // Credentials and endpoints of the SAP Event Mesh service instance
    MessagingService messagingService = new MessagingService();
    messagingService.setClientId(client_id);
    messagingService.setClientSecret(client_secret);
    messagingService.setOAuthTokenEndpoint(token_url);
    messagingService.setProtocol("amqp10ws");
    messagingService.setServiceUrl(service_url);

    MessagingServiceJmsSettings settings = new MessagingServiceJmsSettings();

    MessagingServiceFactory messagingServiceFactory = MessagingServiceFactoryCreator
            .createFactory(messagingService);
    MessagingServiceJmsConnectionFactory connectionFactory = messagingServiceFactory
            .createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings);

    Connection connection = connectionFactory.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();

        Destination destination = session.createTopic("topic:" + topic_name);
        MessageProducer producer = session.createProducer(destination);

        // An int counter yields payloads "test 0", "test 1", ...
        // (a double counter would produce "test 0.0", "test 1.0", ...)
        for (int i = 0; i < 1000000; i++) {
            TextMessage message = session.createTextMessage("test " + i);
            producer.send(message);
        }
    } finally {
        // Closing the connection also closes its sessions and producers
        connection.close();
    }
}

2-Consume them from a queue:


public static void consume(String client_id, String client_secret, String token_url, String service_url,
        String queue_name) throws Exception {

    // Credentials and endpoints of the SAP Event Mesh service instance
    MessagingService messagingService = new MessagingService();
    messagingService.setClientId(client_id);
    messagingService.setClientSecret(client_secret);
    messagingService.setOAuthTokenEndpoint(token_url);
    messagingService.setProtocol("amqp10ws");
    messagingService.setServiceUrl(service_url);

    MessagingServiceJmsSettings settings = new MessagingServiceJmsSettings();

    MessagingServiceFactory messagingServiceFactory = MessagingServiceFactoryCreator
            .createFactory(messagingService);
    MessagingServiceJmsConnectionFactory connectionFactory = messagingServiceFactory
            .createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings);

    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    connection.start();
    Queue queue = session.createQueue("queue:" + queue_name);

    MessageConsumer consumer = session.createConsumer(queue);
    // The listener is called asynchronously for every message that arrives in the queue
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            try {
                String messageText = null;

                if (message instanceof TextMessage) {
                    messageText = ((TextMessage) message).getText();
                } else if (message instanceof BytesMessage) {
                    BytesMessage bytesMessage = (BytesMessage) message;
                    byte[] data = new byte[(int) bytesMessage.getBodyLength()];
                    bytesMessage.readBytes(data);
                    // Decode with an explicit charset instead of the platform default
                    messageText = new String(data, java.nio.charset.StandardCharsets.UTF_8);
                }

                System.out.println("Message received:" + messageText);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });

    // Keep the calling thread alive without spinning the CPU in an empty loop;
    // this latch is never counted down, so the method blocks until the thread is interrupted
    new java.util.concurrent.CountDownLatch(1).await();
}
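
The consume method above never returns, so the connection stays open until the process is stopped. When the number of expected events is known (1 million in this test), one option is to count events down and return once all of them have arrived. The following is only a minimal sketch of that idea: `EventCountdown` and its methods are hypothetical names of mine, not part of the SAP libraries, and it uses only the Java standard library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of the SAP libraries): tracks how many
// expected events are still outstanding and lets a caller wait for them.
final class EventCountdown {

    private final CountDownLatch remaining;

    EventCountdown(int expectedEvents) {
        this.remaining = new CountDownLatch(expectedEvents);
    }

    // Call once per received event, e.g. at the end of onMessage(...)
    void onEvent() {
        remaining.countDown();
    }

    // Number of expected events that have not arrived yet
    long pending() {
        return remaining.getCount();
    }

    // Waits until every expected event has arrived or the timeout expires.
    // Returns true only if all events arrived in time.
    boolean awaitAll(long timeoutMillis) {
        try {
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still react to it
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Under these assumptions, the listener would call `onEvent()` after printing each message, and the method would replace the endless wait with something like `awaitAll(...)` followed by `connection.close()`.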

3-Test it


Execute your Java application and you will see that after 7 minutes all the events were consumed.
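
To put that figure in perspective: if the 7 minutes is taken as the total elapsed time for 1 million events, the average rate works out to 1,000,000 / 420 s, or roughly 2,381 events per second. A small sketch of that arithmetic follows; `Throughput` is a hypothetical helper name, and the result is only an average for this single run, not a benchmark of the service.

```java
import java.time.Duration;

// Hypothetical helper: average event rate over an elapsed time
final class Throughput {

    static double eventsPerSecond(long events, Duration elapsed) {
        if (elapsed.isZero() || elapsed.isNegative()) {
            throw new IllegalArgumentException("elapsed time must be positive");
        }
        // Use milliseconds so sub-second durations are not truncated to zero
        return events / (elapsed.toMillis() / 1000.0);
    }

    public static void main(String[] args) {
        // Figures from this post: 1 million events in 7 minutes
        double rate = eventsPerSecond(1_000_000, Duration.ofMinutes(7));
        System.out.printf("%.0f events/s%n", rate);
    }
}
```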


Afterwards, the queue was empty.


 

I can see events everywhere in our future with SAP Business Technology Platform. How about you: will you use them? Share your experience 🙂

 

Max.