Saturday, July 31, 2021

Working with Liferay Message Bus

Liferay Message Bus is a lightweight messaging component, comparable to Java Message Service (JMS) topics, that follows the publish-and-subscribe model. It is integrated into Liferay Portal.


Multiple senders send messages to a destination, and receivers on the other end listen for those messages.


It is useful for batch processing in the background, for multi-threaded message processing, and for sending a message to multiple receivers when an event is generated.

 

Software Stack


 

Liferay-ce-portal-7.4.1-ga2

 



Example


  • Send email to thousands of users in the background.
  • Integrate with the scheduler, which runs jobs periodically in the background.
  • Notify multiple receivers when events are generated.
  • If a user request takes a long time to process, use the message bus to hand the task off to the background and update the user once the job is completed.
  • Liferay itself uses the message bus in many places, such as the deployment event notifier: when a module is deployed, a message is sent to the message bus.

 

 




 


The following are the important building blocks of the Message Bus:


  • Destination
  • Senders
  • Listeners

Destination


A destination is a uniquely named address in the message bus to which messages are sent. It is like a topic or channel that senders use to send messages. The message bus can contain any number of destinations.




Senders


Senders are responsible for sending messages to a specific destination on the message bus. A sender may send messages to multiple destinations.




Listeners


Listeners receive the messages sent by senders. Every listener must be registered with at least one destination to receive messages. A listener may subscribe to multiple destinations.
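To make the relationship between destinations, senders, and listeners concrete, here is a minimal sketch in plain Java. It is not Liferay's implementation: the TinyBus class, its register and send methods, and the destination names are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a message bus (not Liferay's code).
class TinyBus {

    // Destination name -> listeners subscribed to that destination
    private final Map<String, List<Consumer<String>>> listeners =
        new ConcurrentHashMap<>();

    // A listener subscribes to one destination; call again for more destinations.
    public void register(String destinationName, Consumer<String> listener) {
        listeners.computeIfAbsent(
            destinationName, name -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // A sender publishes to a destination; every listener registered there
    // receives the payload. Returns the number of listeners notified.
    public int send(String destinationName, String payload) {
        List<Consumer<String>> subscribed = listeners.getOrDefault(
            destinationName, Collections.<Consumer<String>>emptyList());

        for (Consumer<String> listener : subscribed) {
            listener.accept(payload);
        }

        return subscribed.size();
    }

    public static void main(String[] args) {
        TinyBus bus = new TinyBus();
        List<String> received = new ArrayList<>();

        bus.register("demo/mail", payload -> received.add("A:" + payload));
        bus.register("demo/mail", payload -> received.add("B:" + payload));
        bus.register("demo/audit", payload -> received.add("C:" + payload));

        bus.send("demo/mail", "hello");

        System.out.println(received); // prints [A:hello, B:hello]
    }
}
```

Only the two listeners registered on "demo/mail" see the message; the listener on "demo/audit" is untouched, which is the isolation a destination name gives you.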

 


Liferay provides three types of destination; choose one based on the requirement.

 


  • Parallel Destination
  • Serial Destination
  • Synchronous Destination


Parallel Destination


A parallel destination processes messages asynchronously using multiple worker threads. Each message is handled by a separate worker thread for each listener subscribed to the destination: one worker thread per message per listener. Messages wait in the queue when the number of worker threads reaches the maximum assigned to the pool.

 



Serial Destination


A serial destination is similar to a parallel destination, but here each message is processed by a single worker thread that delivers it to the subscribed listeners: one worker thread per message. Messages wait in the queue when the number of worker threads reaches the maximum assigned to the pool.

 



Synchronous Destination


A synchronous destination delivers messages directly to the listeners; no messages are queued.
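The difference between the three destination types comes down to how many worker tasks one message turns into. The following is a rough sketch using only the JDK; the DispatchStyles class and its method names are hypothetical and are not Liferay classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of the three dispatch styles (not Liferay's classes).
class DispatchStyles {

    // Parallel: one task per message per listener. Returns tasks submitted.
    static int dispatchParallel(
        ExecutorService pool, String message, List<Consumer<String>> listeners) {

        int tasks = 0;
        for (Consumer<String> listener : listeners) {
            pool.execute(() -> listener.accept(message));
            tasks++;
        }
        return tasks;
    }

    // Serial: one task per message; that task calls the listeners in order.
    static int dispatchSerial(
        ExecutorService pool, String message, List<Consumer<String>> listeners) {

        pool.execute(() -> {
            for (Consumer<String> listener : listeners) {
                listener.accept(message);
            }
        });
        return 1;
    }

    // Synchronous: no pool and no queue; listeners run on the caller's thread.
    static int dispatchSynchronous(
        String message, List<Consumer<String>> listeners) {

        for (Consumer<String> listener : listeners) {
            listener.accept(message);
        }
        return 0;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger deliveries = new AtomicInteger();
        Consumer<String> counting = message -> deliveries.incrementAndGet();
        List<Consumer<String>> listeners = Arrays.asList(counting, counting);

        ExecutorService pool = Executors.newFixedThreadPool(2);

        int parallelTasks = dispatchParallel(pool, "m1", listeners);
        int serialTasks = dispatchSerial(pool, "m2", listeners);
        int synchronousTasks = dispatchSynchronous("m3", listeners);

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);

        // prints 2 1 0 deliveries=6
        System.out.println(
            parallelTasks + " " + serialTasks + " " + synchronousTasks +
                " deliveries=" + deliveries.get());
    }
}
```

With two listeners, the same message costs two pool tasks in the parallel style, one in the serial style, and none in the synchronous style, while every listener still receives every message.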

 


The asynchronous model generally has the following important parameters, which can be set on a parallel or serial destination.




Maximum Queue Size


This parameter sets the number of messages that can be held in the queue.

 


Workers Core Size


The initial number of worker threads created for the thread pool.

 


Workers Max Size


The maximum number of worker threads in the pool. When the number of worker threads reaches this value, messages wait in the queue for the next available worker thread.

 


Rejected Execution Handler


The rejected execution handler is the mechanism for handling messages that are rejected when the queue is full.
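These four parameters have direct analogues in the JDK's java.util.concurrent.ThreadPoolExecutor, so a plain-Java sketch can show how they interact. This is only an analogy: the PoolSizingSketch class and countRejected method are hypothetical, and Liferay's internal pool may order things differently. The JDK executor, for instance, grows beyond the core size only once the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only sketch of core size, max size, queue size, and rejection.
class PoolSizingSketch {

    // Submits `tasks` tasks that all block until released, then reports how
    // many of them the pool rejected.
    static int countRejected(int coreSize, int maxSize, int queueSize, int tasks)
        throws InterruptedException {

        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);

        // Count rejections instead of throwing (the JDK default is AbortPolicy).
        RejectedExecutionHandler countingHandler =
            (runnable, executor) -> rejected.incrementAndGet();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            coreSize, maxSize, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(queueSize), countingHandler);

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    // Keep the worker busy so the pool and queue fill up.
                    release.await();
                }
                catch (InterruptedException interruptedException) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);

        return rejected.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 4 threads at most + 3 queue slots = 7 accepted, so 3 of 10 are rejected.
        System.out.println(countRejected(2, 4, 3, 10)); // prints 3
    }
}
```

The capacity of such a pool is the maximum worker count plus the queue size; anything beyond that goes to the rejected execution handler.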

 

Message Bus implementation steps in an application

 


  • Create Destination Configuration
  • Create Destination
  • Register Destination as OSGi Service
  • Manage the Destination Object
  • Register Listener
  • Send Message to Destination

 



Create Destination Configuration


The Message Bus API provides the DestinationConfiguration class to create a destination configuration. We can create any of the destination types described above.

 


Parallel Destination Configuration


 


DestinationConfiguration destinationConfiguration =
    new DestinationConfiguration(
        DestinationConfiguration.DESTINATION_TYPE_PARALLEL,
        LiferayMessageBusPortletKeys.DESTINATION_PARALLEL);

 


 

 

 

Serial Destination Configuration


 

DestinationConfiguration destinationConfiguration =
    new DestinationConfiguration(
        DestinationConfiguration.DESTINATION_TYPE_SERIAL,
        LiferayMessageBusPortletKeys.DESTINATION_SERIAL);

 

 


Synchronous Destination Configuration


 

DestinationConfiguration destinationConfiguration =
    new DestinationConfiguration(
        DestinationConfiguration.DESTINATION_TYPE_SYNCHRONOUS,
        LiferayMessageBusPortletKeys.DESTINATION_SYNCHRONOUS);

 

 

 

 


Create Destination


DestinationFactory creates a destination based on the configuration.

 


 

Destination destination = _destinationFactory.createDestination(
    destinationConfiguration);

 

 

 


Register Destination as OSGi Service


The destination is registered as an OSGi service through the BundleContext, which returns a ServiceRegistration<Destination>.


 

_destinationServiceRegistration = _bundleContext.registerService(
    Destination.class, destination, destinationProperties);

_log.info("Destination is registered with Service Registration ..");

 

 

 

 

Manage the Destination Object



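We have to keep a reference to the destination's service registration so that the destination can be unregistered when the bundle is deactivated. The service properties passed at registration carry the destination name: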



 

Dictionary<String, Object> destinationProperties =
    HashMapDictionaryBuilder.<String, Object>put(
        "destination.name", destination.getName()).build();

 

 

 

Destroy Destination


 

@Deactivate
protected void deactivate() {
    if (_destinationServiceRegistration != null) {
        // Look up the destination before its service is unregistered
        Destination destination = _bundleContext.getService(
            _destinationServiceRegistration.getReference());

        _destinationServiceRegistration.unregister();

        destination.destroy();
    }
}

 

 

 

Setting Thread Pool for destination



 

destinationConfiguration.setMaximumQueueSize(_MAXIMUM_QUEUE_SIZE);

destinationConfiguration.setWorkersCoreSize(_CORE_SIZE);

destinationConfiguration.setWorkersMaxSize(_MAX_SIZE);

 

 

 


Rejection Handler to Handle Failed Messages


 

 

RejectedExecutionHandler rejectedExecutionHandler =
    new ThreadPoolExecutor.CallerRunsPolicy() {

        @Override
        public void rejectedExecution(
            Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

            if (_log.isWarnEnabled()) {
                _log.warn(
                    "The current thread will handle the request " +
                        "because the destination's task queue is at " +
                            "its maximum capacity");
            }

            // Fall back to running the task on the calling thread
            super.rejectedExecution(runnable, threadPoolExecutor);
        }

    };

destinationConfiguration.setRejectedExecutionHandler(rejectedExecutionHandler);
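To see what a caller-runs policy does in practice, here is a small JDK-only sketch. The CallerRunsSketch class and its method are hypothetical; the sketch uses java.util.concurrent.ThreadPoolExecutor directly and assumes a pool with one worker and a one-slot queue, so that the third task is guaranteed to be rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical JDK-only sketch of CallerRunsPolicy behaviour.
class CallerRunsSketch {

    // Returns the name of the thread that ended up running the rejected task.
    static String threadThatRanRejectedTask() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();

        // One worker, one queue slot: the third task cannot be accepted.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.CallerRunsPolicy());

        // Task 1 occupies the only worker until it is released.
        pool.execute(() -> {
            try {
                release.await();
            }
            catch (InterruptedException interruptedException) {
                Thread.currentThread().interrupt();
            }
        });

        // Task 2 fills the single queue slot.
        pool.execute(() -> { });

        // Task 3 is rejected, so CallerRunsPolicy runs it on this thread.
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);

        return ranOn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        String caller = Thread.currentThread().getName();

        // prints true
        System.out.println(caller.equals(threadThatRanRejectedTask()));
    }
}
```

The trade-off is that the sender is slowed down while it runs the rejected task itself, which acts as simple back-pressure instead of dropping the message.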

 

 

 

 

Register Listener



A listener should implement the MessageListener interface. We implement the receive(..) method and put our business logic there. Listeners are registered with a destination to receive messages from senders.

 

 


There are different ways to register a listener.

 


Automatic Registration


Create a MessageListener component and pass the destination name as a property so that it is registered with the destination automatically when the component is created.



 

@Component(
    immediate = true,
    property = {"destination.name=liferaysavvy/synchronous-destination"},
    service = MessageListener.class
)
public class AutomaticRegisteredSynchronousMessageListener
    implements MessageListener {

    @Override
    public void receive(Message message) {
        try {
            // Business logic for the received message goes here
            _log.info("Message::" + message);
        }
        catch (Exception exception) {
            _log.error("Unable to process message", exception);
        }
    }

    private static final Log _log = LogFactoryUtil.getLog(
        AutomaticRegisteredSynchronousMessageListener.class);

}

 

 



Message Bus Registration


Listeners can be registered through the Message Bus.


 

@Reference
private MessageBus _messageBus;

_messageListenerParallel = new MessageBusRegisteredParallelMessageListener();

_messageBus.registerMessageListener(
    LiferayMessageBusPortletKeys.DESTINATION_PARALLEL,
    _messageListenerParallel);

 

       

 

 

Destination Registration


Listeners can also be registered directly with a Destination.


 

 

private MessageListener _messageListenerParallel;

@Reference(
    target = "(destination.name=" + LiferayMessageBusPortletKeys.DESTINATION_PARALLEL + ")"
)
private Destination _destinationParallel;

_messageListenerParallel = new DestinationRegisteredParallelMessageListener();

_destinationParallel.register(_messageListenerParallel);

 

 

 



Send Message to Destination


There are several ways to send a message to a destination, and they are asynchronous by default. Messages can also be sent synchronously.

 


Directly with Message Bus



 

@Reference
private MessageBus _messageBus;

Message messageobj = new Message();

messageobj.put("message", message);

// destinationName is the name of the target destination
_messageBus.sendMessage(destinationName, messageobj);

 

 


Using Message Bus Util

 


 

 

Message messageobj = new Message();

messageobj.put("message", message);

// Send the populated Message object, not the raw payload
MessageBusUtil.sendMessage(destinationName, messageobj);

 

 

 


 

Send Messages Synchronously



We can send a message synchronously using MessageBusUtil. The message bus blocks the sender until it receives the response or the timeout elapses.

 


 

Message messageobj = new Message();

messageobj.put("message", message);

try {
    MessageBusUtil.sendSynchronousMessage(destinationName, messageobj);

    // Variant with an explicit timeout:
    // MessageBusUtil.sendSynchronousMessage(destinationName, messageobj, timeout);
}
catch (MessageBusException messageBusException) {
    _log.error("Unable to send synchronous message", messageBusException);
}
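The "block until response or timeout" behaviour can be illustrated with the JDK alone. The sketch below is not how Liferay implements synchronous sending; the SyncSendSketch class, the sendAndWait method, and the use of CompletableFuture are assumptions made purely to show the idea.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical JDK-only sketch of a send that waits for a reply.
class SyncSendSketch {

    // Hands the payload to the listener on another thread and blocks the
    // caller until a reply arrives or the timeout elapses.
    // Returns null when no reply arrived in time.
    static String sendAndWait(
        Function<String, String> listener, String payload, long timeoutMillis) {

        CompletableFuture<String> reply =
            CompletableFuture.supplyAsync(() -> listener.apply(payload));

        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException timeoutException) {
            // Give up waiting; the listener itself is not interrupted.
            reply.cancel(true);
            return null;
        }
        catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
            return null;
        }
        catch (ExecutionException executionException) {
            throw new IllegalStateException(
                "Listener failed", executionException.getCause());
        }
    }

    public static void main(String[] args) {
        // A fast listener replies well within the timeout.
        System.out.println(
            sendAndWait(payload -> "pong:" + payload, "ping", 5000)); // prints pong:ping
    }
}
```

Choosing the timeout is the main design decision: too short and slow listeners look like failures, too long and the sending thread (often a request thread) stays blocked.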

 

 


Find the Message Bus example source on GitHub

 


https://github.com/LiferaySavvy/liferay-messagebus-example

 




