Java – newFixedThreadPool and object pool do not work properly

I was trying to figure out how to pool resources, and I began to suspect that my threads might be the problem (I'm not 100% sure, but I've been experimenting with it). All I want to do is create a pool of channels to a server and see whether the threads use them. So far I have either ended up with as many channels as items I upload (i.e. it does not pool, it just creates a new channel in each thread), or with exactly one channel (i.e. it neither pools nor creates new channels as needed).

I thought the problem might be the way the threads interact with the pool, so I tried Executors.newCachedThreadPool() so that the threads would not die. When I do that, I get errors indicating that the channel the threads are using is already closed. There is a destroyObject method in my pool factory, but I never call it, so I don't understand why it is triggered. (If I comment it out, the program works, but it creates only one channel and the upload is very slow: about 300 operations/s, compared with 30K/s without a thread pool.) I suspect the threads are terminating. Is there any way to verify this? And if they are terminating, is there an alternative I can use?

Here is the code (ignore everything about RabbitMQ; it's only there so I can monitor the results):

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class PoolExample {

    private static ExecutorService executor_worker;

    static {
        final int numberOfThreads_ThreadPoolExecutor = 20;
        executor_worker = Executors.newCachedThreadPool();
        executor_worker = new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor,numberOfThreads_ThreadPoolExecutor,1000,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>());
    }

    private static ObjectPool<Channel> pool;

    public static void main(String[] args) throws Exception {
        System.out.println("starting..");           
        ObjectPool<Channel> pool =
                new GenericObjectPool<Channel>(
                new ConnectionPoolableObjectFactory(),50);
        for (int x = 0; x<500000000; x++) {
            executor_worker.submit(new MyRunnable(x,pool));
        }
        //executor_worker.shutdown();
        //pool.close();
    }
}

 class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {
     Channel channel;
     Connection connection;

    public ConnectionPoolableObjectFactory() throws IOException {
        System.out.println("hello world");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        channel = connection.createChannel(); 
    }

    @Override
    public Channel makeObject() throws Exception {  
        //channel = connection.createChannel(); 
        return channel; 
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

class MyRunnable implements Runnable{  
    protected int x = 0;
    protected ObjectPool<Channel> pool;

    public MyRunnable(int x,ObjectPool<Channel> pool) {
        // TODO Auto-generated constructor stub
        this.x = x;
        this.pool = pool;
    }

    public void run(){
        try {
                Channel channel = pool.borrowObject();
                String message = Integer.toString(x);
                channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                pool.returnObject(channel);
        } catch (NoSuchElementException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}

P.S. I've already asked a few questions, read the documentation, and tried to solve this myself, and I may be heading in completely the wrong direction, so if you spot any problems or have any tips, please send them my way.

The plot thickens:

In the for loop of the main method (where I submit the work to the threads), I added:

Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
System.out.println(threadSet.size()); // number of threads
System.out.println(pool.getNumActive());

It shows 25 threads (although I asked for 20) and 20 items in the pool. But when I look at the RabbitMQ UI, I see one connection with only one channel. If I create the channels myself and pass them to the runnable, it creates many channels (but never closes them). I don't understand what is happening or why the result is not what I expected.
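One likely reason for the 25-versus-20 discrepancy is that Thread.getAllStackTraces() reports every live thread in the JVM, including the submitting thread and internal JVM threads, not only the executor's workers. The following is a minimal sketch using only the standard library; the class name ThreadCountCheck and the method countThreads are made up for illustration and are not part of the original program. It compares the executor's own worker count with the JVM-wide count.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original program.
public class ThreadCountCheck {

    /** Returns {worker threads owned by the executor, all live JVM threads}. */
    public static int[] countThreads(int poolSize) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize, 1000, TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>());
        CountDownLatch done = new CountDownLatch(poolSize);

        // Each of the first poolSize tasks starts a new core thread.
        for (int i = 0; i < poolSize; i++) {
            executor.execute(done::countDown);
        }
        done.await();

        int workers = executor.getPoolSize();               // executor's threads only
        int allThreads = Thread.getAllStackTraces().size(); // every live JVM thread

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return new int[] {workers, allThreads};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = countThreads(20);
        System.out.println("executor workers: " + counts[0]);
        System.out.println("all JVM threads:  " + counts[1]);
    }
}
```

The second number is always larger than the first, because it also counts the thread that submits the work. In the original program the RabbitMQ client presumably adds threads of its own as well, so a few extra threads do not by themselves mean the executor is misbehaving.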

Solution

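I think the problem is that your ConnectionPoolableObjectFactory only ever creates a single Channel object: the constructor creates one channel, and makeObject returns that same instance every time. It seems that a new channel should be created every time makeObject is called.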

So maybe we should do something like this:

public class ConnectionPoolableObjectFactory
        extends BasePoolableObjectFactory<Channel> {

    private final Connection connection;

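    // Public so that other classes can instantiate the factory;
    // newConnection() throws checked exceptions, so they must be declared.
    public ConnectionPoolableObjectFactory() throws Exception {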
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
    }

    @Override
    public Channel makeObject() throws Exception {
        return connection.createChannel();
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

This assumes that each factory creates multiple channels from a single connection.
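To illustrate why the original factory produced "20 items in the pool" but only one channel in the RabbitMQ UI, here is a minimal sketch that uses only the standard library. FactoryDemo, FakeChannel and distinctObjects are hypothetical stand-ins (no RabbitMQ or commons-pool involved); the Supplier plays the role of makeObject.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in classes for illustration only.
public class FactoryDemo {

    /** Stand-in for a Channel; instances are compared by identity. */
    public static final class FakeChannel {
    }

    /** Calls the factory `borrows` times and counts how many distinct objects it produced. */
    public static int distinctObjects(Supplier<FakeChannel> makeObject, int borrows) {
        Set<FakeChannel> seen = new HashSet<>();
        for (int i = 0; i < borrows; i++) {
            seen.add(makeObject.get());
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // Like the original factory: one object built up front, returned every time.
        FakeChannel shared = new FakeChannel();
        System.out.println(distinctObjects(() -> shared, 20));     // prints 1

        // Like the corrected factory: a fresh object on every call.
        System.out.println(distinctObjects(FakeChannel::new, 20)); // prints 20
    }
}
```

With the shared instance, the pool can hand out 20 "objects" that are all the same underlying channel, so every thread publishes through one channel, and destroying any one of them closes it for all the others.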
