Alternative for JMS Receiver

About six weeks ago I was working on a project that used JMS. The application needed to receive messages from an ActiveMQ queue and then send them as SMS messages to an SMSC. The messages are sent in batches, so each time I needed to pull a batch-sized group of messages from ActiveMQ. I used the JmsTemplate provided by the Spring Framework and called its receive method to get messages from the queue.
Initially it worked fine, but after more testing I found that JmsTemplate's receive method sometimes does not return messages even though there are thousands of messages in the queue. I Googled the problem and followed the instructions in many forum posts and on the ActiveMQ site, without much success.
Then one of my colleagues, Romith, started searching the source code of some popular open source projects to see how they receive JMS messages. We were lucky... In Mule's code we found that they use JMS listeners as a receiver. (Maybe the Mule people faced the same kind of problem with receivers as we did :) )
When we want to receive a message, we set up a listener, get the message, and then remove that listener. We developed a simple class that does the same, and it did the trick for us. Following is the simple class, in which a JMS MessageListener does the work of a JMS receiver.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jencks.amqpool.PooledConnectionFactory;

import javax.jms.*;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;

public class JMSQueueReceiverImpl implements MessageListener {

private QueueConnectionFactory queueConnectionFactory;
private Queue queue;
private QueueConnection queueConnection;
private QueueSession queueSession;
private QueueReceiver queueReceiver;
private final Object messageReceiveWaitLock = new Object();
private MessageConsumer messageConsumer;
private Log logger = LogFactory.getLog(JMSQueueReceiverImpl.class);
private List<Message> receivedMessages = new ArrayList<Message>(20);
private int requiredMessages = 0;
private int RECEIVE_ALL_TIMEOUT = 1000;
private PooledConnectionFactory pooledConnectionFactory;
private static Map<String, List<Message>> missedMessages = new Hashtable<String, List<Message>>();
private static long totalJmsMessageConsumed = 0;
private static long totalJmsMessageReceived =0;
private static long connectionCreateCount = 0;
private static long connectionCloseCount = 0;


public JMSQueueReceiverImpl(QueueConnectionFactory queueConnectionFactory, Queue queue) {
this.queueConnectionFactory = queueConnectionFactory;
this.queue = queue;
try {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++connectionCreateCount + "] Creating the connection to the Queue [" + queue.toString() + "]");
}
// This constructor receives a QueueConnectionFactory; pooledConnectionFactory is still null here.
queueConnection = queueConnectionFactory.createQueueConnection();
queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
messageConsumer = queueSession.createConsumer(queue);
// doStart(queue);
receiveFromCacheIfExtraMessagesFetched(this.queue.getQueueName());
} catch (JMSException e) {
// Log at error level so that a failed connection is not silently hidden when debug logging is off.
logger.error("Error while creating the connection ", e);
}
}


public JMSQueueReceiverImpl(PooledConnectionFactory pooledConnectionFactory, Queue queue) {
this.pooledConnectionFactory = pooledConnectionFactory;
// this.queueConnectionFactory = queueConnectionFactory;
this.queue = queue;
try {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++connectionCreateCount + "] Creating the connection to the Queue [" + queue.toString() + "]");
}
queueConnection = (QueueConnection)pooledConnectionFactory.createConnection();
queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
messageConsumer = queueSession.createConsumer(queue);
// doStart(queue);
receiveFromCacheIfExtraMessagesFetched(this.queue.getQueueName());
} catch (JMSException e) {
// Log at error level so that a failed connection is not silently hidden when debug logging is off.
logger.error("Error while creating the connection ", e);
}
}

private void doStart(long timeout) throws JMSException {
messageConsumer.setMessageListener(this);
queueConnection.start();
synchronized (messageReceiveWaitLock) {
try {
messageReceiveWaitLock.wait(timeout);
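} catch (InterruptedException e) {
logger.info(e);
// Restore the interrupt flag so that callers can still see the interruption.
Thread.currentThread().interrupt();
}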
}
messageConsumer.setMessageListener(null);
}

private void doStop() throws JMSException {
messageConsumer.setMessageListener(null);
//queueConnection.stop();
synchronized (messageReceiveWaitLock) {
messageReceiveWaitLock.notify();
}
if (logger.isDebugEnabled()) {
logger.debug("Message receiving finished");
}

}

public Message receive(long timeout) {
try {
requiredMessages = 1;
if (receivedMessages.size() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("[pre-single] success jms receive calls [" + ++totalJmsMessageConsumed + "]");
}
return receivedMessages.remove(0);
}
doStart(timeout);
if (receivedMessages.size() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("[single] success jms receive calls [" + ++totalJmsMessageConsumed + "]");
}
return receivedMessages.remove(0);
}
return null;
} catch (JMSException e) {
logger.error("Error while starting message receiving", e);
return null;
}
}

public List<Message> receive(long timeout, int messageCount) {
try {
requiredMessages = messageCount;
doStart(timeout);
totalJmsMessageConsumed += receivedMessages.size();
if (logger.isDebugEnabled()) {
logger.debug("[count] success jms receive calls [" + totalJmsMessageConsumed + "]");
}
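// Hand back a copy and empty the buffer, so that closeConnection() does not cache these messages again as "extra".
List<Message> batch = new ArrayList<Message>(receivedMessages);
receivedMessages.clear();
return batch;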
} catch (JMSException e) {
logger.error("Error while starting message receiving", e);
return null;
}
}

public List<Message> receiveAll(long timeout) {
try {
requiredMessages = -1;
doStart(timeout);
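// Hand back a copy and empty the buffer, so that closeConnection() does not cache these messages again as "extra".
List<Message> all = new ArrayList<Message>(receivedMessages);
receivedMessages.clear();
return all;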
} catch (JMSException e) {
logger.error("Error while starting message receiving", e);
return null;
} finally {
try {
doStop();
} catch (JMSException e) {
logger.error("Error while stopping message receving", e);
}
}
}

public void closeConnection() {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++connectionCloseCount + "]Closing connection from queue [" + queue + "]");
}

try {
cacheIfExtraMessagesFetched();
try {
if(messageConsumer!= null){
messageConsumer.close();
}
} catch (JMSException e) {
logger.error("Error while stopping message receiver[consumer] ", e);
}
try {
if (queueSession != null) {
queueSession.close();
}
} catch (JMSException e) {
logger.error("Error while stopping message receiver[session] ", e);
}
queueConnection.close();
}catch (JMSException e) {
logger.error("Error while stopping message receiver[connection] ", e);
}
}

private void cacheIfExtraMessagesFetched() throws JMSException {
if (receivedMessages.size() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Extra fetched message count is [" + receivedMessages.size() + "]");
}
String queueName = queue.getQueueName();
List<Message> queueMissed = missedMessages.get(queueName);
if (queueMissed == null) {
queueMissed = new ArrayList<Message>();
missedMessages.put(queueName, queueMissed);
}
while (receivedMessages.size() > 0) {
queueMissed.add(receivedMessages.remove(0));
}
}
}

private void receiveFromCacheIfExtraMessagesFetched(String queueName) throws JMSException {
List<Message> extraMessages = missedMessages.get(queueName);
if ( (extraMessages == null) || (extraMessages.size() == 0)) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Extra fetched message count [" + extraMessages.size() + "]");
}
while (extraMessages.size() > 0) {
receivedMessages.add(extraMessages.remove(0));
}
}

public synchronized void onMessage(Message message) {
try {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++totalJmsMessageReceived + "] Jms message received jms-correlation-id[" + message.getJMSCorrelationID() + "]");
}
receivedMessages.add(message);
message.acknowledge();
if (receivedMessages.size() >= requiredMessages) {
if (logger.isDebugEnabled()) {
logger.debug("all messages had been received ");
}
try {
doStop();
} catch (JMSException e) {
logger.error("Error while stopping message receiving", e);
}
try {
Thread.sleep(5);
} catch (InterruptedException e) {
//
}
}
} catch (JMSException e) {
logger.error("Error while acknowledging ", e);
}
}

public void setRECEIVE_ALL_TIMEOUT(int RECEIVE_ALL_TIMEOUT) {
this.RECEIVE_ALL_TIMEOUT = RECEIVE_ALL_TIMEOUT;
}


/* public static void main(String[] args) {
BasicConfigurator.configure();

// PropertyConfigurator propertyConfigurator = new PropertyConfigurator();
// Logger logger = Logger.getLogger("sample");
// LoggerRepository loggerRepository = new Hierarchy(logger);
// propertyConfigurator.doConfigure("log4j.properties", loggerRepository);

org.apache.activemq.ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
PooledConnectionFactory pool = new PooledConnectionFactory();
factory.setBrokerURL("failover://(tcp://localhost:61616?connectionTimeout=1000&soTimeout=1000&wireFormat.maxInactivityDuration=1000&wireFormat.tightEncodingEnabled=false)?initialReconnectDelay=100");
pool.setConnectionFactory(factory);






Thread thread1 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("recharge.subscriber.response"))); thread1.start();
Thread thread2 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("transfer.sender.agent.response"))); thread2.start();
Thread thread3 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("recharge.agent.response"))); thread3.start();
Thread thread4 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("sap.reversedeposit.sms.queue"))); thread4.start();
Thread thread5 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("transfer.receiver.agent.response"))); thread5.start();
Thread thread6 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("checkreloads.agent.response"))); thread6.start();
Thread thread7 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("changepin.agent.response"))); thread7.start();
Thread thread8 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("sap.deposit.sms.queue"))); thread8.start();

}

private static class MessageReceiver implements Runnable {
PooledConnectionFactory pool;
ActiveMQQueue activeMQQueue;
WaitLock waitLock;


private MessageReceiver(PooledConnectionFactory pool, ActiveMQQueue activeMQQueue) {
this.pool = pool;
this.activeMQQueue = activeMQQueue;
waitLock = new WaitLock();
}

public void run() {

for (int i =0; i < 100; i++) {
JMSQueueReceiverImpl listener = new JMSQueueReceiverImpl(pool, activeMQQueue);

for (int j =0; j < 20; j++) {
listener.receive(10);
}
listener.closeConnection();
try {
synchronized (waitLock) {
waitLock.wait(200);
}
} catch (InterruptedException e) {
e.printStackTrace(); //todo implement the method
}
}

}

private class WaitLock {
private WaitLock() {
}
}

}*/
}
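
The heart of the class above is the hand-off between the thread that calls receive and the thread on which the JMS provider calls onMessage. Here is a minimal sketch of that hand-off in plain Java, without any JMS dependency, so it can be run on its own. The names BatchCollector, onItem and take are made up for this illustration and are not part of our class or of any library. Unlike doStart, the sketch re-checks the buffer in a loop before waiting, so a notification that arrives before the wait starts is not lost.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only: collects items pushed by a
// callback thread and lets another thread block until a batch is ready.
class BatchCollector<T> {
    private final Object lock = new Object();
    private final List<T> buffer = new ArrayList<T>();

    // Called from the delivery thread, in the role of onMessage(...).
    public void onItem(T item) {
        synchronized (lock) {
            buffer.add(item);
            lock.notifyAll();
        }
    }

    // Blocks until `count` items have arrived or `timeoutMillis` has elapsed,
    // then removes and returns at most `count` items (possibly fewer).
    public List<T> take(int count, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        synchronized (lock) {
            while (buffer.size() < count) {
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                if (remainingMillis <= 0) {
                    break; // timed out: return whatever has arrived so far
                }
                try {
                    lock.wait(remainingMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible
                    break;
                }
            }
            int n = Math.min(count, buffer.size());
            List<T> batch = new ArrayList<T>(buffer.subList(0, n));
            buffer.subList(0, n).clear(); // items beyond the batch stay buffered
            return batch;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BatchCollector<String> collector = new BatchCollector<String>();
        // Stand-in for the provider's delivery thread.
        Thread delivery = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                collector.onItem("sms-" + i);
            }
        });
        delivery.start();
        List<String> batch = collector.take(3, 2000);
        delivery.join();
        System.out.println("batch size: " + batch.size());
    }
}
```

In the real class the same roles are played by messageReceiveWaitLock, receivedMessages and requiredMessages. The sketch only shows the waiting logic, not connection handling or acknowledgement.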

JMSQueueReceiverImpl uses the Jencks ActiveMQ connection pool (org.jencks.amqpool.PooledConnectionFactory). You can download it from Codehaus.
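
One more detail of the class that is easy to miss is the static missedMessages map. Messages that were fetched but not consumed before closeConnection is called are parked there under the queue name, and the next receiver created for the same queue picks them up first. Below is a rough, JMS-free sketch of that idea. LeftoverCache, park and reclaim are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only: keeps over-fetched items per
// queue name so that a later receiver can consume them first.
class LeftoverCache<T> {
    private final Map<String, List<T>> leftovers = new HashMap<String, List<T>>();

    // Moves all unconsumed items into the cache and empties the caller's list,
    // in the role of cacheIfExtraMessagesFetched().
    public synchronized void park(String queueName, List<T> unconsumed) {
        if (unconsumed.isEmpty()) {
            return;
        }
        List<T> parked = leftovers.get(queueName);
        if (parked == null) {
            parked = new ArrayList<T>();
            leftovers.put(queueName, parked);
        }
        parked.addAll(unconsumed);
        unconsumed.clear();
    }

    // Returns the parked items for the queue in arrival order and forgets them,
    // in the role of receiveFromCacheIfExtraMessagesFetched(...).
    public synchronized List<T> reclaim(String queueName) {
        List<T> parked = leftovers.remove(queueName);
        return parked == null ? new ArrayList<T>() : parked;
    }

    public static void main(String[] args) {
        LeftoverCache<String> cache = new LeftoverCache<String>();
        List<String> extra = new ArrayList<String>();
        extra.add("sms-4");
        extra.add("sms-5");
        cache.park("sap.deposit.sms.queue", extra);
        System.out.println("reclaimed: " + cache.reclaim("sap.deposit.sms.queue"));
    }
}
```

Keep in mind that in our class the messages are acknowledged in onMessage before they reach this cache, so anything still parked in memory is gone if the JVM stops.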



OpenXava - Easiest way to create DB-driven websites in Java

If you are interested in computer programming, at some point you have probably created a DB-driven website. If so, you know how difficult it is to create even a small one. Even with the code generation facilities provided by IDEs and ORM tools like Hibernate, you have to do quite a lot to get your web app running.
This is where OpenXava (OX) comes in handy. It is really easy to create a DB-driven web app using this framework. With OX, the only thing you have to do is define your business entities. Yes, that is the only thing you have to do, and OX will create a nice AJAX-driven website for you, with all CRUD operations implemented and with reporting capabilities in PDF and Excel. It is amazing, right....

How much time do you think it takes to create a simple web app like that? An hour... Actually, it took me only 10 minutes to create one with all those features. All I did was implement a simple POJO class, Customer, and everything else was done by OX.
OX provides a lot of customization capabilities as well, so I think it is a good time-saving tool.
Download OX and see for yourself....
 

Wishing you a Very Happy New Year


Wish you all a very happy new year.
May all your dreams come true in this wonderful year....
 

Tribute to Heroes

We will never forget the sacrifices you made to protect our country....
