Java EE – hornetq: how to reuse xaconnection and xasession
I encountered some problems when trying to reuse xaconnection and xasession on multiple workers in JBoss applications I have tried to reduce the problem to one method It should be able to generate and consumer messages using the same connections and sessions At present, my application has many queues and workers. Each worker is currently starting and starting each of his own connections and sessions, rather than sharing it Shouldn't it be possible?
Here is my code example:
import org.apache.log4j.Logger; import javax.annotation.postconstruct; import javax.annotation.PreDestroy; import javax.ejb.Singleton; import javax.ejb.Startup; import javax.jms.*; import javax.jms.Queue; import javax.naming.InitialContext; @Singleton @Startup public class QueueTest { private Logger logger = Logger.getLogger(QueueTest.class); @postconstruct public void startup() { try { String queue = "queue/Queue1"; String message = "test"; //setting up connection InitialContext iniCtx = new InitialContext(); XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA"); XAConnection connection = qcf.createXAConnection(); connection.start(); logger.debug("creating connection at " + new java.util.Date()); //setting up session XASession session = connection.createXASession(); logger.debug("creating session at " + new java.util.Date()); //find the queue Object queueObj = iniCtx.lookup(queue); Queue jmsQueue = (javax.jms.Queue)queueObj; //adding message to queue javax.jms.MessageProducer producer = session.createProducer(jmsQueue); javax.jms.TextMessage textMessage = session.createTextMessage(message); producer.send(textMessage); producer.close(); logger.debug("Message added to queue"); //receiving message from queue javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue); javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000); if (messageReceived==null) throw new Exception("No message reveived"); logger.debug("Got message:"+messageReceived.getText()); consumer.close(); } catch(Exception e) { logger.debug("Error: " + e.getMessage(),e); } } @PreDestroy public void shutdown() { } }
It causes this output:
11:47:17,905 DEBUG [QueueTest] (MSC service thread 1-8) creating connection at Thu Sep 05 11:47:17 CEST 2013 11:47:18,041 DEBUG [QueueTest] (MSC service thread 1-8) creating session at Thu Sep 05 11:47:18 CEST 2013 11:47:18,065 DEBUG [QueueTest] (MSC service thread 1-8) Message added to queue 11:47:23,081 DEBUG [QueueTest] (MSC service thread 1-8) Error: No message reveived
As you can see, the consumer has not received any messages Why?
Edit 1:
package dk.energimidt.uapi.zigbee.services; import org.apache.log4j.Logger; import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import javax.jms.Queue; import javax.jms.XAConnection; import javax.jms.XAConnectionFactory; import javax.jms.XASession; import javax.naming.InitialContext; @TransactionAttribute(TransactionAttributeType.required) @Stateless public class QueueTestWorkerBean implements QueueTestWorker { private Logger logger = Logger.getLogger(QueueTestWorkerBean.class); public void run() { try { String queue = "queue/Queue1"; String message = "test"; //setting up connection InitialContext iniCtx = new InitialContext(); XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA"); XAConnection connection = qcf.createXAConnection(); connection.start(); logger.debug("creating connection at " + new java.util.Date()); //setting up session XASession session = connection.createXASession(); logger.debug("creating session at " + new java.util.Date()); //find the queue Object queueObj = iniCtx.lookup(queue); Queue jmsQueue = (javax.jms.Queue)queueObj; //adding message to queue javax.jms.MessageProducer producer = session.createProducer(jmsQueue); javax.jms.TextMessage textMessage = session.createTextMessage(message); producer.send(textMessage); producer.close(); session.commit(); logger.debug("Message added to queue"); //receiving message from queue javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue); javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000); if (messageReceived==null) throw new Exception("No message reveived"); logger.debug("Got message:"+messageReceived.getText()); consumer.close(); connection.close(); } catch(Exception e) { logger.debug("Error: " + e.getMessage(),e); } } }
Now I'm in session Get an exception on commit():
10:46:03,697 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating connection at Tue Sep 17 10:46:03 CEST 2013 10:46:04,343 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating session at Tue Sep 17 10:46:04 CEST 2013 10:46:04,355 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) Error: XA connection: javax.jms.TransactionInProgressException: XA connection at org.hornetq.ra.hornetqRASession.commit(hornetqRASession.java:386) at QueueTestWorkerBean.run(QueueTestWorkerBean.java:45) [library-1.0.0.jar:]
Solution
I saw some (actually 2) mixes there:
I - you are using an XA session, but you do not declare any transaction boundaries... Usually done on the session bean and MDB I'm not sure you can do that with this stateless man
If you do not use any declarative transactions, you must register XID manually
II - jmsxa is the default resource adapter connection factory There is a swimming pool above it Therefore, whenever you create a new session, you have to take it out of the pool When you close it, you return it to the swimming pool
You can use a regular connection factory Just in invmconnectionfactory (or anything you define in the standalone version, except pooledconnectionfactories, suppose you are on JBoss... And then only use regular JMS
Even a regular connection factory can be used with Xa, but in this case, you need to ensure that it is registered directly using the transaction manager API
If you use a regular connection factory, you can stay connected at any time as needed
Please tell me what's going on and I'll help you I know you started the bounty But I will reply for free:)
I can't find any examples of using transactions with singleton in the EJB tutorial
I suggest you use it through a statless or stateful session bean, and then apply @ transactionattribute to the bean
The Java EE 6 tutorial has some good information:
http://docs.oracle.com/javaee/6/tutorial/doc/bncij.html
Please note that mail will not be available until you submit it Therefore, if you send a message in a transaction, you will not receive a message in the same transaction
In your Edit1 example, you are sending a message and using it on the same transaction This will not work because you need to submit the production method before you can use it In this case, you need two transactions, so Edit1 is broken
Also: make sure that connection is closed at the end Because you are using jmsxa (or pooled connection factory), you will automatically complete polling through the application server