Skip to Content
Technical Articles
Author's profile photo Maximiliano Colman

The easiest way to publish & consume events to/from SAP Event Mesh using java applications

Hi experts,

I want to share the easiest way to publish & consume events to/from SAP Event Mesh using java applications, so let’s publish 1 million of events in a topic and later on consume them from a queue 🙂

The only pre-requisite is to use the libraries explained here

1-Publish events in a topic:

	public static void publish(String client_id, String client_secret, String token_url, String service_url,
			String topic_name) throws Exception {

		MessagingService messagingService = new MessagingService();
		messagingService.setClientId(client_id);
		messagingService.setClientSecret(client_secret);
		messagingService.setOAuthTokenEndpoint(token_url);
		messagingService.setProtocol("amqp10ws");
		messagingService.setServiceUrl(service_url);

		MessagingServiceJmsSettings settings = new MessagingServiceJmsSettings();

		MessagingServiceFactory messagingServiceFactory = MessagingServiceFactoryCreator
				.createFactory(messagingService);
		MessagingServiceJmsConnectionFactory connectionFactory = messagingServiceFactory
				.createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings);

		Connection connection = connectionFactory.createConnection();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		connection.start();

		Destination destination = session.createTopic("topic:" + topic_name);
		MessageProducer producer = session.createProducer(destination);

		for (double i = 0; i < 1000000; i++) {

			String text = "test " + i;
			TextMessage message = session.createTextMessage(text);
			producer.send(message);

		}
	}

2-Consume them from a queue:

	public static void consume(String client_id, String client_secret, String token_url, String service_url,String queue_name) throws Exception {
		
		MessagingService messagingService = new MessagingService();
		messagingService.setClientId(client_id);
		messagingService.setClientSecret(client_secret);
		messagingService.setOAuthTokenEndpoint(token_url);
		messagingService.setProtocol("amqp10ws");
		messagingService.setServiceUrl(service_url);

		MessagingServiceJmsSettings settings = new MessagingServiceJmsSettings();

		MessagingServiceFactory messagingServiceFactory = MessagingServiceFactoryCreator.createFactory(messagingService);
		MessagingServiceJmsConnectionFactory connectionFactory = messagingServiceFactory.createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings);

		Connection connection = connectionFactory.createConnection();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		connection.start();
		Queue queue = session.createQueue("queue:" + queue_name);

		MessageConsumer consumer = session.createConsumer(queue);
		consumer.setMessageListener(new MessageListener() {

			@Override
			public void onMessage(Message message) {
				// TODO Auto-generated method stub
				try {
					String messageText = null;

					if (message instanceof TextMessage) {
						messageText = ((TextMessage) message).getText();
					} else if (message instanceof BytesMessage) {
						BytesMessage bytesMessage = (BytesMessage) message;
						byte[] data = new byte[(int) bytesMessage.getBodyLength()];
						bytesMessage.readBytes(data);
						messageText = new String(data);
					}

					System.out.println("Message received:" + messageText);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		do {

		} while (true);
	}

3-Test it

Execute your java application and you will see that after 7 minutes all the events were consumed:

Queue cleaned:

 

I can see events everywhere in our future with SAP Business Technology Platform, and how about you, will you use them?, share your experience 🙂

 

Max.

Assigned Tags

      Be the first to leave a comment
      You must be Logged on to comment or reply to a post.