Java EE – HornetQ: how to reuse XAConnection and XASession

I'm running into problems when trying to reuse an XAConnection and XASession across multiple workers in a JBoss application. I have reduced the problem to a single method, which should be able to produce and consume messages using the same connection and session. At present my application has many queues and workers, and each worker starts and stops its own connection and session rather than sharing them. Shouldn't sharing be possible?

Here is my code example:

import org.apache.log4j.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.jms.*;
import javax.jms.Queue;
import javax.naming.InitialContext;

@Singleton
@Startup
public class QueueTest {

    private Logger logger = Logger.getLogger(QueueTest.class);

    @PostConstruct
    public void startup() {
        try {
            String queue = "queue/Queue1";
            String message = "test";

            //setting up connection
            InitialContext iniCtx = new InitialContext();
            XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA");
            XAConnection connection = qcf.createXAConnection();
            connection.start();
            logger.debug("creating connection at " + new java.util.Date());

            //setting up session
            XASession session = connection.createXASession();
            logger.debug("creating session at " + new java.util.Date());

            //find the queue
            Object queueObj = iniCtx.lookup(queue);
            Queue jmsQueue = (javax.jms.Queue)queueObj;

            //adding message to queue
            javax.jms.MessageProducer producer = session.createProducer(jmsQueue);
            javax.jms.TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
            producer.close();
            logger.debug("Message added to queue");

            //receiving message from queue
            javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue);
            javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000);

            if (messageReceived==null)
                throw new Exception("No message reveived");

            logger.debug("Got message:"+messageReceived.getText());
            consumer.close();
        }
        catch(Exception e) {
            logger.debug("Error: " + e.getMessage(),e);
        }
    }

    @PreDestroy
    public void shutdown() {

    }
}

It causes this output:

11:47:17,905 DEBUG [QueueTest] (MSC service thread 1-8) creating connection at Thu Sep 05 11:47:17 CEST 2013
11:47:18,041 DEBUG [QueueTest] (MSC service thread 1-8) creating session at Thu Sep 05 11:47:18 CEST 2013
11:47:18,065 DEBUG [QueueTest] (MSC service thread 1-8) Message added to queue
11:47:23,081 DEBUG [QueueTest] (MSC service thread 1-8) Error: No message reveived

As you can see, the consumer does not receive any message. Why?

Edit 1:

package dk.energimidt.uapi.zigbee.services;

import org.apache.log4j.Logger;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Queue;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.naming.InitialContext;

@TransactionAttribute(TransactionAttributeType.REQUIRED)
@Stateless
public class QueueTestWorkerBean implements QueueTestWorker {

    private Logger logger = Logger.getLogger(QueueTestWorkerBean.class);

    public void run() {
        try {
            String queue = "queue/Queue1";
            String message = "test";

            //setting up connection
            InitialContext iniCtx = new InitialContext();
            XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA");
            XAConnection connection = qcf.createXAConnection();
            connection.start();
            logger.debug("creating connection at " + new java.util.Date());

            //setting up session
            XASession session = connection.createXASession();
            logger.debug("creating session at " + new java.util.Date());

            //find the queue
            Object queueObj = iniCtx.lookup(queue);
            Queue jmsQueue = (javax.jms.Queue)queueObj;

            //adding message to queue
            javax.jms.MessageProducer producer = session.createProducer(jmsQueue);
            javax.jms.TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
            producer.close();
            session.commit();
            logger.debug("Message added to queue");

            //receiving message from queue
            javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue);
            javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000);

            if (messageReceived==null)
                throw new Exception("No message reveived");

            logger.debug("Got message:"+messageReceived.getText());
            consumer.close();

            connection.close();
        }
        catch(Exception e) {
            logger.debug("Error: " + e.getMessage(),e);
        }
    }
}

Now I get an exception on session.commit():

10:46:03,697 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating connection at Tue Sep 17 10:46:03 CEST 2013
10:46:04,343 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating session at Tue Sep 17 10:46:04 CEST 2013
10:46:04,355 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) Error: XA connection: javax.jms.TransactionInProgressException: XA connection
    at org.hornetq.ra.HornetQRASession.commit(HornetQRASession.java:386)
    at QueueTestWorkerBean.run(QueueTestWorkerBean.java:45) [library-1.0.0.jar:]

Solution

I see a couple of (actually two) mix-ups there:

I - You are using an XASession, but you are not declaring any transaction boundaries. That is usually done on session beans and MDBs. I'm not sure you can do that with this kind of bean.

If you do not use declarative transactions, you have to enlist the XID manually.
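To make "manually" concrete, here is a rough sketch of the XA calls that a transaction manager issues on your behalf once a resource is enlisted. In a real container you would normally hand `session.getXAResource()` to `Transaction.enlistResource(...)` rather than call these methods yourself. The sketch uses only the JDK's `javax.transaction.xa` types. `ToyXid` and `ToyXaQueueResource` are hypothetical stand-ins, not HornetQ classes, and the toy resource ignores most of what a real one must handle (recovery, timeouts, multiple branches).

```java
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.List;

// Minimal Xid implementation for the sketch (hypothetical helper).
final class ToyXid implements Xid {
    private final byte[] gtrid;
    private final byte[] bqual;

    ToyXid(int global, int branch) {
        this.gtrid = new byte[] {(byte) global};
        this.bqual = new byte[] {(byte) branch};
    }

    @Override public int getFormatId() { return 1; }
    @Override public byte[] getGlobalTransactionId() { return gtrid.clone(); }
    @Override public byte[] getBranchQualifier() { return bqual.clone(); }
}

// Toy resource standing in for what XASession.getXAResource() would return.
final class ToyXaQueueResource implements XAResource {
    final List<String> delivered = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private Xid active;

    // Work is only accepted while a transaction branch is associated.
    void send(String message) {
        if (active == null) throw new IllegalStateException("no XA transaction started");
        pending.add(message);
    }

    @Override public void start(Xid xid, int flags) { active = xid; }
    @Override public void end(Xid xid, int flags) { active = null; }
    @Override public int prepare(Xid xid) { return XA_OK; }
    @Override public void commit(Xid xid, boolean onePhase) { delivered.addAll(pending); pending.clear(); }
    @Override public void rollback(Xid xid) { pending.clear(); }
    @Override public void forget(Xid xid) { }
    @Override public Xid[] recover(int flag) { return new Xid[0]; }
    @Override public boolean isSameRM(XAResource other) { return other == this; }
    @Override public int getTransactionTimeout() { return 0; }
    @Override public boolean setTransactionTimeout(int seconds) { return false; }
}

final class ToyXaDemo {
    public static void main(String[] args) throws XAException {
        ToyXaQueueResource resource = new ToyXaQueueResource();
        XAResource xa = resource;
        Xid xid = new ToyXid(1, 1);

        xa.start(xid, XAResource.TMNOFLAGS);   // associate the work with the XID
        resource.send("test");
        xa.end(xid, XAResource.TMSUCCESS);     // dissociate the work from the XID
        if (xa.prepare(xid) == XAResource.XA_OK) {
            xa.commit(xid, false);             // second phase of two-phase commit
        }
        System.out.println(resource.delivered);
    }
}
```

The point is only that with an XA session the commit is driven through the `XAResource` by whoever owns the transaction, which is why calling `session.commit()` directly is rejected.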

II - JmsXA is the default resource adapter connection factory. There is a pool on top of it, so whenever you create a new session you are taking one out of the pool, and when you close it you return it to the pool.
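To picture what the pool does, here is a toy in-memory model in plain Java. It is not how HornetQ or JBoss implement pooling. `ToySessionPool` and `ToySession` are hypothetical names used only for illustration. It shows that "creating" a session borrows one from the pool and "closing" it hands the same object back for reuse.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a pooled connection factory (hypothetical names, not a JBoss API).
final class ToySession implements AutoCloseable {
    private final ToySessionPool owner;
    final int id;

    ToySession(ToySessionPool owner, int id) {
        this.owner = owner;
        this.id = id;
    }

    // "Closing" does not destroy the session; it returns it to the pool.
    @Override
    public void close() {
        owner.giveBack(this);
    }
}

final class ToySessionPool {
    private final Deque<ToySession> idle = new ArrayDeque<>();
    private int created = 0;

    // "Creating" a session reuses an idle one when available.
    ToySession createSession() {
        ToySession s = idle.poll();
        return (s != null) ? s : new ToySession(this, ++created);
    }

    void giveBack(ToySession s) {
        idle.push(s);
    }

    int physicalSessionsCreated() {
        return created;
    }
}

final class ToyPoolDemo {
    public static void main(String[] args) {
        ToySessionPool pool = new ToySessionPool();
        try (ToySession first = pool.createSession()) {
            System.out.println("borrowed session " + first.id);
        }
        try (ToySession second = pool.createSession()) {
            System.out.println("borrowed session " + second.id);
        }
        System.out.println("physical sessions: " + pool.physicalSessionsCreated());
    }
}
```

Both borrows in the demo get the same underlying session, so opening and closing per unit of work is cheap when a pool sits underneath.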

You could use a regular connection factory instead: just look up InVmConnectionFactory (or whatever you have defined in your standalone configuration other than the pooled connection factories, assuming you are on JBoss) and then use plain JMS.

Even a regular connection factory can be used with XA, but in that case you need to make sure the resource is enlisted directly through the transaction manager API.

If you use a regular connection factory, you can keep the connection open for as long as you need.

Let me know how it goes and I'll help you. I know you started a bounty, but I'll answer for free :)

I couldn't find any example of using transactions with a singleton in the EJB tutorial.

I suggest you go through a stateless or stateful session bean instead, and apply @TransactionAttribute to that bean.

The Java EE 6 tutorial has some good information:

http://docs.oracle.com/javaee/6/tutorial/doc/bncij.html

Please note that a message is not available until it has been committed. So if you send a message inside a transaction, you will not receive it within that same transaction.

In your Edit 1 example, you are sending a message and consuming it in the same transaction. This will not work, because the producing side has to be committed before the message can be consumed. You need two transactions here, so Edit 1 is broken.

Also, make sure the connection is closed at the end. Since you are using JmsXA (a pooled connection factory), the pooling is handled for you automatically by the application server.
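The visibility rule can be illustrated with a toy transacted queue in plain Java. This is a simplified model, not HornetQ code. `ToyQueue` and `ToyTransactedSession` are hypothetical names. Sends are buffered in the session until `commit()`, so a receive attempted before the commit finds nothing, which mirrors the "No message received" result in the question.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of transactional send visibility (hypothetical names, not a JMS API).
final class ToyQueue {
    private final Queue<String> committed = new ArrayDeque<>();

    void publish(List<String> messages) { committed.addAll(messages); }

    // Returns null when no committed message is available.
    String poll() { return committed.poll(); }
}

final class ToyTransactedSession {
    private final ToyQueue queue;
    private final List<String> pendingSends = new ArrayList<>();

    ToyTransactedSession(ToyQueue queue) { this.queue = queue; }

    // Sends are only buffered; nothing reaches the queue yet.
    void send(String message) { pendingSends.add(message); }

    // Only committed messages can be received.
    String receiveNoWait() { return queue.poll(); }

    void commit() {
        queue.publish(pendingSends);
        pendingSends.clear();
    }

    void rollback() { pendingSends.clear(); }
}

final class ToyVisibilityDemo {
    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        ToyTransactedSession session = new ToyTransactedSession(queue);

        session.send("test");
        System.out.println("before commit: " + session.receiveNoWait());

        session.commit();   // end of transaction 1: the send becomes visible
        System.out.println("after commit: " + session.receiveNoWait());
        session.commit();   // end of transaction 2: the receive is acknowledged
    }
}
```

The demo prints `null` for the first receive and `test` for the second. In the real application the two commits would be two container-managed transactions rather than explicit calls on the session.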
