/**
 * Connects to the messaging service and publishes 1,000,000 numbered text messages to a topic.
 *
 * <p>The service is configured with the {@code "amqp10ws"} protocol setting and the given OAuth
 * credentials. The connection is closed before this method returns, whether the sends succeed
 * or fail.
 *
 * @param client_id     client id set on the messaging service
 * @param client_secret client secret set on the messaging service
 * @param token_url     OAuth token endpoint set on the messaging service
 * @param service_url   service URL set on the messaging service
 * @param topic_name    topic name; prefixed with {@code "topic:"} to form the destination
 * @throws Exception if the connection cannot be set up, a send fails, or closing fails
 */
public static void publish(String client_id, String client_secret, String token_url, String service_url,
        String topic_name) throws Exception {
    MessagingService messagingService = new MessagingService();
    messagingService.setClientId(client_id);
    messagingService.setClientSecret(client_secret);
    messagingService.setOAuthTokenEndpoint(token_url);
    messagingService.setProtocol("amqp10ws");
    messagingService.setServiceUrl(service_url);

    MessagingServiceJmsSettings settings = new MessagingServiceJmsSettings();
    MessagingServiceFactory messagingServiceFactory = MessagingServiceFactoryCreator
            .createFactory(messagingService);
    MessagingServiceJmsConnectionFactory connectionFactory = messagingServiceFactory
            .createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings);

    Connection connection = connectionFactory.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        Destination destination = session.createTopic("topic:" + topic_name);
        MessageProducer producer = session.createProducer(destination);
        // Integer counter: payloads read "test 0", "test 1", ... (a floating-point counter
        // would render them as "test 0.0", "test 1.0", ...).
        for (int i = 0; i < 1000000; i++) {
            TextMessage message = session.createTextMessage("test " + i);
            producer.send(message);
        }
    } finally {
        // Closing the connection also releases the session and producer created from it.
        connection.close();
    }
}
/**
 * Connects to the messaging service, registers a listener on a queue, and prints every received
 * message to standard output.
 *
 * <p>The service is configured with the {@code "amqp10ws"} protocol setting and the given OAuth
 * credentials. Text messages are printed as-is; bytes messages are decoded as UTF-8; any other
 * message type is printed as {@code null}. The calling thread blocks (without spinning) until
 * it is interrupted, at which point the connection is closed and the
 * {@link InterruptedException} propagates to the caller.
 *
 * @param client_id     client id set on the messaging service
 * @param client_secret client secret set on the messaging service
 * @param token_url     OAuth token endpoint set on the messaging service
 * @param service_url   service URL set on the messaging service
 * @param queue_name    queue name; prefixed with {@code "queue:"} to form the destination
 * @throws Exception if the connection cannot be set up, the waiting thread is interrupted,
 *                   or closing fails
 */
public static void consume(String client_id, String client_secret, String token_url, String service_url,String queue_name) throws Exception {
    MessagingService messagingService = new MessagingService();
    messagingService.setClientId(client_id);
    messagingService.setClientSecret(client_secret);
    messagingService.setOAuthTokenEndpoint(token_url);
    messagingService.setProtocol("amqp10ws");
    messagingService.setServiceUrl(service_url);

    MessagingServiceJmsSettings settings = new MessagingServiceJmsSettings();
    MessagingServiceFactory messagingServiceFactory = MessagingServiceFactoryCreator.createFactory(messagingService);
    MessagingServiceJmsConnectionFactory connectionFactory = messagingServiceFactory.createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings);

    Connection connection = connectionFactory.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        Queue queue = session.createQueue("queue:" + queue_name);
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    String messageText = null;
                    if (message instanceof TextMessage) {
                        messageText = ((TextMessage) message).getText();
                    } else if (message instanceof BytesMessage) {
                        BytesMessage bytesMessage = (BytesMessage) message;
                        byte[] data = new byte[(int) bytesMessage.getBodyLength()];
                        bytesMessage.readBytes(data);
                        // Decode with an explicit charset so the output does not depend on
                        // the platform default.
                        messageText = new String(data, java.nio.charset.StandardCharsets.UTF_8);
                    }
                    System.out.println("Message received:" + messageText);
                } catch (JMSException e) {
                    // onMessage cannot declare checked exceptions, so report and keep consuming.
                    e.printStackTrace();
                }
            }
        });
        // Park this thread instead of busy-waiting; the latch is never counted down, so
        // await() returns only by throwing InterruptedException.
        new java.util.concurrent.CountDownLatch(1).await();
    } finally {
        // Closing the connection also releases the session and consumer created from it.
        connection.close();
    }
}
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
6 | |
5 | |
5 | |
4 | |
4 | |
4 | |
4 | |
3 | |
3 | |
3 |