Java – a thread pool that handles “duplicate” tasks

I want to run a number of different tasks in parallel, with the rule that if a task is already queued or being processed, it must not be queued again. After reading through the Java API I came up with the following code, which seems to work:

import java.util.HashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestExecution implements Runnable {
   String key1;
   String key2;   
   static HashMap<TestExecution,Future<?>> executions = new HashMap<TestExecution,Future<?>>();
   static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
   static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2,5,1,TimeUnit.MINUTES,q);

   public static void main(String[] args) {
      try {
         execute(new TestExecution("A","A"));
         execute(new TestExecution("A","A"));
         execute(new TestExecution("B","B"));
         Thread.sleep(8000);
         execute(new TestExecution("B","B"));
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   static boolean execute(TestExecution e) {
      System.out.println("Handling "+e.key1+":"+e.key2);
      if (executions.containsKey(e)) {
         Future<?> f = (Future<?>) executions.get(e);
         if (f.isDone()) {
            System.out.println("Previous execution has completed");
            executions.remove(e);
         } else {
            System.out.println("Previous execution still running");
            return false;
         }         
      }
      else {
         System.out.println("No previous execution");
      }
      Future<?> f = tpe.submit(e);
      executions.put(e,f);            
      return true;
   }

   public TestExecution(String key1,String key2) {
      this.key1 = key1;
      this.key2 = key2;      
   }

   public boolean equals(Object obj)
   {
       if (obj instanceof TestExecution)
       {
          TestExecution t = (TestExecution) obj;
           return (key1.equals(t.key1) && key2.equals(t.key2));           
       }       
       return false;
   }

   public int hashCode ()
   {
      return key1.hashCode()+key2.hashCode();
   }

   public void run() {      
      try {
         System.out.println("Start processing "+key1+":"+key2);
         Thread.sleep(4000);
         System.out.println("Finish processing "+key1+":"+key2);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }      
   }              
}

Follow-up in response to the comments: the plan is for task execution to be triggered by cron calling a RESTful web service. For example, the following setup has one task scheduled every two minutes and another triggered at 9:30 every day:

0/2 * * * * restclient.pl key11 key12 
30 09 * * * restclient.pl key21 key22

In this case, if the task key11:key12 is running or has already been queued, I do not want to queue another instance of it. I know there are other scheduling options, but we already use cron for other tasks, so I would like to keep it.

Second update: in response to the comments so far, I have rewritten the code. Can you point out any problems with the updated solution below?

import java.util.concurrent.LinkedBlockingQueue;

public class TestExecution implements Runnable {
   String key1;
   String key2;      
   static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>());

   public static void main(String[] args) {
      try {
         tpe.execute(new TestExecution("A","A"));
         tpe.execute(new TestExecution("A","A"));
         tpe.execute(new TestExecution("B","B"));
         Thread.sleep(8000);
         tpe.execute(new TestExecution("B","B"));
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   public TestExecution(String key1,String key2) {
      this.key1 = key1;
      this.key2 = key2;      
   }

   public boolean equals(Object obj)
   {
       if (obj instanceof TestExecution)
       {
          TestExecution t = (TestExecution) obj;
           return (key1.equals(t.key1) && key2.equals(t.key2));           
       }       
       return false;
   }

   public int hashCode ()
   {
      return key1.hashCode()+key2.hashCode();
   }

   public void run() {      
      try {
         System.out.println("Start processing "+key1+":"+key2);
         Thread.sleep(4000);
         System.out.println("Finish processing "+key1+":"+key2);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }      
   }
}


import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class TestThreadPoolExecutor extends ThreadPoolExecutor {
   Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>());

   public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {      
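      // ThreadPoolExecutor has no (int, queue) constructor; use the same settings as the first version
      super(2,5,1,TimeUnit.MINUTES,q);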
   }

   public void execute(Runnable command) {
      if (executions.contains(command)) {
         System.out.println("Previous execution still running");
         return;
      }
      else {
         System.out.println("No previous execution");
      }
      super.execute(command);      
      executions.add(command);      
   }

   protected void afterExecute(Runnable r,Throwable t) {
      super.afterExecute(r,t);        
      executions.remove(r);
   }      
}

Solution

Several comments:

- In the execute method, if multiple threads call it at the same time, there is a race condition between reading executions (containsKey) and writing to it (remove or put). The whole check-then-act sequence has to be atomic, so wrap all of those calls in a block synchronized on executions (in your case, making the method itself synchronized will also work): http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
- You should use a singleton rather than static (that is, global) variables to hold the state.

But I would really like to understand your design and what you are trying to achieve. Why would a task be queued for execution multiple times?
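
As a minimal sketch of the two comments above, assuming each task can be identified by a string key such as "key11:key12": the hypothetical DedupExecutor class below is not part of the original code. It keeps its state in an instance rather than in static fields, and instead of an explicit synchronized block it uses a concurrent set, whose add() does the check and the insert as one atomic step. The class name, the submit signature and the key format are all assumptions made for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical helper: refuses a key that is already queued or running. */
final class DedupExecutor {
    // Instance state, not static fields; one shared instance can be handed to callers.
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();
    private final ExecutorService pool;

    DedupExecutor(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    /** Returns true if the task was accepted, false if the same key is still in flight. */
    boolean submit(String key, Runnable task) {
        // add() is an atomic check-and-insert, so two concurrent callers cannot both succeed.
        if (!inFlight.add(key)) {
            return false;
        }
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    inFlight.remove(key); // release the key even if the task throws
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            inFlight.remove(key); // pool was shut down; do not leave the key registered
            throw e;
        }
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        DedupExecutor executor = new DedupExecutor(2);
        CountDownLatch gate = new CountDownLatch(1);
        // Each task waits on the gate, so it is guaranteed to still be in flight below.
        Runnable blocked = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(executor.submit("A:A", blocked)); // prints true
        System.out.println(executor.submit("A:A", blocked)); // prints false (duplicate)
        System.out.println(executor.submit("B:B", blocked)); // prints true
        gate.countDown();
        executor.shutdown();
    }
}
```

The key is registered before the task is handed to the pool and removed in a finally block, so a task that finishes quickly or throws cannot leave its key stuck. A synchronized method around the original HashMap logic should also close the race; the concurrent set simply avoids a separate lock.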
