Java – Calling ExecutorService.shutdownNow from a CompletableFuture
When one of the already running tasks throws an exception, I need to cancel all the CompletableFuture tasks that are scheduled but not yet running.
I tried the following example, but in most cases the main method does not exit (possibly due to some kind of deadlock).
public static void main(String[] args) {
    ExecutorService executionService = Executors.newFixedThreadPool(5);
    Set<CompletableFuture<?>> tasks = new HashSet<>();
    for (int i = 0; i < 1000; i++) {
        final int id = i;
        CompletableFuture<?> c = CompletableFuture
            .runAsync(() -> {
                System.out.println("Running: " + id);
                if (id == 400) throw new RuntimeException("Exception from: " + id);
            }, executionService)
            .whenComplete((v, ex) -> {
                if (ex != null) {
                    System.out.println("Shutting down.");
                    executionService.shutdownNow();
                    System.out.println("shutdown.");
                }
            });
        tasks.add(c);
    }
    try {
        CompletableFuture.allOf(tasks.stream().toArray(CompletableFuture[]::new)).join();
    } catch (Exception e) {
        System.out.println("Got async exception: " + e);
    } finally {
        System.out.println("DONE");
    }
}
The last lines of output looked like this:
Running: 402
Running: 400
Running: 408
Running: 407
Running: 406
Running: 405
Running: 411
Shutting down.
Running: 410
Running: 409
Running: 413
Running: 412
shutdown.
I also tried running the shutdownNow method on a separate thread, but it still produces the same deadlock in most cases.
Do you know what could cause this deadlock?
What do you think is the best way to cancel all scheduled but not yet running CompletableFutures when an exception is thrown?
I considered iterating over the tasks and calling cancel on each CompletableFuture, but what I don't like about this is that a CancellationException is then thrown from join.
Solution
You should remember that
CompletableFuture<?> f = CompletableFuture.runAsync(runnable,executionService);
is basically equivalent to
CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
    if (!f.isDone()) {
        try {
            runnable.run();
            f.complete(null);
        } catch (Throwable t) {
            f.completeExceptionally(t);
        }
    }
});
Therefore, the ExecutorService knows nothing about the CompletableFuture, so in general it cannot cancel it. All it has is some job, expressed as an implementation of Runnable.
In other words, shutdownNow() prevents the execution of the pending jobs, so the remaining futures will never complete normally, but they will not be cancelled either. Then you call join() on the future returned by allOf, which never returns because of the futures that never complete.
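To make the point above concrete, here is a minimal sketch, assuming a single-threaded pool so that the second task is guaranteed to wait in the queue. The class and method names are made up for illustration. It shows that after shutdownNow() the queued job is handed back as a never-run Runnable, while its CompletableFuture stays incomplete:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowLeavesFuturesPending {

    // Returns true if exactly one job was dropped by shutdownNow()
    // and the future belonging to it is still not done afterwards.
    static boolean queuedFutureStaysPending() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1); // never counted down on purpose
        try {
            // Occupies the only worker thread until it is interrupted.
            CompletableFuture.runAsync(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    // shutdownNow() interrupts the worker; just return
                }
            }, pool);

            // This job has to wait in the executor's queue.
            CompletableFuture<Void> queued = CompletableFuture.runAsync(() -> { }, pool);

            started.await();
            List<Runnable> neverRun = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);

            // The executor is terminated, yet nobody ever completed 'queued'.
            return neverRun.size() == 1 && !queued.isDone();
        } catch (InterruptedException e) {
            pool.shutdownNow();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("queued future still pending: " + queuedFutureStaysPending());
    }
}
```

Calling join() on such a future, or on an allOf that depends on it, would block forever, which matches the hang described in the question.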
But note that the scheduled job does check whether the future has already been completed before doing anything expensive.
So if you change the code to
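That check can be observed directly. The following is only a sketch with hypothetical names, again assuming a single-threaded pool so that the second job is still queued when its future gets cancelled:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelledFutureSkipsRunnable {

    // Returns whether the Runnable of a future cancelled while queued was ever executed.
    static boolean runnableWasExecuted() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean executed = new AtomicBoolean();
        try {
            // Keeps the only worker thread busy until we release it.
            CompletableFuture.runAsync(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, pool);

            // Queued behind the blocker; would set the flag if it ever ran.
            CompletableFuture<Void> queued =
                CompletableFuture.runAsync(() -> executed.set(true), pool);

            queued.cancel(false);  // complete the future before its job is picked up
            release.countDown();   // let the worker proceed to the queued job

            pool.shutdown();       // orderly shutdown: queued jobs are still processed
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return executed.get();
        } catch (InterruptedException e) {
            pool.shutdownNow();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("runnable executed: " + runnableWasExecuted());
    }
}
```

The executor still dequeues the job, but the wrapper created by runAsync sees that the future is already completed and returns without calling the Runnable.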
ExecutorService executionService = Executors.newFixedThreadPool(5);
// concurrent set: it is iterated by a worker thread while the main thread still adds to it
Set<CompletableFuture<?>> tasks = ConcurrentHashMap.newKeySet();
AtomicBoolean canceled = new AtomicBoolean();
for (int i = 0; i < 1000; i++) {
    final int id = i;
    CompletableFuture<?> c = CompletableFuture
        .runAsync(() -> {
            System.out.println("Running: " + id);
            if (id == 400) throw new RuntimeException("Exception from: " + id);
        }, executionService);
    c.whenComplete((v, ex) -> {
        // only the first failure triggers the cancellation of all other futures
        if (ex != null && canceled.compareAndSet(false, true)) {
            System.out.println("Canceling.");
            for (CompletableFuture<?> f : tasks) f.cancel(false);
            System.out.println("Canceled.");
        }
    });
    tasks.add(c);
    // a failure may have happened while we are still submitting: stop the loop
    if (canceled.get()) {
        c.cancel(false);
        break;
    }
}
try {
    CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
} catch (Exception e) {
    System.out.println("Got async exception: " + e);
} finally {
    System.out.println("DONE");
}
executionService.shutdown();
the runnables will not be executed once the associated future has been cancelled. Since there is a race between cancellation and ordinary execution, it may be helpful to change the action to
.runAsync(() -> {
    System.out.println("Running: " + id);
    if (id == 400) throw new RuntimeException("Exception from: " + id);
    LockSupport.parkNanos(1000);
}, executionService);
to simulate some actual workload. You will then see that fewer actions are executed after the exception has been encountered.
Because the asynchronous exception can occur even while the submitting loop is still running, the code uses an AtomicBoolean to detect this situation and stop the loop in that case.
Note that for a CompletableFuture, there is no difference between cancellation and any other exceptional completion. Calling f.cancel(...) is equivalent to f.completeExceptionally(new CancellationException()). Therefore, since CompletableFuture.allOf reports any one of the exceptions in the exceptional case, it is very likely to be a CancellationException rather than the triggering exception.
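The equivalence can be checked with a few lines. This is a sketch with illustrative names only; it completes one future through cancel(false) and another through completeExceptionally(new CancellationException()) and compares what an observer can see:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

public class CancelIsExceptionalCompletion {

    // Completes a fresh future either via cancel(false) or via
    // completeExceptionally(new CancellationException()).
    static CompletableFuture<Void> finished(boolean viaCancel) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        if (viaCancel) {
            f.cancel(false);
        } else {
            f.completeExceptionally(new CancellationException());
        }
        return f;
    }

    // Summarizes the observable state of an already completed future.
    static String describe(CompletableFuture<Void> f) {
        String thrown;
        try {
            f.join();
            thrown = "nothing";
        } catch (RuntimeException e) {
            thrown = e.getClass().getSimpleName();
        }
        return "cancelled=" + f.isCancelled()
            + ", exceptional=" + f.isCompletedExceptionally()
            + ", join threw " + thrown;
    }

    public static void main(String[] args) {
        System.out.println(describe(finished(true)));
        System.out.println(describe(finished(false)));
    }
}
```

Both futures report themselves as cancelled and as exceptionally completed, and join() throws a CancellationException in both cases.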
If you replace the two cancel(false) calls with complete(null), you get a similar effect: the runnables will not be executed for futures that are already completed, but allOf will report the original exception, because it is then the only exception. It also has another positive effect: completing with a null value is much cheaper than constructing a CancellationException (for every pending future), so the forced completion via complete(null) runs much faster and prevents more futures from executing.
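The effect on the reported exception can be sketched in isolation, without any executor. The names below are hypothetical and the futures are completed by hand: one future fails with the original exception, the pending ones are force-completed with complete(null), and allOf then surfaces only the original failure:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompleteNullKeepsOriginalException {

    // Returns the cause that allOf(...).join() reports, or null if it completed normally.
    static Throwable causeReportedByAllOf() {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        CompletableFuture<Void> pendingA = new CompletableFuture<>();
        CompletableFuture<Void> pendingB = new CompletableFuture<>();

        // The one real failure, standing in for the task with id 400.
        failed.completeExceptionally(new RuntimeException("Exception from: 400"));

        // Force-complete the rest with a value instead of cancelling them.
        pendingA.complete(null);
        pendingB.complete(null);

        try {
            CompletableFuture.allOf(failed, pendingA, pendingB).join();
            return null;
        } catch (CompletionException e) {
            // join() wraps the failure of a dependency in a CompletionException
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println("allOf reported: " + causeReportedByAllOf());
    }
}
```

Since no CancellationException exists anywhere in this setup, the triggering RuntimeException is the only candidate that allOf can report.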