Java: newFixedThreadPool and an object pool do not work properly together
I am trying to figure out how to pool resources, and I am starting to suspect that my threads might be the problem (I am not 100% sure, but I have been experimenting with it). All I want to do is create a pool of channels to the server and see whether the threads use them. So far I have ended up either with as many channels as items I upload (i.e. it does not pool, it just creates a new channel in every thread) or with exactly one channel (i.e. it neither pools nor creates new channels as needed).
I thought the way the threads interact with the pool might be the issue, so I tried creating a newCachedThreadPool to do the work, but when I do, I get an error that appears to say the channel used by the threads is dead. There is a destroyObject method in my pool, but I never call it, so I don't understand why it is triggered. (If I comment it out, everything works, but only one channel is created and the upload is very slow: about 300 operations/s, compared with 30K/s without the thread pool.) I suspect something is terminating. Is there any way to verify this? If it is terminating, is there an alternative I can use?
Here is the code (ignore everything about RabbitMQ; it is only there so I can monitor the results):
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class PoolExample {

    private static ExecutorService executor_worker;

    static {
        final int numberOfThreads_ThreadPoolExecutor = 20;
        executor_worker = Executors.newCachedThreadPool();
        executor_worker = new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
    }

    private static ObjectPool<Channel> pool;

    public static void main(String[] args) throws Exception {
        System.out.println("starting..");
        ObjectPool<Channel> pool = new GenericObjectPool<Channel>(
                new ConnectionPoolableObjectFactory(), 50);
        for (int x = 0; x < 500000000; x++) {
            executor_worker.submit(new MyRunnable(x, pool));
        }
        //executor_worker.shutdown();
        //pool.close();
    }
}

class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {

    Channel channel;
    Connection connection;

    public ConnectionPoolableObjectFactory() throws IOException {
        System.out.println("hello world");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
    }

    @Override
    public Channel makeObject() throws Exception {
        //channel = connection.createChannel();
        return channel;
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

class MyRunnable implements Runnable {

    protected int x = 0;
    protected ObjectPool<Channel> pool;

    public MyRunnable(int x, ObjectPool<Channel> pool) {
        // TODO Auto-generated constructor stub
        this.x = x;
        this.pool = pool;
    }

    public void run() {
        try {
            Channel channel = pool.borrowObject();
            String message = Integer.toString(x);
            channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            pool.returnObject(channel);
        } catch (NoSuchElementException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
P.S. I have asked a few questions and read the documentation while trying to solve this, and I may be heading in completely the wrong direction, so if you see any problems or have any tips, please send them my way.
The plot thickens:
In the for loop of the main method (where I submit the work to the threads), I added:
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
System.out.println(threadSet.size()); //number of threads
System.out.println(pool.getNumActive());
It shows 25 threads (although I asked for 20) and 20 active items in the pool. But when I look at the RabbitMQ UI, I see one connection with only one channel. If I create the channels myself and pass them to the runnable, it creates many channels (but never closes them). I don't understand what is happening or why the result is not what I expected.
Solution
I think the problem is that your ConnectionPoolableObjectFactory only ever creates one Channel object. It seems a new Channel should be created every time makeObject is called.
So maybe it should be implemented something like this:
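import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.pool.BasePoolableObjectFactory;

public class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {

    // One connection is shared; each pooled object is a separate channel on it.
    private final Connection connection;

    // Public so that the code building the pool can instantiate the factory.
    // newConnection() throws checked exceptions, so they are declared here.
    public ConnectionPoolableObjectFactory() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
    }

    @Override
    public Channel makeObject() throws Exception {
        // Create a new channel on every call instead of returning a cached one.
        return connection.createChannel();
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}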
This assumes that each factory creates multiple channels from a single connection.
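To see why the pool can report 20 active items while the RabbitMQ UI shows a single channel, here is a minimal sketch that uses only the standard library. It makes no RabbitMQ or commons-pool calls: FakeChannel, sharedFactory, freshFactory and distinctObjects are hypothetical names invented for this illustration, and the loop only imitates a pool asking its factory for an object on each borrow.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.function.Supplier;

public class FactoryIdentityDemo {

    /** Hypothetical stand-in for a RabbitMQ Channel; illustration only. */
    static final class FakeChannel { }

    /** Mirrors the factory in the question: one object is built up front and returned every time. */
    static Supplier<FakeChannel> sharedFactory() {
        final FakeChannel only = new FakeChannel();
        return () -> only;
    }

    /** Mirrors the suggested fix: every call builds a new object. */
    static Supplier<FakeChannel> freshFactory() {
        return FakeChannel::new;
    }

    /** Imitates a pool that asks its factory for an object on each borrow, then counts distinct instances. */
    static int distinctObjects(Supplier<FakeChannel> factory, int borrows) {
        // Identity-based set: two references count as one entry only if they are the same object.
        Set<FakeChannel> seen = Collections.newSetFromMap(new IdentityHashMap<FakeChannel, Boolean>());
        for (int i = 0; i < borrows; i++) {
            seen.add(factory.get());
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("shared: " + distinctObjects(sharedFactory(), 20)); // prints "shared: 1"
        System.out.println("fresh: " + distinctObjects(freshFactory(), 20));   // prints "fresh: 20"
    }
}
```

With the shared factory, 20 borrows all hand out the same instance, which matches one channel in the UI despite 20 active items. It would also be consistent with the closed-channel symptom: if the pool itself calls destroyObject on one of those "copies", presumably while discarding a surplus object, the single underlying channel is closed for every other borrower too.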