Sending Scheduled/delayed messages with RabbitMQ through java client
The requirement to send delayed/scheduled message is to publish any message with a delay time. To achieve this, earlier we had to use dead letter exchange but now we can send scheduled/delayed messages from RabbitMQ with “rabbitmq_delayed_message_exchange” plugin. we can send scheduled/ delayed messages from rabbitMQ by following these steps :-
Step 1– Install rabbitMQ server to 3.5.3 or above on your machine. Follow this guide to install rabbitMQ http://www.rabbitmq.com/download.html
Step 2 – Download “rabbitmq_delayed_message_exchange” plugin from rabbitMQ official website. Follow this link to download “rabbitmq_delayed_message_exchange” http://www.rabbitmq.com/community-plugins.html
Step 3 – Place “rabbitmq_delayed_message_exchange” plugin into plugins directory of rabbitMQ server directory (in linux ‘/usr/lib/rabbitmq/lib/rabbitmq_server-3.5.3/plugins’ path)
Step 4 – Execute “rabbitmq-plugins enable rabbitmq_delayed_message_exchange” command to enable “rabbitmq_delayed_message_exchange” plugin.
Step 5 – Now, after configuring rabbitMQ server we are ready to send scheduled or delayed messages. To send scheduled message we have to declare an exchange with ‘x-delayed-message’ which is indirectly mapped with the any of the exchange types i.e topic,direct,fanout or headers.
Follow this code to publish message to delayed exchange:-
[java]
String readyToPushContent = "Hello this is the delayed message.";
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("exChangeName", "x-delayed-message", true, false, args);
Map<String, Object> headers = new HashMap<String, Object>()
headers.put("x-delay", 10000); //delay in miliseconds i.e 10secs
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("exChangeName", "", props.build(), readyToPushContent.bytes);
[/java]
a. This plugin allows flexible routing behavior via “x-delayed-type” which can be passed at the time of exchange declaration. i.e this exchange will provide routing behavior like “direct” exchange, and exchange with type “x-delayed-message” will act as proxy.
b. Here we declare an exchange with “x-delayed-message” type. Then we attach a header with “x-delay” at the time of publish message, which accepts integer value as time (in milliseconds).
Step 6 – To Subscribe message from delayed exchange follow this code :-
[java]
QueueingConsumer queueingConsumer = new QueueingConsumer(channel) // creating a consumer from channel
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("exChangeName", "x-delayed-message", true, false, args); // declare delayed exchange
String queueName = channel.queueDeclare().getQueue(); // declaring dynamic queue in channel
channel.queueBind(queueName, "exChangeName", ""); // binding queue with delayed exchnage
channel.basicConsume(queueName, true, queueingConsumer) // consuming message from queue in consumer object
try {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery() //
String message = (new String(delivery.getBody()))
} catch (Exception exception) {
exception.printStackTrace()
}
[/java]
Hope, this will help you to send scheduled/delayed messages with rabbitMQ.
Bouquets and brickbats are welcome.
~vishal(d0t)kumar(at)tothenew(d0t)com~
Helpful thanks