TeamSite EventSubsystem:
- As you all know, TeamSite's EventSubsystem uses ActiveMQ's implementation of the Java Message Service (JMS).
- JMS follows a publisher/subscriber model in which messages can be delivered asynchronously.
- JMS has two kinds of destination: "Queues" and "Topics". A Queue is one-to-one (each message is delivered to a single consumer); a Topic is one-to-many (each message is delivered to every subscriber).
- Both support the asynchronous model. Asynchronous here means the publisher does not need to know anything about the consumers (who, where and how). Similarly, the consumers do not need to know anything about the publisher.
- The publisher can send messages (in TeamSite's case, events) even when the client is not up and running.
- Both publisher and consumer connect to a broker to send and receive messages.
- The JMS broker receives the messages, stores them temporarily and delivers them to the consumers.
- The broker settings are in <ApplicationContainer>/standalone/deployments/eventsubsystem.war/WEB-INF/iw_bridge_cfg.xml. Look for the section below in this file; we are interested in the "url" and "topic" attributes:
<iwovJMS classpath="/app/iw-home/TeamSite/eventsubsystem/lib/activemq-all-5.3.2.jar"
initialContextFactory="org.apache.activemq.jndi.ActiveMQInitialContextFactory"
url="tcp://localhost:3035"
factoryName="java:/JmsTopicConnectionFactory"
topic="Interwoven"
waitTime="300000"
expiryTimeInDays = "4">
</iwovJMS>
- url="tcp://localhost:3035"
- topic="Interwoven"
- TeamSite's EventSubsystem publishes all its events to a Topic named "Interwoven", and the broker accepts connections on TCP port 3035.
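If a subscriber should pick up the broker URL and topic name from the file instead of hard-coding them, the two attributes can be read with the JDK's own XML parser. The following is only a minimal sketch: the class name BridgeConfigReader is made up for this example, and it assumes the first iwovJMS element in the file is the one that matters.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical helper, not part of TeamSite: reads "url" and "topic" from iw_bridge_cfg.xml.
public class BridgeConfigReader {

    /** Returns {url, topic} from the first iwovJMS element found in the stream. */
    public static String[] readUrlAndTopic(InputStream in) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
            // getElementsByTagName searches all descendants, so nesting depth does not matter.
            NodeList nodes = doc.getElementsByTagName("iwovJMS");
            if (nodes.getLength() == 0) {
                throw new IllegalStateException("No iwovJMS element found");
            }
            Element jms = (Element) nodes.item(0);
            return new String[] { jms.getAttribute("url"), jms.getAttribute("topic") };
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalStateException("Could not read the bridge configuration", e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Pass the path to iw_bridge_cfg.xml as the first argument.
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            String[] values = readUrlAndTopic(in);
            System.out.println("url=" + values[0] + ", topic=" + values[1]);
        }
    }
}
```

Note that the url in the file usually points at localhost, so a subscriber running on another machine would still need to substitute the TeamSite host name.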
Simple Message Listener:
Here is a simple standalone Java Subscriber code snippet:
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Connects to the EventSubsystem broker, waits for one event and prints it.
 *
 * @author mp
 */
public class Subscriber {
    public static void main(String[] args) {
        Connection connection = null;
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory("tcp://<your-teamsite-server-ip-or-hostname>:3035");
            // Create a Connection
            connection = connectionFactory.createConnection();
            connection.setClientID("test"); // any unique clientID
            // Create a non-transacted Session; messages are acknowledged automatically on receipt
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the destination (the "Interwoven" Topic)
            Topic topic = session.createTopic("Interwoven");
            // Create a durable subscriber; again a unique name for the durable subscription
            MessageConsumer consumer = session.createDurableSubscriber(topic, "test");
            // Start delivery and block until a message arrives
            connection.start();
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                System.out.println("Received: " + ((TextMessage) message).getText());
            } else if (message instanceof BytesMessage) {
                // ActiveMQBytesMessage implements javax.jms.BytesMessage
                BytesMessage bytesMessage = (BytesMessage) message;
                System.out.println(message);
                System.out.println(bytesMessage.getObjectProperty("name"));
                // Read the whole body and decode it as UTF-8
                byte[] content = new byte[(int) bytesMessage.getBodyLength()];
                bytesMessage.readBytes(content);
                System.out.println("Message Content: " + new String(content, "utf-8"));
            } else {
                System.out.println("Received: " + message);
            }
            consumer.close();
            session.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } finally {
            // Always release the connection, even after an error
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignored) {
                    // nothing more can be done at this point
                }
            }
        }
    }
}
Important steps in the above snippet:
- Create an ActiveMQ connection factory with the TeamSite host and the port that we identified from iw_bridge_cfg.xml in the previous section.
- Create the connection and set a unique client ID.
- Create the Topic ("Interwoven", case sensitive) from the session.
- Create a durable subscriber. This is important because a durable subscription is what lets us recover after a disconnect: the broker retains the messages published while the client is down and delivers them once the client reconnects with the same client ID and subscription name.
- TeamSite sends all its messages as type ActiveMQBytesMessage. We simply capture the message and print it to the console.
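Because JMS identifies a durable subscription by the pair of client ID and subscription name, both values have to stay the same across restarts. A random value such as a fresh UUID would create a new subscription on every start, and the messages held for the old one would never be picked up. Below is a minimal sketch of deriving a stable client ID from an application name and the host name. The class ClientIds and the "app@host" naming scheme are illustrative assumptions, not a TeamSite or ActiveMQ convention.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Locale;

// Hypothetical helper: builds a client ID that is unique per application and host, yet stable over restarts.
public class ClientIds {

    /** Builds a deterministic ID such as "event-listener@build-host-01". */
    public static String stableClientId(String appName, String hostName) {
        if (appName == null || appName.trim().isEmpty()) {
            throw new IllegalArgumentException("appName must not be empty");
        }
        String host = (hostName == null || hostName.trim().isEmpty()) ? "unknown-host" : hostName.trim();
        // Lower-case the host so the ID does not change if the OS reports it in a different case.
        return appName.trim() + "@" + host.toLowerCase(Locale.ROOT);
    }

    /** Convenience overload that looks up the local host name. */
    public static String stableClientId(String appName) {
        try {
            return stableClientId(appName, InetAddress.getLocalHost().getHostName());
        } catch (UnknownHostException e) {
            return stableClientId(appName, null);
        }
    }

    public static void main(String[] args) {
        // The result would be passed to connection.setClientID(...) right after the connection is created.
        System.out.println(stableClientId("event-listener"));
    }
}
```

Two instances of the same application on the same host would still collide with this scheme, so an instance name would have to be added in that case.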
The simple code above becomes awkward when we want to capture every message continuously, in real time, as and when it becomes available: it handles a single message, and we have to take care of creating and closing the connection, session and so on ourselves. This is where we make use of Spring JMS's DefaultMessageListenerContainer. Consider it the equivalent of Spring JDBC: Spring takes care of connectivity and resource handling, allowing us to concentrate on our business logic.
Asynchronous Spring-based solution to listen to TeamSite events in real time (Our Implementation):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd">

    <!-- JMS/ActiveMQ connection factory
         brokerURL: URL of the EventSubsystem broker running on the TeamSite server
                    (replace my-teamsite-server with your host name or IP address).
         clientID: unique identifier required to establish a durable subscription with the EventSubsystem broker.
    -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://my-teamsite-server:3035" />
        <property name="clientID" value="MyClientID" />
    </bean>

    <!-- Custom Message Listener which will be invoked for every message received. -->
    <bean id="messageListener" class="com.myapp.events.listeners.Listener" />

    <!-- Initialize a topic bean with the topic name passed as constructor argument.
         TeamSite events are sent only to the 'Interwoven' topic.
    -->
    <bean id="Interwoven" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="Interwoven" />
    </bean>

    <!--
         Spring message listener container which does the heavy lifting of handling connections and invoking the message listener.
         Properties:
         connectionFactory: refers to the jmsFactory bean created above.
         destination: destination to listen on for messages/events. Refers to the topic bean created above.
         messageListener: refers to the message listener bean above.
         concurrentConsumers: should always be set to 1 as we are listening to a Topic.
         sessionTransacted: Boolean. Messages are transacted internally from message arrival until the message listener finishes executing.
         subscriptionDurable: Boolean. Makes the subscription to the Topic durable.
         durableSubscriptionName: unique identifier of the subscription. The clientID is combined with this value to identify the durable subscriber.
         autoStartup: Boolean. Starts this container upon initialization.
    -->
    <bean id="jmsContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory" />
        <property name="destination" ref="Interwoven" />
        <property name="messageListener" ref="messageListener" />
        <property name="concurrentConsumers" value="1" />
        <property name="sessionTransacted" value="true" />
        <property name="subscriptionDurable" value="true" />
        <property name="durableSubscriptionName" value="MySubscribtionName" />
        <property name="autoStartup" value="true" />
    </bean>
</beans>
The comments in the Spring config XML above should be self-explanatory. The only piece of code we need to write ourselves is the message listener class (our business logic), which is wired in here:
<!-- Custom Message Listener which will be invoked for every message received. -->
<bean id="messageListener" class="com.myapp.events.listeners.Listener" />
Here is the Java class whose onMessage method is called by the Spring DefaultMessageListenerContainer for every message received:
package com.myapp.events.listeners;

import java.util.Enumeration;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.log4j.Logger;

/**
 * Logs every TeamSite event delivered by the listener container.
 *
 * @author mp
 */
public class Listener implements MessageListener {
    private static final Logger logger = Logger.getLogger(Listener.class);

    /* (non-Javadoc)
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    @Override
    public void onMessage(Message message) {
        logger.info("\n\nInside: onMessage");
        System.out.println("\n\nInside: onMessage");
        logger.info("Message Object As string: " + message + "\n\n");
        try {
            if (message instanceof TextMessage) {
                logger.info("Received: " + ((TextMessage) message).getText());
            } else if (message instanceof ActiveMQBytesMessage) {
                // TeamSite events arrive as bytes messages; print every property they carry
                Enumeration<?> names = message.getPropertyNames();
                while (names.hasMoreElements()) {
                    String propName = (String) names.nextElement();
                    logger.info(propName + "= " + message.getStringProperty(propName));
                    System.out.println(propName + "= " + message.getStringProperty(propName));
                }
            } else {
                logger.info("Received: " + message);
            }
        } catch (Exception ex) {
            // With sessionTransacted=true the container rolls back and the message is redelivered
            throw new RuntimeException(ex);
        }
    }
}
Please be aware the code inside the above snippet needs to be thread safe.
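As an illustration of what thread safety means here, the sketch below keeps a per-event counter that several threads can update at once without losing updates. EventCounter is a hypothetical helper, not part of TeamSite, ActiveMQ or Spring; a listener's onMessage method could call record(...) with, for example, the value of one of the event properties.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper: shared state that is safe to update from concurrent onMessage calls.
public class EventCounter {

    // ConcurrentHashMap plus LongAdder allows concurrent updates without explicit locking.
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    /** Records one occurrence of the given event name; safe to call from any thread. */
    public void record(String eventName) {
        counts.computeIfAbsent(eventName, k -> new LongAdder()).increment();
    }

    /** Returns how often the event name has been recorded so far. */
    public long count(String eventName) {
        LongAdder adder = counts.get(eventName);
        return adder == null ? 0L : adder.sum();
    }

    /** Returns a sorted point-in-time copy of all counters. */
    public Map<String, Long> snapshot() {
        Map<String, Long> copy = new TreeMap<>();
        counts.forEach((name, adder) -> copy.put(name, adder.sum()));
        return copy;
    }

    public static void main(String[] args) throws InterruptedException {
        EventCounter counter = new EventCounter();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    counter.record("demo-event");
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println(counter.snapshot()); // prints {demo-event=4000}
    }
}
```

A plain HashMap with an int value would be the unsafe counterpart: two threads could read the same old value and one increment would be lost.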
The main advantages of using Spring JMS are:
1. Connection handling, including caching and recovery of connections, is managed by Spring far more robustly and efficiently than in hand-written code.
2. Almost real-time message retrieval, as Spring takes care of issuing receive operations in a loop (the delay is in the order of milliseconds).
3. You can easily run this as a standalone app or as a web app, whichever you prefer.
Further reading and performance tuning on Spring JMS can be found here and here.
HP is capturing these events on the Basic Report Center, Search and Workflow Modeler without using Spring.
Please Note: HP will NOT provide support or clarification for any of the above, as the solution discussed is neither supported nor documented in any of the manuals.