Skip to Content

RPC is known as remote Remote Procedure Call which is a common approach for communications between services. RabbitMQ can also support RPC and here is a general introduction of RabbitMQ RPC in http://www.rabbitmq.com/tutorials/tutorial-six-java.html. In this blog, we will focus on how to use RPC of RabbitMQ by using RabbitTemplate and AsyncRabbtTemplate provided by Spring.

 

	@Override
	public Message sendAndReceive(final Message message) throws AmqpException {
		return sendAndReceive(message, null);
	}

	public Message sendAndReceive(final Message message, CorrelationData correlationData) throws AmqpException {
		return doSendAndReceive(this.exchange, this.routingKey, message, correlationData);
	}

RabbitTemplate provide method above to support send and receive messages. As you can find in the method definition, you can invoke the method by providing necessary values like exchange, routingkey and correlationData. Correlation data is used for client to valid the response from server.

To take a deep look at what spring did of sendAndReceive, we can find out that it support direct reply to by setting replyAddress to null or usingFastReplyTo to true.

if (this.replyAddress == null || this.usingFastReplyTo) {
    return doSendAndReceiveWithTemporary(exchange, routingKey, message, correlationData);
}
else {
    return doSendAndReceiveWithFixed(exchange, routingKey, message, correlationData);
}

As mentioned in http://www.rabbitmq.com/direct-reply-to.html

To use direct reply-to, an RPC client should:

  • Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode. There is no need to declare this “queue” first, although the client can do so if it wants.

Client can get the message from amq.rabbitmq.reply-to directly.

RabbitTemplate retrieve the message by getting message from queue in PendingReply. If you take a look at the definition of PendingReply, you can find that the queue is a BlockingQueue which means it will wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

private static class PendingReply {
    ......
    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
    ......
}

Thus, RabbitTemplate couldn’t support asynchronized cases of RPC.

 

Spring provide AsyncRabbitTemplate for async cases since Spring AMQP 1.6 to allow the caller of the send and receive operations (sendAndReceive, convertSendAndReceive) not to block.

public RabbitMessageFuture sendAndReceive(String routingKey, Message message) {
    return sendAndReceive(this.template.getExchange(), routingKey, message);
}

public <C> RabbitConverterFuture<C> convertSendAndReceive(Object message) {
    return convertSendAndReceive(this.template.getExchange(), this.template.getRoutingKey(), message, null);
}

It will return a child class of ListenableFuture and you can simply add ListenableFutureCallback to handle the result.

 

public interface ListenableFuture<T> extends Future<T> {
    void addCallback(ListenableFutureCallback<? super T> callback);
    void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
}

 

To report this post you need to login first.

Be the first to leave a comment

You must be Logged on to comment or reply to a post.

Leave a Reply