Saturday, July 31, 2021

Liferay Message Bus Implementation

Read the following article to understand more about the Message Bus in Liferay.




Implementation Steps

  • Create Destination Configuration
  • Create Destination
  • Register Destination as OSGi Service
  • Manage the Destination Object
  • Register Listener
  • Send Message to Destination



GitHub Project



Module Implementation


The module demonstrates how to create the different types of Message Bus destinations and the different ways to register listeners with the Message Bus.

The module has simple UI screens for sending messages on the Message Bus and viewing the statistics of each destination.


Software Stack



Liferay Developer Studio-3.9.3-ga4





Have a portal server and a Liferay Workspace ready.

Deploy and Run


Import the Liferay module into your Liferay Workspace, then run the Gradle build and deploy tasks.

The Gradle deploy task places the module jar file in your OSGi deployment directory. Alternatively, copy the jar manually to the “osgi\modules” directory.






Access Liferay Portal from Browser





Login as Liferay Admin


Create a page in Liferay and add the LiferayMessageBus widget to the page.






Access “Send Message” screen.


Send a message from this screen. The message is sent to the Message Bus on the specified destination.



Once the message has been sent on the Message Bus, the registered listeners receive it.

The listeners print the message object, so their output appears in the console logs. In a real application, listeners would be implemented according to the actual requirements.





Access the Destination Statistics screen.


Select a destination from the dropdown to see the statistics of that destination.





Working with Liferay Message Bus

Liferay Message Bus is a lightweight messaging component, similar in spirit to Java Message Service (JMS) topics, that follows the publish-and-subscribe model. It is integrated into Liferay Portal.

Multiple senders send messages to a destination, and the receivers at the other end listen for those messages.

It is useful for batch processing in the background, for multi-threaded message processing, and for sending a message to multiple receivers when an event is generated.


Use Cases





  • Send email to thousands of users in the background.
  • Integrate with the scheduler, which runs jobs periodically in the background.
  • Notify multiple receivers when events are generated.
  • If a user request takes a long time to process, use the Message Bus to hand the task to the background and update the user's response once the job is completed.
  • Liferay itself uses the Message Bus in many places, such as the deployment event notifier. When a module is deployed, a message is sent to the Message Bus.




The following are the important building blocks of the Message Bus:

  • Destination
  • Senders
  • Listeners


A destination is a uniquely named address in the Message Bus to which messages are sent. It is like a topic or channel that senders use to deliver messages. The Message Bus can contain any number of destinations.


Senders are responsible for sending messages to the Message Bus on a specific destination. A sender may send messages to multiple destinations.


Listeners receive the messages that senders send. Every listener must be registered with at least one destination to receive messages, and a listener may subscribe to multiple destinations.
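
The relationship between the three building blocks can be sketched with a few lines of plain Java. This is a minimal illustration, not Liferay code: TinyMessageBus, its methods, and the "demo/orders" destination name are hypothetical names chosen for the example, and delivery here happens on the caller's thread for simplicity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory bus for illustration only; this is not the Liferay API.
final class TinyMessageBus {

    // Destination name -> listeners subscribed to that destination.
    private final Map<String, List<Consumer<String>>> listeners =
        new ConcurrentHashMap<>();

    // A listener subscribes to one destination; call again to subscribe to more.
    void registerListener(String destination, Consumer<String> listener) {
        listeners
            .computeIfAbsent(destination, name -> new CopyOnWriteArrayList<>())
            .add(listener);
    }

    // A sender publishes a payload to one destination.
    // Returns the number of listeners that received it.
    int sendMessage(String destination, String payload) {
        List<Consumer<String>> subscribed =
            listeners.getOrDefault(destination, Collections.emptyList());

        for (Consumer<String> listener : subscribed) {
            listener.accept(payload);
        }

        return subscribed.size();
    }

    public static void main(String[] args) {
        TinyMessageBus bus = new TinyMessageBus();
        List<String> received = new ArrayList<>();

        // Two listeners on the same destination both receive each message.
        bus.registerListener("demo/orders", p -> received.add("audit:" + p));
        bus.registerListener("demo/orders", p -> received.add("email:" + p));

        bus.sendMessage("demo/orders", "order-1");
        System.out.println(received);
    }
}
```

A message sent to a destination with no listeners is simply dropped in this sketch, which mirrors the lightweight, non-guaranteed nature of this style of messaging.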


Liferay provides three types of destination. Choose one based on your requirements.


  • Parallel Destination
  • Serial Destination
  • Synchronous Destination

Parallel Destination

A parallel destination processes messages asynchronously using multiple worker threads. Messages are queued, and each message is delivered to each subscribed listener on its own worker thread, so there is one worker thread per message per listener. When the pool has reached its maximum number of threads, further messages wait in the queue.


Serial Destination

A serial destination is similar to a parallel destination, but each message is processed on a single worker thread that delivers it to the listeners, so there is one worker thread per message. Messages wait in the queue when the pool has reached its maximum number of threads.


Synchronous Destination

A synchronous destination delivers messages directly to the listeners on the sending thread. No messages are queued.
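
The difference between the three delivery styles can be sketched with plain JDK executors. This is only an illustration of the threading models described above, not Liferay's destination classes: DispatchModels and its method names are hypothetical, and a fixed pool of two threads is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustration only: plain JDK executors, not Liferay's destination classes.
final class DispatchModels {

    // Parallel style: one pool task per message per listener.
    static int parallel(
        ExecutorService pool, List<Consumer<String>> listeners, String message) {

        for (Consumer<String> listener : listeners) {
            pool.execute(() -> listener.accept(message));
        }

        return listeners.size(); // number of tasks submitted
    }

    // Serial style: one pool task per message; listeners run in turn inside it.
    static int serial(
        ExecutorService pool, List<Consumer<String>> listeners, String message) {

        pool.execute(() -> listeners.forEach(l -> l.accept(message)));

        return 1; // number of tasks submitted
    }

    // Synchronous style: no queue, the sender's own thread calls each listener.
    static int synchronous(List<Consumer<String>> listeners, String message) {
        listeners.forEach(l -> l.accept(message));

        return 0; // nothing was submitted to a pool
    }

    // Stops accepting work and waits for already queued tasks to finish.
    static boolean shutdownAndWait(ExecutorService pool) {
        pool.shutdown();

        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        List<Consumer<String>> listeners = new ArrayList<>();
        listeners.add(m -> log.add("A:" + m + "@" + Thread.currentThread().getName()));
        listeners.add(m -> log.add("B:" + m + "@" + Thread.currentThread().getName()));

        ExecutorService pool = Executors.newFixedThreadPool(2);
        parallel(pool, listeners, "p");
        serial(pool, listeners, "s");
        synchronous(listeners, "y");
        shutdownAndWait(pool);

        log.forEach(System.out::println);
    }
}
```

Running main prints which thread handled each delivery. The synchronous entries always show the calling thread, while the parallel and serial entries show pool threads.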


The asynchronous model has the following important parameters. They can be set for both parallel and serial destinations.

Maximum Queue Size

The maximum number of messages that can be held in the queue.


Workers Core Size

The initial number of worker threads used to create the thread pool.


Workers Max Size

The maximum number of worker threads in the pool. Once the pool reaches this size, further messages wait in the queue for the next available worker thread.


Rejected Execution Handler

The rejected execution handler is the mechanism for handling messages that cannot be accepted because the queue is full.
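
To see how these four parameters interact, here is a minimal sketch using the JDK's java.util.concurrent.ThreadPoolExecutor rather than a Liferay destination. PoolParametersDemo and countRejected are hypothetical names. One caveat: the JDK class starts threads beyond the core size only after the queue is full, and Liferay's own pool may order these steps differently, so treat the numbers as an illustration of the JDK behavior only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustration with the JDK thread pool; not a Liferay destination.
final class PoolParametersDemo {

    // Submits `tasks` tasks that all block until released, so nothing
    // completes while we are submitting. Returns how many were rejected.
    static int countRejected(
        int coreSize, int maxSize, int queueSize, int tasks) {

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            coreSize, maxSize, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(queueSize),      // maximum queue size
            new ThreadPoolExecutor.AbortPolicy());    // handler: throw on overflow

        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;

        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                }
                catch (RejectedExecutionException e) {
                    rejected++; // pool and queue are both full
                }
            }
        }
        finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }

        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Capacity is maxSize + queueSize = 4 + 3 = 7, so 3 of 10 are rejected.
        System.out.println(countRejected(2, 4, 3, 10));
    }
}
```

With the AbortPolicy used here, overflow surfaces as an exception at the sender. Swapping in a different handler changes only what happens to the overflowing tasks, not the capacity.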


Message Bus Implementation steps in the applications


  • Create Destination Configuration
  • Create Destination
  • Register Destination as OSGi Service
  • Manage the Destination Object
  • Register Listener
  • Send Message to Destination


Create Destination Configuration

The Message Bus API provides the DestinationConfiguration class for creating a destination configuration. With it we can create each of the destination types described above.


Parallel Destination Configuration


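DestinationConfiguration destinationConfiguration =
    new DestinationConfiguration(
        DestinationConfiguration.DESTINATION_TYPE_PARALLEL,
        LiferayMessageBusPortletKeys.DESTINATION_PARALLEL);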






Serial Destination Configuration


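// destinationName: the unique name chosen for this destination
DestinationConfiguration destinationConfiguration =
    new DestinationConfiguration(
        DestinationConfiguration.DESTINATION_TYPE_SERIAL,
        destinationName);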




Synchronous Destination Configuration


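// destinationName: the unique name chosen for this destination
DestinationConfiguration destinationConfiguration =
    new DestinationConfiguration(
        DestinationConfiguration.DESTINATION_TYPE_SYNCHRONOUS,
        destinationName);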






Create Destination

The DestinationFactory creates the destination from the configuration.

Destination destination = _destinationFactory.createDestination(
    destinationConfiguration);





Register Destination as OSGi Service

A ServiceRegistration<Destination> is used to register the destination as an OSGi service.

_destinationServiceRegistration = _bundleContext.registerService(
    Destination.class, destination, destinationProperties);

_log.info("Destination is registered with Service Registration.");





Manage the Destination Object

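We have to keep track of the destination's service registration so that the destination can be unregistered when the bundle is deactivated. The destination is registered under its name through the "destination.name" service property.

Dictionary<String, Object> destinationProperties =
    HashMapDictionaryBuilder.<String, Object>put(
        "destination.name", destination.getName()).build();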




Destroy Destination



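     @Deactivate
     protected void deactivate() {
         if (_destinationServiceRegistration != null) {
              // Look up the destination, unregister the service, then destroy it.
              Destination destination = _bundleContext.getService(
                  _destinationServiceRegistration.getReference());

              _destinationServiceRegistration.unregister();

              destination.destroy();
         }
     }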










Setting Thread Pool for destination








Rejection Handler to Handle Failed Messages



RejectedExecutionHandler rejectedExecutionHandler =
    new ThreadPoolExecutor.CallerRunsPolicy() {

        @Override
        public void rejectedExecution(
            Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

            if (_log.isWarnEnabled()) {
                _log.warn(
                    "The current thread will handle the request " +
                        "because the rules engine's task queue is at " +
                            "its maximum capacity");
            }

            // Run the rejected task on the calling thread.
            super.rejectedExecution(runnable, threadPoolExecutor);
        }

    };
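
The effect of a caller-runs handler can be observed with the JDK thread pool alone. The following is a minimal sketch under that assumption: CallerRunsDemo and overflowThreadName are hypothetical names, and the pool is deliberately tiny (one worker, one queue slot) so that the third task overflows.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustration with the JDK thread pool; not a Liferay destination.
final class CallerRunsDemo {

    // Returns the name of the thread that ended up running the overflow task.
    static String overflowThreadName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];

        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker
            pool.execute(() -> awaitQuietly(release)); // fills the only queue slot

            // Pool and queue are full: the handler runs this task right here,
            // on the submitting thread, before execute() returns.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        }
        finally {
            release.countDown();
            pool.shutdown();
        }

        return ranOn[0];
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + overflowThreadName());
    }
}
```

The trade-off of this policy is that no message is dropped, but the sender is slowed down while it processes the overflow itself, which acts as a simple form of back-pressure.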










Register Listener

A listener must implement the MessageListener interface. Our business logic goes in its receive(..) method. Listeners are registered with a destination to receive messages from senders.



There are different ways to register a listener.


Automatic Registration

Create a MessageListener component and pass the destination name as a property. The listener is then registered with the destination automatically when the component is created.


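@Component (
         immediate = true,
         // The constant name is assumed by analogy with DESTINATION_PARALLEL.
         property = {"destination.name=" + LiferayMessageBusPortletKeys.DESTINATION_SYNCHRONOUS},
         service = MessageListener.class
)
public class AutomaticRegisteredSynchronousMessageListener implements MessageListener {

     @Override
     public void receive(Message message) {
         try {
              // Business logic goes here; this listener only logs the message.
              _log.info("Received message: " + message);
         }
         catch (Exception e) {
              _log.error("Unable to process message", e);
         }
     }

     private static final Log _log = LogFactoryUtil.getLog(
         AutomaticRegisteredSynchronousMessageListener.class);

}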






Message Bus Registration

Listeners can be registered through the Message Bus.

@Reference
private MessageBus _messageBus;

_messageListenerParallel = new MessageBusRegisteredParallelMessageListener();

_messageBus.registerMessageListener(
    LiferayMessageBusPortletKeys.DESTINATION_PARALLEL, _messageListenerParallel);





Destination Registration

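Listeners can also be registered directly with a Destination.

private MessageListener _messageListenerParallel;

@Reference(target = "(destination.name=" + LiferayMessageBusPortletKeys.DESTINATION_PARALLEL + ")")
private Destination _destinationParellel;

_messageListenerParallel = new DestinationRegisteredParallelMessageListener();

// Register the listener on the injected destination.
_destinationParellel.register(_messageListenerParallel);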





Send Message to Destination

There are several ways to send a message to a destination. The first two below are asynchronous; messages can also be sent synchronously.


Directly with Message Bus



private MessageBus _messageBus;

Message messageobj = new Message();
messageobj.put("message", message);

_messageBus.sendMessage(destination, messageobj);



Using Message Bus Util




Message messageobj = new Message();
messageobj.put("message", message);

// Send the populated Message object, not the raw payload.
MessageBusUtil.sendMessage(destinationName, messageobj);





Send Messages Synchronously

We can send a message synchronously using MessageBusUtil. The Message Bus blocks the sender until a response is received or the timeout expires.



Message messageobj = new Message();

try {
   MessageBusUtil.sendSynchronousMessage(destinationName, messageobj);

   // With an explicit timeout:
   // MessageBusUtil.sendSynchronousMessage(destinationName, messageobj, timeout);
}
catch (MessageBusException e) {
   // TODO Auto-generated catch block
}
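
The blocking behavior of a synchronous send can be sketched with a CompletableFuture from the JDK. This is not how MessageBusUtil is implemented. SyncSendSketch and sendAndWait are hypothetical names, and returning null on timeout is a simplification chosen for the example, whereas the Liferay call above reports failures through MessageBusException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Illustration of "send and wait for a reply"; not the Liferay implementation.
final class SyncSendSketch {

    // Hands the payload to the listener on a worker thread and blocks the
    // caller until the reply arrives or timeoutMillis elapses.
    // Returns the reply, or null if no reply arrived in time.
    static String sendAndWait(
        Function<String, String> listener, String payload, long timeoutMillis) {

        ExecutorService worker = Executors.newSingleThreadExecutor();

        try {
            CompletableFuture<String> reply = CompletableFuture.supplyAsync(
                () -> listener.apply(payload), worker);

            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            return null; // simplification: signal timeout with null
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        catch (ExecutionException e) {
            throw new IllegalStateException("Listener failed", e.getCause());
        }
        finally {
            worker.shutdownNow(); // interrupt a listener that is still running
        }
    }

    public static void main(String[] args) {
        String reply = sendAndWait(p -> "pong:" + p, "ping", 1000);
        System.out.println(reply);
    }
}
```

A timeout matters here because the sender's thread is held for the whole round trip. Without one, a slow or stuck listener would block the sender indefinitely.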





Find Message Bus Source in Git Hub



