Saturday, July 31, 2021

Working with Liferay Message Bus

Liferay Message Bus is a lightweight messaging component built into Liferay Portal. It follows a publish/subscribe model similar to Java Message Service (JMS) topics, although it is not a full JMS implementation.

Multiple senders send messages to a destination, and the receivers listening on that destination consume them.

It is useful for background batch processing, multi-threaded message processing, and notifying multiple receivers when an event is generated.


Software Stack





  • Send email to thousands of users in the background.
  • Integrate with the scheduler, which runs jobs periodically in the background.
  • Notify multiple receivers when events are generated.
  • If a user request takes a long time to process, use the message bus to hand the task to a background job and update the user once the job completes.
  • Liferay uses it internally in many places, such as the deployment event notifier: when a module is deployed, a message is sent to the message bus.




The following are the important building blocks of the Message Bus:

  • Destination
  • Senders
  • Listeners


A destination is a uniquely named address in the message bus to which messages are sent. It is like a topic or channel that senders use to send messages. The message bus can contain any number of destinations.


Senders are responsible for sending messages to a specific destination on the message bus. A sender may send messages to multiple destinations.


Listeners receive the messages sent by senders. Every listener must register with at least one destination to receive messages, and a listener may subscribe to multiple destinations.
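
The relationship between the three building blocks can be sketched in plain Java. This is only an illustration of the publish/subscribe idea, not the Liferay API: the MiniBus class, its register and send methods, and the "demo/orders" destination name are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of the Liferay API.
public class MiniBus {

    // Destination name -> listeners subscribed to that destination.
    private final Map<String, List<Consumer<String>>> destinations =
        new ConcurrentHashMap<>();

    // A listener registers with a destination to receive its messages.
    public void register(String destination, Consumer<String> listener) {
        destinations
            .computeIfAbsent(destination, name -> new CopyOnWriteArrayList<>())
            .add(listener);
    }

    // A sender addresses a destination, never a listener directly.
    // Returns the number of listeners that received the message.
    public int send(String destination, String payload) {
        List<Consumer<String>> listeners = destinations.getOrDefault(
            destination, Collections.<Consumer<String>>emptyList());

        for (Consumer<String> listener : listeners) {
            listener.accept(payload);
        }

        return listeners.size();
    }

    public static void main(String[] args) {
        MiniBus bus = new MiniBus();
        List<String> received = new ArrayList<>();

        // Two listeners subscribe to the same destination.
        bus.register("demo/orders", payload -> received.add("audit:" + payload));
        bus.register("demo/orders", payload -> received.add("mail:" + payload));

        int delivered = bus.send("demo/orders", "order-42");

        System.out.println(delivered + " listeners received " + received);
    }
}
```

The sender only knows the destination name, so listeners can be added or removed without changing the sending code.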


Liferay provides three types of destination; choose one based on your requirements.


  • Parallel Destination
  • Serial Destination
  • Synchronous Destination

Parallel Destination

A parallel destination processes messages asynchronously using multiple worker threads. Messages are queued, and for each message a separate worker thread is used for every listener subscribed to the destination: one worker thread per message per listener. When all threads in the pool are busy, messages wait in the queue.


Serial Destination

A serial destination is similar to a parallel destination, but each message is handled by a single worker thread, which calls the subscribed listeners one after another: one worker thread per message. When all threads in the pool are busy, messages wait in the queue.


Synchronous Destination

A synchronous destination delivers messages directly to the listeners in the sender's thread; no messages are queued.
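
The difference between the three delivery models can be sketched with a plain java.util.concurrent thread pool. This is a simplified illustration rather than Liferay's implementation: the DeliveryModels class and its method names are hypothetical, and the listeners are modelled as simple Consumer callbacks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of the Liferay API.
public class DeliveryModels {

    // Parallel: one pool task per message per listener.
    static void parallel(
        ExecutorService pool, List<Consumer<String>> listeners, String message) {

        for (Consumer<String> listener : listeners) {
            pool.execute(() -> listener.accept(message));
        }
    }

    // Serial: one pool task per message; that task calls the listeners in order.
    static void serial(
        ExecutorService pool, List<Consumer<String>> listeners, String message) {

        pool.execute(
            () -> listeners.forEach(listener -> listener.accept(message)));
    }

    // Synchronous: no queue and no pool; the sender's thread calls the listeners.
    static void synchronous(List<Consumer<String>> listeners, String message) {
        listeners.forEach(listener -> listener.accept(message));
    }

    // Sends one message to two listeners using the given model and returns
    // the names of the threads on which the listeners ran.
    static List<String> run(String model) {
        List<String> threadNames = new CopyOnWriteArrayList<>();
        Consumer<String> record =
            message -> threadNames.add(Thread.currentThread().getName());
        List<Consumer<String>> listeners = Arrays.asList(record, record);
        ExecutorService pool = Executors.newFixedThreadPool(2);

        if (model.equals("parallel")) {
            parallel(pool, listeners, "hello");
        } else if (model.equals("serial")) {
            serial(pool, listeners, "hello");
        } else {
            synchronous(listeners, "hello");
        }

        // Wait for queued work to finish before reading the results.
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return threadNames;
    }

    public static void main(String[] args) {
        for (String model : Arrays.asList("parallel", "serial", "synchronous")) {
            System.out.println(model + " -> " + run(model));
        }
    }
}
```

In the synchronous case both listeners run on the calling thread, so a slow listener delays the sender; in the other two cases the sender returns as soon as the work is handed to the pool.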


The asynchronous model has the following important parameters, which can be set for a parallel or serial destination.

Maximum Queue Size

This parameter sets the maximum number of messages that can wait in the queue.


Workers Core Size

The initial number of worker threads created in the thread pool.


Workers Max Size

The maximum number of worker threads in the pool. Once the pool reaches this size, messages wait in the queue for the next available worker thread.


Rejected Execution Handler

The rejected execution handler is the mechanism that handles messages that cannot be accepted because the queue is full.
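
How these four settings interact can be tried out with the JDK's own java.util.concurrent.ThreadPoolExecutor. The sketch below is an assumption-laden illustration, not Liferay code: the PoolSettingsDemo class is hypothetical, the variable names simply mirror the parameter names above, and the sizes are deliberately tiny so that the queue fills up after two tasks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of the Liferay API.
public class PoolSettingsDemo {

    // Submits three tasks to a pool with one worker and one queue slot and
    // returns how many tasks ended up running on the submitting thread.
    static int countCallerRuns() {
        int workersCoreSize = 1;
        int workersMaxSize = 1;
        int maximumQueueSize = 1;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            workersCoreSize, workersMaxSize, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(maximumQueueSize),
            // Rejected tasks are executed by the thread that submitted them.
            new ThreadPoolExecutor.CallerRunsPolicy());

        Thread caller = Thread.currentThread();
        AtomicInteger ranOnCaller = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);

        // Task 1 occupies the only worker thread until the latch opens.
        pool.execute(() -> awaitQuietly(release));

        Runnable task = () -> {
            if (Thread.currentThread() == caller) {
                ranOnCaller.incrementAndGet();
            }
        };

        pool.execute(task); // Task 2 takes the single queue slot.
        pool.execute(task); // Task 3 is rejected: queue full, pool at max size.

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return ranOnCaller.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Tasks run by the caller: " + countCallerRuns());
    }
}
```

With CallerRunsPolicy the rejected task is not dropped; the sender does the work itself, which slows the sender down and acts as a simple form of back pressure.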


Message Bus Implementation steps in the applications


  • Create Destination Configuration
  • Create Destination
  • Register Destination as OSGi Service
  • Manage the Destination Object
  • Register Listener
  • Send Message to Destination


Create Destination Configuration

The Message Bus API provides the DestinationConfiguration class to create a destination configuration. We can create any of the destination types described above.


Parallel Destination Configuration


DestinationConfiguration destinationConfiguration =

              new DestinationConfiguration(






Serial Destination Configuration


DestinationConfiguration destinationConfiguration =

              new DestinationConfiguration(




Synchronous Destination Configuration


DestinationConfiguration destinationConfiguration =

              new DestinationConfiguration(






Create Destination

DestinationFactory creates the destination based on the configuration.



Destination destination = _destinationFactory.createDestination(





Register Destination as OSGi Service

ServiceRegistration<Destination> is used to register the destination as an OSGi service.


_destinationServiceRegistration = _bundleContext.registerService(

              Destination.class, destination, destinationProperties);
"Destination is registered with Service Registration ..");





Manage the Destination Object

We have to keep a reference to the destination's service registration so that the destination can be unregistered and destroyed when the bundle is deactivated.


Dictionary<String, Object> destinationProperties =

              HashMapDictionaryBuilder.<String, Object>put(

                  "", destination.getName()).build();




Destroy Destination



     protected void deactivate() {

         if (_destinationServiceRegistration != null) {

              Destination destination = _bundleContext.getService(










Setting Thread Pool for destination








Rejection Handler to Handle Failed Messages



// RejectedExecutionHandler and ThreadPoolExecutor come from java.util.concurrent.
RejectedExecutionHandler rejectedExecutionHandler =
              new ThreadPoolExecutor.CallerRunsPolicy() {

                  @Override
                  public void rejectedExecution(
                       Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

                       // Log the overflow, then let the calling thread run the task.
                       if (_log.isWarnEnabled()) {
                            _log.warn(
                                 "The current thread will handle the request " +
                                 "because the rules engine's task queue is at " +
                                 "its maximum capacity");
                       }

                       super.rejectedExecution(runnable, threadPoolExecutor);
                  }

              };










Register Listener

A listener must implement the MessageListener interface and its receive(..) method, which contains the business logic. Listeners are registered with a destination to receive messages from senders.



There are different ways to register a listener.


Automatic Registration

Create a MessageListener component and pass the destination name as a component property (destination.name) so that the listener is registered with the destination automatically when the component is created.


@Component (

         immediate = true,

         property = {""},

         service = MessageListener.class


public class AutomaticRegisteredSynchronousMessageListener implements MessageListener {


     public void receive(Message message) {


         try {




         catch (Exception e) {








     private static final Log _log = LogFactoryUtil.getLog(






Message Bus Registration

Listeners can be registered through the Message Bus.



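// MessageBus is an OSGi service, so it is injected with @Reference.
@Reference
private MessageBus _messageBus;

// Create the listener and register it with the destination by name.
_messageListenerParallel = new MessageBusRegisteredParallelMessageListener();

_messageBus.registerMessageListener(
        LiferayMessageBusPortletKeys.DESTINATION_PARALLEL, _messageListenerParallel);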





Destination Registration

Listeners can also be registered directly with a Destination.



private MessageListener _messageListenerParallel;


@Reference(target = "("+LiferayMessageBusPortletKeys.DESTINATION_PARALLEL+")")

    private Destination _destinationParellel;



_messageListenerParallel = new DestinationRegisteredParallelMessageListener();





Send Message to Destination

There are several ways to send a message to a destination. By default they are asynchronous, but messages can also be sent synchronously.


Directly with Message Bus



private MessageBus _messageBus;


Message messageobj = new Message();

messageobj.put("message", message);

_messageBus.sendMessage(destination, messageobj);



Using Message Bus Util




Message messageobj = new Message();

messageobj.put("message", message);

MessageBusUtil.sendMessage(destinationName, messageobj);





Send Messages Synchronously

We can send a message synchronously using MessageBusUtil. The message bus blocks the sending thread until it receives a response or the timeout expires.
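
The blocking behaviour can be illustrated with a plain JDK CompletableFuture. This is a minimal sketch of the "wait for a response or time out" idea and makes no claim about how Liferay implements it internally; the SyncSendDemo class, the sendAndWait method and the use of IllegalStateException are all assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical illustration class; not part of the Liferay API.
public class SyncSendDemo {

    // Hands the payload to a listener on another thread, then blocks the
    // caller until the listener's response arrives or the timeout expires.
    static String sendAndWait(
        Function<String, String> listener, String payload, long timeoutMillis) {

        CompletableFuture<String> response =
            CompletableFuture.supplyAsync(() -> listener.apply(payload));

        try {
            return response.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException(
                "No response within " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Listener failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // The listener replies quickly, so the caller gets the response back.
        String reply = sendAndWait(payload -> "ack:" + payload, "ping", 5000);

        System.out.println(reply);
    }
}
```

Because the caller is blocked for the whole round trip, synchronous sending is best kept for cases where the sender genuinely needs the listener's answer before it can continue.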



Message messageobj = new Message();

try {
   // Blocks until a response arrives or the default timeout expires.
   MessageBusUtil.sendSynchronousMessage(destinationName, messageobj);

   // Alternative with an explicit timeout:
   //MessageBusUtil.sendSynchronousMessage(destinationName, message, timeout)
} catch (MessageBusException e) {
   // TODO Auto-generated catch block
}





Find Message Bus Source in Git Hub




