Alternative for JMS Receiver
Around 6 weeks back I was working on a project that used JMS. Application needed to receive messages through an ActiveMQ queue and then send those as SMS messages to a SMSC. This message sending happens as a batch and I needed to get batch size of messages from ActiveMQ inorder to send. I used JmsTemplate provided by Spring framework for this and used receive method to get messages from the queue.
Initially it was working fine, but later after more testing I found out that sometimes JmsTemplate's receive method do not return messages eventhough there are thousands of messages in the queue. I Googled, followed instructions given in many forum posts and ActiveMQ site with out much success.
Then one of my collegue Romith started searching source codes of some popular open source projects to see how they have used JMS receives. We were lucky... In Mule's code we found out the they have used JMS listners as a receiver. (May be mule people also have faced the same kind of problem as we were with receivers :) )
When we want to receive a message we initialise listener and get the message and then close that listener. We also developed simple class that can do the same, and it did the trick for us. Following is the simple class where we use JmsListerns to work as JmsReceiver.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jencks.amqpool.PooledConnectionFactory;
import javax.jms.*;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
public class JMSQueueReceiverImpl implements MessageListener {
private QueueConnectionFactory queueConnectionFactory;
private Queue queue;
private QueueConnection queueConnection;
private QueueSession queueSession;
private QueueReceiver queueReceiver;
private final Object messageReceiveWaitLock = new Object();
private MessageConsumer messageConsumer;
private Log logger = LogFactory.getLog(JMSQueueReceiverImpl.class);
private List receivedMessages = new ArrayList(20);
private int requiredMessages = 0;
private int RECEIVE_ALL_TIMEOUT = 1000;
private PooledConnectionFactory pooledConnectionFactory;
private static Map> missedMessages = new Hashtable>();
private static long totalJmsMessageConsumed = 0;
private static long totalJmsMessageReceived =0;
private static long connectionCreateCount = 0;
private static long connectionCloseCount = 0;
public JMSQueueReceiverImpl(QueueConnectionFactory queueConnectionFactory, Queue queue) {
this.queueConnectionFactory = queueConnectionFactory;
this.queue = queue;
try {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++connectionCreateCount + "] Creating the connection to the Queue [" + queue.toString() + "]");
}
queueConnection = (QueueConnection)pooledConnectionFactory.createConnection();
queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
messageConsumer = queueSession.createConsumer(queue);
// doStart(queue);
receiveFromCacheIfExtraMessagesFetched(this.queue.getQueueName());
} catch (JMSException e) {
if (logger.isDebugEnabled()) {
logger.debug("Error while creating the connection ", e);
}
}
}
public JMSQueueReceiverImpl(PooledConnectionFactory pooledConnectionFactory, Queue queue) {
this.pooledConnectionFactory = pooledConnectionFactory;
// this.queueConnectionFactory = queueConnectionFactory;
this.queue = queue;
try {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++connectionCreateCount + "] Creating the connection to the Queue [" + queue.toString() + "]");
}
queueConnection = (QueueConnection)pooledConnectionFactory.createConnection();
queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
messageConsumer = queueSession.createConsumer(queue);
// doStart(queue);
receiveFromCacheIfExtraMessagesFetched(this.queue.getQueueName());
} catch (JMSException e) {
if (logger.isDebugEnabled()) {
logger.debug("Error while creating the connection ", e);
}
}
}
private void doStart(long timeout) throws JMSException {
messageConsumer.setMessageListener(this);
queueConnection.start();
synchronized (messageReceiveWaitLock) {
try {
messageReceiveWaitLock.wait(timeout);
} catch (InterruptedException e) {
logger.info(e);
}
}
messageConsumer.setMessageListener(null);
}
private void doStop() throws JMSException {
messageConsumer.setMessageListener(null);
//queueConnection.stop();
synchronized (messageReceiveWaitLock) {
messageReceiveWaitLock.notify();
}
if (logger.isDebugEnabled()) {
logger.debug("Message receiving finished");
}
}
public Message receive(long timeout) {
try {
requiredMessages = 1;
if (receivedMessages.size() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("[pre-single] success jms receive calls [" + ++totalJmsMessageConsumed + "]");
}
return receivedMessages.remove(0);
}
doStart(timeout);
if (receivedMessages.size() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("[single] success jms receive calls [" + ++totalJmsMessageConsumed + "]");
}
return receivedMessages.remove(0);
}
return null;
} catch (JMSException e) {
logger.error("Error while starting message receiving", e);
return null;
}
}
public List receive(long timeout, int messageCount) {
try {
requiredMessages = messageCount;
doStart(timeout);
totalJmsMessageConsumed += receivedMessages.size();
if (logger.isDebugEnabled()) {
logger.debug("[count] success jms receive calls [" + totalJmsMessageConsumed + "]");
}
return receivedMessages;
} catch (JMSException e) {
logger.error("Error while starting message receiving", e);
return null;
}
}
public List receiveAll(long timeout) {
try {
requiredMessages = -1;
doStart(timeout);
return receivedMessages;
} catch (JMSException e) {
logger.error("Error while starting message receiving", e);
return null;
} finally {
try {
doStop();
} catch (JMSException e) {
logger.error("Error while stopping message receving", e);
}
}
}
public void closeConnection() {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++connectionCloseCount + "]Closing connection from queue [" + queue + "]");
}
try {
cacheIfExtraMessagesFetched();
try {
if(messageConsumer!= null){
messageConsumer.close();
}
} catch (JMSException e) {
logger.error("Error while stopping message receiver[consumer] ", e);
}
try {
if (queueSession != null) {
queueSession.close();
}
} catch (JMSException e) {
logger.error("Error while stopping message receiver[session] ", e);
}
queueConnection.close();
}catch (JMSException e) {
logger.error("Error while stopping message receiver[connection] ", e);
}
}
private void cacheIfExtraMessagesFetched() throws JMSException {
if (receivedMessages.size() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Extra fetched message count is [" + receivedMessages.size() + "]");
}
String queueName = queue.getQueueName();
List queueMissed = missedMessages.get(queueName);
if (queueMissed == null) {
queueMissed = new ArrayList();
missedMessages.put(queueName, queueMissed);
}
while (receivedMessages.size() > 0) {
queueMissed.add(receivedMessages.remove(0));
}
}
}
private void receiveFromCacheIfExtraMessagesFetched(String queueName) throws JMSException {
List extraMessages = missedMessages.get(queueName);
if ( (extraMessages == null) || (extraMessages.size() == 0)) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Extra fetched message count [" + extraMessages.size() + "]");
}
while (extraMessages.size() > 0) {
receivedMessages.add(extraMessages.remove(0));
}
}
public synchronized void onMessage(Message message) {
try {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++totalJmsMessageReceived + "] Jms message received jms-correlation-id[" + message.getJMSCorrelationID() + "]");
}
receivedMessages.add(message);
message.acknowledge();
if (receivedMessages.size() >= requiredMessages) {
if (logger.isDebugEnabled()) {
logger.debug("all messages had been received ");
}
try {
doStop();
} catch (JMSException e) {
logger.error("Error while stopping message receiving", e);
}
try {
Thread.sleep(5);
} catch (InterruptedException e) {
//
}
}
} catch (JMSException e) {
logger.error("Error while acknowledging ", e);
}
}
public void setRECEIVE_ALL_TIMEOUT(int RECEIVE_ALL_TIMEOUT) {
this.RECEIVE_ALL_TIMEOUT = RECEIVE_ALL_TIMEOUT;
}
/* public static void main(String[] args) {
BasicConfigurator.configure();
// PropertyConfigurator propertyConfigurator = new PropertyConfigurator();
// Logger logger = Logger.getLogger("sample");
// LoggerRepository loggerRepository = new Hierarchy(logger);
// propertyConfigurator.doConfigure("log4j.properties", loggerRepository);
org.apache.activemq.ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
PooledConnectionFactory pool = new PooledConnectionFactory();
factory.setBrokerURL("failover://(tcp://localhost:61616?connectionTimeout=1000&soTimeout=1000&wireFormat.maxInactivityDuration=1000&wireFormat.tightEncodingEnabled=false)?initialReconnectDelay=100");
pool.setConnectionFactory(factory);
Thread thread1 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("recharge.subscriber.response"))); thread1.start();
Thread thread2 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("transfer.sender.agent.response"))); thread2.start();
Thread thread3 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("recharge.agent.response"))); thread3.start();
Thread thread4 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("sap.reversedeposit.sms.queue"))); thread4.start();
Thread thread5 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("transfer.receiver.agent.response"))); thread5.start();
Thread thread6 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("checkreloads.agent.response"))); thread6.start();
Thread thread7 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("changepin.agent.response"))); thread7.start();
Thread thread8 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("sap.deposit.sms.queue"))); thread8.start();
}
private static class MessageReceiver implements Runnable {
PooledConnectionFactory pool;
ActiveMQQueue activeMQQueue;
WaitLock waitLock;
private MessageReceiver(PooledConnectionFactory pool, ActiveMQQueue activeMQQueue) {
this.pool = pool;
this.activeMQQueue = activeMQQueue;
waitLock = new WaitLock();
}
public void run() {
for (int i =0; i < 100; i++) {
JMSQueueReceiverImpl listener = new JMSQueueReceiverImpl(pool, activeMQQueue);
for (int j =0; j < 20; j++) {
listener.receive(10);
}
listener.closeConnection();
try {
synchronized (waitLock) {
waitLock.wait(200);
}
} catch (InterruptedException e) {
e.printStackTrace(); //todo implement the method
}
}
}
private class WaitLock {
private WaitLock() {
}
}
}*/
}
We have used Jencks ActiveMQ connection pool here. You can download if from codehaus here.
Initially it was working fine, but later after more testing I found out that sometimes JmsTemplate's receive method do not return messages eventhough there are thousands of messages in the queue. I Googled, followed instructions given in many forum posts and ActiveMQ site with out much success.
Then one of my collegue Romith started searching source codes of some popular open source projects to see how they have used JMS receives. We were lucky... In Mule's code we found out the they have used JMS listners as a receiver. (May be mule people also have faced the same kind of problem as we were with receivers :) )
When we want to receive a message we initialise listener and get the message and then close that listener. We also developed simple class that can do the same, and it did the trick for us. Following is the simple class where we use JmsListerns to work as JmsReceiver.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jencks.amqpool.PooledConnectionFactory;
import javax.jms.*;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
public class JMSQueueReceiverImpl implements MessageListener {
private QueueConnectionFactory queueConnectionFactory;
private Queue queue;
private QueueConnection queueConnection;
private QueueSession queueSession;
private QueueReceiver queueReceiver;
private final Object messageReceiveWaitLock = new Object();
private MessageConsumer messageConsumer;
private Log logger = LogFactory.getLog(JMSQueueReceiverImpl.class);
private List
private int requiredMessages = 0;
private int RECEIVE_ALL_TIMEOUT = 1000;
private PooledConnectionFactory pooledConnectionFactory;
private static Map
private static long totalJmsMessageConsumed = 0;
private static long totalJmsMessageReceived =0;
private static long connectionCreateCount = 0;
private static long connectionCloseCount = 0;
public JMSQueueReceiverImpl(QueueConnectionFactory queueConnectionFactory, Queue queue) {
this.queueConnectionFactory = queueConnectionFactory;
this.queue = queue;
try {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++connectionCreateCount + "] Creating the connection to the Queue [" + queue.toString() + "]");
}
queueConnection = (QueueConnection)pooledConnectionFactory.createConnection();
queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
messageConsumer = queueSession.createConsumer(queue);
// doStart(queue);
receiveFromCacheIfExtraMessagesFetched(this.queue.getQueueName());
} catch (JMSException e) {
if (logger.isDebugEnabled()) {
logger.debug("Error while creating the connection ", e);
}
}
}
public JMSQueueReceiverImpl(PooledConnectionFactory pooledConnectionFactory, Queue queue) {
this.pooledConnectionFactory = pooledConnectionFactory;
// this.queueConnectionFactory = queueConnectionFactory;
this.queue = queue;
try {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++connectionCreateCount + "] Creating the connection to the Queue [" + queue.toString() + "]");
}
queueConnection = (QueueConnection)pooledConnectionFactory.createConnection();
queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
messageConsumer = queueSession.createConsumer(queue);
// doStart(queue);
receiveFromCacheIfExtraMessagesFetched(this.queue.getQueueName());
} catch (JMSException e) {
if (logger.isDebugEnabled()) {
logger.debug("Error while creating the connection ", e);
}
}
}
private void doStart(long timeout) throws JMSException {
messageConsumer.setMessageListener(this);
queueConnection.start();
synchronized (messageReceiveWaitLock) {
try {
messageReceiveWaitLock.wait(timeout);
} catch (InterruptedException e) {
logger.info(e);
}
}
messageConsumer.setMessageListener(null);
}
private void doStop() throws JMSException {
messageConsumer.setMessageListener(null);
//queueConnection.stop();
synchronized (messageReceiveWaitLock) {
messageReceiveWaitLock.notify();
}
if (logger.isDebugEnabled()) {
logger.debug("Message receiving finished");
}
}
public Message receive(long timeout) {
try {
requiredMessages = 1;
if (receivedMessages.size() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("[pre-single] success jms receive calls [" + ++totalJmsMessageConsumed + "]");
}
return receivedMessages.remove(0);
}
doStart(timeout);
if (receivedMessages.size() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("[single] success jms receive calls [" + ++totalJmsMessageConsumed + "]");
}
return receivedMessages.remove(0);
}
return null;
} catch (JMSException e) {
logger.error("Error while starting message receiving", e);
return null;
}
}
public List
try {
requiredMessages = messageCount;
doStart(timeout);
totalJmsMessageConsumed += receivedMessages.size();
if (logger.isDebugEnabled()) {
logger.debug("[count] success jms receive calls [" + totalJmsMessageConsumed + "]");
}
return receivedMessages;
} catch (JMSException e) {
logger.error("Error while starting message receiving", e);
return null;
}
}
public List
try {
requiredMessages = -1;
doStart(timeout);
return receivedMessages;
} catch (JMSException e) {
logger.error("Error while starting message receiving", e);
return null;
} finally {
try {
doStop();
} catch (JMSException e) {
logger.error("Error while stopping message receving", e);
}
}
}
public void closeConnection() {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++connectionCloseCount + "]Closing connection from queue [" + queue + "]");
}
try {
cacheIfExtraMessagesFetched();
try {
if(messageConsumer!= null){
messageConsumer.close();
}
} catch (JMSException e) {
logger.error("Error while stopping message receiver[consumer] ", e);
}
try {
if (queueSession != null) {
queueSession.close();
}
} catch (JMSException e) {
logger.error("Error while stopping message receiver[session] ", e);
}
queueConnection.close();
}catch (JMSException e) {
logger.error("Error while stopping message receiver[connection] ", e);
}
}
private void cacheIfExtraMessagesFetched() throws JMSException {
if (receivedMessages.size() > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Extra fetched message count is [" + receivedMessages.size() + "]");
}
String queueName = queue.getQueueName();
List
if (queueMissed == null) {
queueMissed = new ArrayList
missedMessages.put(queueName, queueMissed);
}
while (receivedMessages.size() > 0) {
queueMissed.add(receivedMessages.remove(0));
}
}
}
private void receiveFromCacheIfExtraMessagesFetched(String queueName) throws JMSException {
List
if ( (extraMessages == null) || (extraMessages.size() == 0)) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Extra fetched message count [" + extraMessages.size() + "]");
}
while (extraMessages.size() > 0) {
receivedMessages.add(extraMessages.remove(0));
}
}
public synchronized void onMessage(Message message) {
try {
if (logger.isDebugEnabled()) {
logger.debug("[" + ++totalJmsMessageReceived + "] Jms message received jms-correlation-id[" + message.getJMSCorrelationID() + "]");
}
receivedMessages.add(message);
message.acknowledge();
if (receivedMessages.size() >= requiredMessages) {
if (logger.isDebugEnabled()) {
logger.debug("all messages had been received ");
}
try {
doStop();
} catch (JMSException e) {
logger.error("Error while stopping message receiving", e);
}
try {
Thread.sleep(5);
} catch (InterruptedException e) {
//
}
}
} catch (JMSException e) {
logger.error("Error while acknowledging ", e);
}
}
public void setRECEIVE_ALL_TIMEOUT(int RECEIVE_ALL_TIMEOUT) {
this.RECEIVE_ALL_TIMEOUT = RECEIVE_ALL_TIMEOUT;
}
/* public static void main(String[] args) {
BasicConfigurator.configure();
// PropertyConfigurator propertyConfigurator = new PropertyConfigurator();
// Logger logger = Logger.getLogger("sample");
// LoggerRepository loggerRepository = new Hierarchy(logger);
// propertyConfigurator.doConfigure("log4j.properties", loggerRepository);
org.apache.activemq.ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
PooledConnectionFactory pool = new PooledConnectionFactory();
factory.setBrokerURL("failover://(tcp://localhost:61616?connectionTimeout=1000&soTimeout=1000&wireFormat.maxInactivityDuration=1000&wireFormat.tightEncodingEnabled=false)?initialReconnectDelay=100");
pool.setConnectionFactory(factory);
Thread thread1 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("recharge.subscriber.response"))); thread1.start();
Thread thread2 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("transfer.sender.agent.response"))); thread2.start();
Thread thread3 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("recharge.agent.response"))); thread3.start();
Thread thread4 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("sap.reversedeposit.sms.queue"))); thread4.start();
Thread thread5 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("transfer.receiver.agent.response"))); thread5.start();
Thread thread6 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("checkreloads.agent.response"))); thread6.start();
Thread thread7 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("changepin.agent.response"))); thread7.start();
Thread thread8 = new Thread(new MessageReceiver(pool, new ActiveMQQueue("sap.deposit.sms.queue"))); thread8.start();
}
private static class MessageReceiver implements Runnable {
PooledConnectionFactory pool;
ActiveMQQueue activeMQQueue;
WaitLock waitLock;
private MessageReceiver(PooledConnectionFactory pool, ActiveMQQueue activeMQQueue) {
this.pool = pool;
this.activeMQQueue = activeMQQueue;
waitLock = new WaitLock();
}
public void run() {
for (int i =0; i < 100; i++) {
JMSQueueReceiverImpl listener = new JMSQueueReceiverImpl(pool, activeMQQueue);
for (int j =0; j < 20; j++) {
listener.receive(10);
}
listener.closeConnection();
try {
synchronized (waitLock) {
waitLock.wait(200);
}
} catch (InterruptedException e) {
e.printStackTrace(); //todo implement the method
}
}
}
private class WaitLock {
private WaitLock() {
}
}
}*/
}
We have used Jencks ActiveMQ connection pool here. You can download if from codehaus here.