Java – call executorservice. Java from completable future shutdownNow

When one of the already running tasks throws an exception, I need to cancel all the scheduled but not yet running completable future tasks

Try the following example, but in most cases the main method will not exit (possibly due to some type of deadlock)

public static void main(String[] args) {
    ExecutorService executionService = Executors.newFixedThreadPool(5);

    Set< CompletableFuture<?> > tasks = new HashSet<>();

    for (int i = 0; i < 1000; i++) {
        final int id = i;
        CompletableFuture<?> c = CompletableFuture

        .runAsync( () -> {
            System.out.println("Running: " + id); 
            if ( id == 400 ) throw new RuntimeException("Exception from: " + id);
        },executionService )

        .whenComplete( (v,ex) -> { 
            if ( ex != null ) {
                System.out.println("Shutting down.");
                executionService.shutdownNow();
                System.out.println("shutdown.");
            }
        } );

        tasks.add(c);
    }

    try{ 
        CompletableFuture.allOf( tasks.stream().toArray(CompletableFuture[]::new) ).join(); 
    }catch(Exception e) { 
        System.out.println("Got async exception: " + e); 
    }finally { 
        System.out.println("DONE"); 
    }        
}

The last printout was like this:

Running: 402
Running: 400
Running: 408
Running: 407
Running: 406
Running: 405
Running: 411
Shutting down.
Running: 410
Running: 409
Running: 413
Running: 412
shutdown.

Try to run the shutdown now method on a separate thread, but it still gives the same deadlock in most cases

Do you know what could lead to this impasse?

What do you think is the best way to cancel all scheduled but not yet running completable futures when an exception is thrown?

Considering iterative tasks and calling cancel. On each completablefuture But I don't like this. Cancelationexception is thrown from the connection

Solution

You should remember that

CompletableFuture<?> f = CompletableFuture.runAsync(runnable,executionService);

Basically equivalent to

CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
    if(!f.isDone()) {
        try {
            runnable.run();
            f.complete(null);
        }
        catch(Throwable t) {
            f.completeExceptionally(t);
        }
    }
});

Therefore, executorservice knows nothing about completable future, so it usually cannot cancel it All it has is some work, expressed as an implementation of runnable

In other words, shutdown now() will prevent the execution of pending jobs, and the remaining futures will not complete normally, but they will not be cancelled Then, you call join () on the future returned by allof, which will never return due to unfinished futures

But please note that the scheduled work does check whether the future has been completed before doing anything expensive

So if you change the code to

ExecutorService executionService = Executors.newFixedThreadPool(5);
Set<CompletableFuture<?>> tasks = ConcurrentHashMap.newKeySet();
AtomicBoolean canceled = new AtomicBoolean();

for(int i = 0; i < 1000; i++) {
    final int id = i;
    CompletableFuture<?> c = CompletableFuture
        .runAsync(() -> {
            System.out.println("Running: " + id); 
            if(id == 400) throw new RuntimeException("Exception from: " + id);
        },executionService);
        c.whenComplete((v,ex) -> {
            if(ex != null && canceled.compareAndSet(false,true)) {
                System.out.println("Canceling.");
                for(CompletableFuture<?> f: tasks) f.cancel(false);
                System.out.println("Canceled.");
            }
        });
    tasks.add(c);
    if(canceled.get()) {
        c.cancel(false);
        break;
    }
}

try {
    CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
} catch(Exception e) {
    System.out.println("Got async exception: " + e);
} finally {
    System.out.println("DONE");
}
executionService.shutdown();

Once the relevant future is cancelled, runnable will not be executed Since there is competition between cancellation and normal execution, it may be helpful to change the operation to

.runAsync(() -> {
    System.out.println("Running: " + id); 
    if(id == 400) throw new RuntimeException("Exception from: " + id);
    LockSupport.parkNanos(1000);
},executionService);

Simulate some actual workload You will then see that fewer operations are performed after an exception is encountered

Because asynchronous exceptions can occur even when the commit loop is still running, it uses atomicboolean to detect this situation and stop the loop in this case

Note that for completable future, there is no difference between cancellation and any other special completion Calling F. cancel (...) is equivalent to f. completeexceptional (New cancelationexception()) Therefore, due to completable future Allof reports any exception in an exception situation, so it is likely to be a cancelationexception rather than a trigger exception

If you replace two cancel (false) calls with complete (null), you will get a similar effect. Runnables will not execute on the completed futures, but allof will report the original exception because it is the only exception It also has another positive effect: it is much cheaper to use null value to complete than to construct cancelationexception (for each future), so it runs much faster to force completion through complete (null), thus preventing more future execution

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>