Java – fix thread pool thread blocking when enough tasks are submitted

I have a process that needs to calculate many small tasks in parallel, and then process the results according to the natural order of the tasks For this purpose, I have the following settings:

A simple executorservice and a blocking queue, which I will use to keep the future object returned when submitting the callable to the executor:

ExecutorService exec = Executors.newFixedThreadPool(15);
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64);

Some debugging code is used to calculate the number of submitted and processed tasks and write them out regularly (note that the processing will increase at the end of the task code itself):

AtomicLong processed = new AtomicLong(0);
AtomicLong submitted = new AtomicLong(0);

Timer statusTimer = new Timer();
statusTimer.schedule(new TimerTask() {
      @Override
      public void run() {
        l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get())));
      }             
},60 * 1000,60 * 1000);

Get tasks from the queue (actually the generator) and submit them to the thread executing the program, and put the generated future into the futures queue (this is how I ensure that I don't commit too many tasks out of memory):

Thread submitThread = new Thread(() ->
{
    MyTask task;
    try {
        while ((task = taskQueue.poll()) != null) {
            futures.put(exec.submit(task));
            submitted.incrementAndGet();
        }
    } catch (Exception e) {l .error("Unexpected Exception",e);}
},"SubmitTasks");
submitThread.start();

Then, the current thread completes the task from the futures queue and processes the results:

while (!futures.isEmpty() || submitThread.isAlive()) {
    MyTask task = futures.take().get();
    //process result
}

When I run it on a server with 8 cores (note that the code currently uses 15 threads), the CPU utilization is only about 60% I see my debug output as follows:

INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950

The thread dump shows that many thread pools are blocked as follows:

"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f2fbb0001b0> (a java.util.concurrent.locks.reentrantlock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
        at java.util.concurrent.locks.reentrantlock.lockInterruptibly(reentrantlock.java:335)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

My explanation of debugging output is that at any given point in time, I have at least hundreds of tasks that have been submitted to the executor service but have not been processed (I can also confirm in the stack trace that the submittasks thread is blocked on linkedblockingqueue. Put) However, the stack trace (and server utilization statistics) shows me that the executor service is in linkedblockingqueue Blocked on take (I assume the internal task queue is empty)

What did I read wrong?

Solution

Threads involving blockingqueues are always tricky Just look at the code and don't run at the scale you use I have some suggestions Many industry experts like Jessica Kerr advise you never to stop What you can do is use the method with timeout in linkedblockingqueue

Thread submitThread = new Thread(() ->
{
    MyTask task;
    try {
        while ((task = taskQueue.peek()) != null) {
            boolean success = futures.offer(exec.submit(task),1000,TimeUnit.MILLISECONDS);
            if(success) {
                submitted.incrementAndGet();
                taskQueue.remove(task);
            }
        }
    } catch (Exception e) {l .error("Unexpected Exception","SubmitTasks");
submitThread.start();

And here

while (!futures.isEmpty() || submitThread.isAlive()) {
    Future<MyTask> f = futures.poll(1000,TimeUnit.MILLISECONDS);
    if(f != null) {
        MyTask task = f.get();
    }
    //process result
}

Watch this video on concurrency tools in JVM by Jessica Kerr

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>