Liferay Message Bus is Java Message Service (JMS) implementation which works like publish and subscribe to model. This is light weight component integrated in Liferay Portal.
Multiple senders will send messages on destination and
other-end receivers listens the messages.
It is useful for batch process in the back ground or multi-threaded
message processing and send message to multiple receivers when event is generated.
Software Stack
Liferay-ce-portal-7.4.1-ga2
|
Example
- Send email to thousands of users in the background.
- It will be integrated with scheduler which run jobs periodically in the background.
- Notify to multiple receivers when events are generated.
- If any user request takes longer time to process then use message bus to assign task in the back-ground and update user response once job is completed.
- Liferay internally uses many places like deployment event notifier. When any module is deployed it will send message to message bus.
Following are important building blocks of Message Bus
- Destination
- Senders
- Listeners
Destination
Destination is uniquely identifying name space in
message bus to send messages. Its like topic or channel used by sender to send
messages. Message bus contains as many as destinations.
Senders
Senders are responsible to send messages to message
bus on specific destination. Sender may send messages to multiple destinations.
Listeners
Listeners are receiving the messages which send by
senders. Every listener they must have register with at least one destination
to receive the messages. Listeners may subscribe to multiple destinations.
Liferay is providing the 3 types of destination and
based on requirement it can be used.
- Parallel Destination
- Serial Destination
- Synchronous Destination
Parallel Destination
Parallel Destination, process the messages asynchronous
model using multiple workers threads. Each message and listener subscribed to
destination, process as separate worker thread asynchronously. Messages are in
the queue if it reaches the max threads assigned in the pool. Worker thread per
message per listener.
Serial Destination
Serial destination similar to parallel destination but
here each message process as separate thread. Worker thread per message. Messages
are in the queue if it reaches max threads assigned in the pool.
Synchronous Destination
Synchronous destination will send messages directly to
the listeners no messages are in the queue.
Asynchronous model generally has following important parameters.
Parallel or Serial destination we can set following parameters.
Maximum Queue Size
This parameter decides the number of messages that can
put in the queue.
Workers Core Size
Initial number of workers threads to create thread pool.
Workers Max Size
Maximum number of worker threads in the pool if the
worker threads reach the value, messages are in the queue for next available
worker thread.
Rejected Execution Handler
Rejection handle is mechanism to handle failed messages
when the queue is full.
Message Bus Implementation steps in the applications
- Create Destination Configuration
- Create Destination
- Register Destination as OSGi Service
- Manage the Destination Object
- Register Listener
- Send Message to Destination
Create Destination Configuration
Message Bus API is providing DestinationConfiguration
class to create destination configuration. We can create different types of destination
as it specified in the above.
Parallel Destination Configuration
DestinationConfiguration destinationConfiguration = new DestinationConfiguration( DestinationConfiguration.DESTINATION_TYPE_PARALLEL,LiferayMessageBusPortletKeys.DESTINATION_PARALLEL); |
Serial Destination Configuration
DestinationConfiguration destinationConfiguration = new DestinationConfiguration( DestinationConfiguration.DESTINATION_TYPE_SERIAL,LiferayMessageBusPortletKeys.DESTINATION_SERIAL); |
Synchronous Destination Configuration
DestinationConfiguration destinationConfiguration = new DestinationConfiguration( DestinationConfiguration.DESTINATION_TYPE_SYNCHRONOUS,LiferayMessageBusPortletKeys.DESTINATION_SYNCHRONOUS); |
Create Destination
DestinationFactory
will create destination based on configuration
Destination destination = _destinationFactory.createDestination( destinationConfiguration); |
Register Destination as OSGi Service
ServiceRegistration<Destination>
is used to register destination as OSGi service.
_destinationServiceRegistration = _bundleContext.registerService( Destination.class, destination, destinationProperties); _log.info("Destination is registred with Service Regisration
.."); |
Manage the Destination Object
We have to manage destination object so that it can deregister
when bundle is deactivated.
Dictionary<String, Object> destinationProperties = HashMapDictionaryBuilder.<String,
Object>put( "destination.name", destination.getName()).build(); |
Destroy Destination
@Deactivate protected void deactivate() { if (_destinationServiceRegistration != null) { Destination
destination = _bundleContext.getService( _destinationServiceRegistration.getReference()); _destinationServiceRegistration.unregister(); destination.destroy(); } |
Setting Thread Pool for destination
destinationConfiguration.setMaximumQueueSize(_MAXIMUM_QUEUE_SIZE); destinationConfiguration.setWorkersCoreSize(_CORE_SIZE); destinationConfiguration.setWorkersMaxSize(_MAX_SIZE); |
Rejection Handler to Handle Failed Messages
RejectedExecutionHandler rejectedExecutionHandler = new
ThreadPoolExecutor.CallerRunsPolicy() { @Override public void rejectedExecution( Runnable
runnable,
ThreadPoolExecutor threadPoolExecutor) { if (_log.isWarnEnabled()) { _log.warn("The current thread will handle the request " + "because the
rules engine's task queue is at " +"its maximum capacity"); } super.rejectedExecution(runnable, threadPoolExecutor); } }; destinationConfiguration.setRejectedExecutionHandler(rejectedExecutionHandler); |
Register Listener
Listener should implement MessageListener
interface. We have to implement receive(..) method and there we implement our business
logic. Listeners are registered with destination to received messages from senders.
There are different ways to register listener
Automatic Registration
Create MessageListener component and pass destination name
as property so that it will be register with destination automatically when
component is created.
@Component ( immediate = true, property = {"destination.name=liferaysavvy/synchronous-destination"}, service = MessageListener.class ) public class AutomaticRegisteredSynchronousMessageListener implements MessageListener { @Override public void receive(Message
message) { try { _log.info("Message::"+message); } catch (Exception e) { e.printStackTrace(); } } private static final Log _log = LogFactoryUtil.getLog( AutomaticRegisteredSynchronousMessageListener.class); } |
Message Bus Registration
Listeners can register using Message Bus.
@Reference private MessageBus _messageBus; _messageListenerParallel = new MessageBusRegisteredParallelMessageListener(); _messageBus.registerMessageListener(LiferayMessageBusPortletKeys.DESTINATION_PARALLEL, _messageListenerParallel); |
Destination Registration
Listeners can also register with Destination
private MessageListener _messageListenerParallel; @Reference(target = "(destination.name="+LiferayMessageBusPortletKeys.DESTINATION_PARALLEL+")") private Destination _destinationParellel; _messageListenerParallel = new DestinationRegisteredParallelMessageListener(); _destinationParellel.register(_messageListenerParallel); |
Send Message to Destination
Several ways to send message to destination and all
are asynchronous process. We can also send messages synchronously.
Directly with Message Bus
@Reference private MessageBus _messageBus; Message messageobj = new Message(); messageobj.put("message", message); _messageBus.sendMessage(destination, messageobj); |
Using Message Bus Util
Message messageobj = new Message(); messageobj.put("message", message); MessageBusUtil.sendMessage(destinationName,
message); |
Send Messages Synchronously
We can send message Synchronously using Message Bus Util.
Message Bus block the message until it received the response or timeout.
Message messageobj = new Message(); messageobj.put("message", message); try {
MessageBusUtil.sendSynchronousMessage(destinationName, messageobj); //MessageBusUtil.sendSynchronousMessage(destinationName,
message, timeout) } catch (MessageBusException e) { // TODO Auto-generated catch block e.printStackTrace(); } |
Find Message Bus Source in Git Hub
https://github.com/LiferaySavvy/liferay-messagebus-example
Many IPTV set-top boxes will incorporate a Personal Video Recorder (PVR) that will allow programs and interactive content to be recorded buy iptv. While watching linear TV programming, viewers will be able to pause and rewind live television.
ReplyDeleteThis is a great post I seen because of offer it. It is truly what I needed to see seek in future you will proceed after sharing such a magnificent post.
ReplyDelete안전놀이터
Looking forward to reading more. Great post. Much thanks again. Cool.
ReplyDeletenote online
Mobile phone is the greatest invention of modern science. It helps us with many ways in our daily life. So if you want to maintain your daily life with necessary technical product, Then Handy Zubehör Stuttgart(mobile phone shop) will help you to make the right choice with a cheap price.
ReplyDelete