Java – a thread pool that handles “duplicate” tasks
I want to run a number of different tasks in parallel, with the constraint that a task which is already queued or being processed must not be queued again. After reading through the Java API, I came up with the following code, which seems to work:
    import java.util.HashMap;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TestExecution implements Runnable {
        String key1;
        String key2;
        static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>();
        static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
        static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q);

        public static void main(String[] args) {
            try {
                execute(new TestExecution("A", "A"));
                execute(new TestExecution("A", "A"));
                execute(new TestExecution("B", "B"));
                Thread.sleep(8000);
                execute(new TestExecution("B", "B"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        static boolean execute(TestExecution e) {
            System.out.println("Handling " + e.key1 + ":" + e.key2);
            if (executions.containsKey(e)) {
                Future<?> f = (Future<?>) executions.get(e);
                if (f.isDone()) {
                    System.out.println("Previous execution has completed");
                    executions.remove(e);
                } else {
                    System.out.println("Previous execution still running");
                    return false;
                }
            } else {
                System.out.println("No previous execution");
            }
            Future<?> f = tpe.submit(e);
            executions.put(e, f);
            return true;
        }

        public TestExecution(String key1, String key2) {
            this.key1 = key1;
            this.key2 = key2;
        }

        public boolean equals(Object obj) {
            if (obj instanceof TestExecution) {
                TestExecution t = (TestExecution) obj;
                return (key1.equals(t.key1) && key2.equals(t.key2));
            }
            return false;
        }

        public int hashCode() {
            return key1.hashCode() + key2.hashCode();
        }

        public void run() {
            try {
                System.out.println("Start processing " + key1 + ":" + key2);
                Thread.sleep(4000);
                System.out.println("Finish processing " + key1 + ":" + key2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
Follow-up in response to the comments: the plan is for task execution to be triggered by cron calling a RESTful web service. For example, the following crontab sets up one task that runs every two minutes and another that is triggered at 9:30 every day:
    0/2 * * * * restclient.pl key11 key12
    30 09 * * * restclient.pl key21 key22
In this case, if the task key11:key12 is running or has already been queued to run, I do not want to queue another instance. I know there are other scheduling options, but we already use cron for other tasks, so I would like to keep it.
Second update: in response to the comments, I have rewritten the code as shown below. Can you point out any problems with the updated solution?
    import java.util.concurrent.LinkedBlockingQueue;

    public class TestExecution implements Runnable {
        String key1;
        String key2;
        static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>());

        public static void main(String[] args) {
            try {
                tpe.execute(new TestExecution("A", "A"));
                tpe.execute(new TestExecution("A", "A"));
                tpe.execute(new TestExecution("B", "B"));
                Thread.sleep(8000);
                tpe.execute(new TestExecution("B", "B"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public TestExecution(String key1, String key2) {
            this.key1 = key1;
            this.key2 = key2;
        }

        public boolean equals(Object obj) {
            if (obj instanceof TestExecution) {
                TestExecution t = (TestExecution) obj;
                return (key1.equals(t.key1) && key2.equals(t.key2));
            }
            return false;
        }

        public int hashCode() {
            return key1.hashCode() + key2.hashCode();
        }

        public void run() {
            try {
                System.out.println("Start processing " + key1 + ":" + key2);
                Thread.sleep(4000);
                System.out.println("Finish processing " + key1 + ":" + key2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TestThreadPoolExecutor extends ThreadPoolExecutor {
        Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>());

        public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {
            // Same pool settings as the first version (2 core, 5 max, 1 minute keep-alive)
            super(2, 5, 1, TimeUnit.MINUTES, q);
        }

        public void execute(Runnable command) {
            if (executions.contains(command)) {
                System.out.println("Previous execution still running");
                return;
            } else {
                System.out.println("No previous execution");
            }
            super.execute(command);
            executions.add(command);
        }

        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            executions.remove(r);
        }
    }
Solution
Several comments:
- In the execute method, a race condition occurs between reading executions (containsKey) and writing to it (remove or put) if multiple threads call the method at the same time. All calls on executions that need to be atomic must be wrapped in a synchronized block (in your case, making the method synchronized will work): http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
- You should use a singleton instead of static (that is, global) variables to hold the state.
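As a rough illustration of the first comment, the check and the update can be made atomic by putting both inside one synchronized method. This is only a sketch under assumptions: the class name DedupSubmitter, the String key, and the fixed pool of two threads are hypothetical and not taken from the question's code. State is held in instance fields rather than statics, in line with the second comment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: rejects a task whose key is already queued or running.
final class DedupSubmitter {
    // Guarded by the monitor of this instance (see the synchronized method below).
    private final Map<String, Future<?>> executions = new HashMap<>();
    // Assumption: a fixed pool of two threads; any ExecutorService would do.
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    /**
     * Submits the task unless a task with the same key is still pending.
     * The lookup and the put happen under one lock, so two concurrent
     * callers cannot both pass the check for the same key.
     */
    public synchronized boolean execute(String key, Runnable task) {
        Future<?> previous = executions.get(key);
        if (previous != null && !previous.isDone()) {
            return false; // still queued or running
        }
        // Replaces the entry of a completed run, if there was one.
        executions.put(key, pool.submit(task));
        return true;
    }

    /** Stops accepting new tasks; tasks already submitted still run. */
    public void shutdown() {
        pool.shutdown();
    }
}
```

With this sketch, a call such as execute("key11:key12", task) returns false while an earlier task with that key is pending and true otherwise. Entries for completed tasks stay in the map until the same key is submitted again, which is probably acceptable for a small, fixed set of keys like the cron jobs in the question.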
But I would really like to understand your design and what you are trying to achieve. Why would a task be queued for execution multiple times?