Java thread pool explanation and common methods
Preface
Recently, I was asked about thread pools, so I am going to write a few articles on multithreading. This one introduces the basic use of a thread pool.
Executors
Executors is a class in the java.util.concurrent package that provides simple factory methods for creating thread pools. It can create four commonly used thread pools:
(1) newCachedThreadPool creates a cacheable thread pool. Idle threads beyond what the workload needs are reclaimed, and when no idle thread is available a new one is created. Because the number of threads has no upper limit, submitted tasks start executing immediately.
(2) newFixedThreadPool creates a fixed-size thread pool that caps the number of concurrent threads; tasks beyond that number wait in the queue.
(3) newScheduledThreadPool creates a fixed-size thread pool that supports delayed and periodic task execution.
(4) newSingleThreadExecutor creates a pool with a single thread that executes tasks one at a time.
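As a minimal sketch of the four factory methods, the following creates each pool and runs one trivial task on it. The class name ExecutorsDemo, its helper methods and the tasks are made up for illustration; only the Executors calls come from the JDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class ExecutorsDemo {

    // Runs one trivial task on the given pool and returns its result.
    static int runOn(ExecutorService pool) throws Exception {
        try {
            Future<Integer> future = pool.submit(() -> 1 + 1);
            return future.get();
        } finally {
            pool.shutdown(); // stop accepting tasks so the JVM can exit
        }
    }

    // Schedules one task after a short delay and returns its result.
    static String runScheduled() throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> future =
                    pool.schedule(() -> "delayed", 50, TimeUnit.MILLISECONDS);
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOn(Executors.newCachedThreadPool()));
        System.out.println(runOn(Executors.newFixedThreadPool(4)));
        System.out.println(runOn(Executors.newSingleThreadExecutor()));
        System.out.println(runScheduled());
    }
}
```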
Disadvantages of Executors
Normally, we should not create a thread pool this way; we should construct a ThreadPoolExecutor directly. The pools created by Executors are themselves built by calling the ThreadPoolExecutor constructor, as the source code shows.
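One way to see this without opening the JDK source is to inspect the pool that newFixedThreadPool returns. The sketch below assumes an OpenJDK-style implementation, where the returned object is a plain ThreadPoolExecutor; the cast is an implementation detail rather than something the API promises, and the class name FixedPoolInspection is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the original article.
class FixedPoolInspection {

    // Describes the ThreadPoolExecutor settings behind newFixedThreadPool(n).
    static String describe(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            // Assumption: the factory returns a ThreadPoolExecutor (true for OpenJDK).
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
            // A LinkedBlockingQueue created without a capacity holds up to
            // Integer.MAX_VALUE tasks, which is unbounded in practice.
            boolean unbounded = tpe.getQueue() instanceof LinkedBlockingQueue
                    && tpe.getQueue().remainingCapacity() == Integer.MAX_VALUE;
            return "core=" + tpe.getCorePoolSize()
                    + " max=" + tpe.getMaximumPoolSize()
                    + " unboundedQueue=" + unbounded;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(3));
    }
}
```

The queue that can grow without limit is the reason for preferring an explicit ThreadPoolExecutor with a bounded queue.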
ThreadPoolExecutor
From the source code, we can see that the full ThreadPoolExecutor constructor is as follows:
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
The parameters mean the following:
corePoolSize: the number of threads the pool keeps alive long term. Even when idle, these threads are not reclaimed.
maximumPoolSize: the maximum number of threads.
keepAliveTime: the idle time. A thread beyond the core size that stays idle longer than this is reclaimed.
unit: the time unit of keepAliveTime.
workQueue: the task queue. When all core threads are busy, new tasks wait here, and threads that become free take tasks from it.
threadFactory: the factory that creates the pool's threads, both the core threads and the extra threads added up to maximumPoolSize.
handler: the rejection policy, applied when the pool cannot accept a task because the queue is full and the thread count has reached the maximum.
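To make the parameters concrete, here is a small sketch that passes all seven of them. The sizes, the 30-second keep-alive, the "worker" name prefix and the class name PoolParametersDemo are arbitrary choices for illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class PoolParametersDemo {

    // Hypothetical factory that names threads "<prefix>-1", "<prefix>-2", ...
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + counter.incrementAndGet());
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30, TimeUnit.SECONDS,                   // keepAliveTime and unit
                new ArrayBlockingQueue<>(16),           // workQueue (bounded)
                namedFactory("worker"),                 // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    // Returns the name of the thread that ran the first submitted task.
    static String firstThreadName() throws Exception {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstThreadName()); // prints "worker-1"
    }
}
```

Naming the threads through the factory makes thread dumps and logs easier to read, which is a common reason to supply a custom threadFactory.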
Order in which the thread pool handles tasks
When a task arrives, the pool first checks whether fewer than corePoolSize threads are running. If so, it creates a thread to execute the task. If not, the task is put in the task queue. When the queue is full, the pool asks the thread factory for additional threads, up to maximumPoolSize, to execute the new tasks. When the queue is full and the number of threads has reached the maximum, the rejection policy is applied and the task is refused.
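The order can be observed with a deliberately tiny pool. The sketch below assumes 1 core thread, 2 maximum threads and a queue of capacity 1 (all arbitrary values), and submits four tasks that block until the demo ends, so nothing finishes in between. TaskOrderDemo is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class TaskOrderDemo {

    // Submits four blocking tasks and records what the pool did with each one.
    static String run() {
        // No handler is passed, so the default AbortPolicy applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocking);
                    log.append(i)
                       .append(":threads=").append(pool.getPoolSize())
                       .append(",queued=").append(pool.getQueue().size())
                       .append(' ');
                } catch (RejectedExecutionException e) {
                    log.append(i).append(":rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        // prints "1:threads=1,queued=0 2:threads=1,queued=1 3:threads=2,queued=1 4:rejected"
        System.out.println(run());
    }
}
```

Task 1 starts the core thread, task 2 waits in the queue, task 3 finds the queue full and gets an extra thread, and task 4 is rejected because both the queue and the pool are full.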
Submit method
After the thread pool is created, we need to submit tasks to it, which can be done with the pool's submit method. submit accepts two types, Runnable and Callable. The differences are: a Runnable implements the run method, while a Callable implements the call method; call can return a value and can throw checked exceptions, whereas run can do neither.
How to submit tasks
Future<T> submit(Callable<T> task): the returned result can be obtained this way.
void execute(Runnable command): no result can be obtained this way.
Future<?> submit(Runnable task): get can be called on the Future, but it always returns null.
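The three ways of submitting can be compared side by side. This is a minimal sketch with made-up tasks; SubmitDemo is a hypothetical class name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original article.
class SubmitDemo {

    // Returns the values obtained from a Callable future and from a Runnable future.
    static Object[] run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> callable = () -> "done";
            Runnable runnable = () -> { };

            Future<String> fromCallable = pool.submit(callable); // carries a result
            Future<?> fromRunnable = pool.submit(runnable);      // get() yields null
            pool.execute(runnable);                              // nothing is returned

            return new Object[] { fromCallable.get(), fromRunnable.get() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Object[] results = run();
        System.out.println(results[0]); // prints "done"
        System.out.println(results[1]); // prints "null"
    }
}
```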
Bounding the BlockingQueue
When we create a thread pool through Executors, for example with newFixedThreadPool, an unbounded queue is created, which can easily cause an OutOfMemoryError (OOM). So we should specify the size of the queue ourselves.
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
Reject policy
When the task queue is full and the pool has reached its maximum number of threads, submitting another task triggers the rejection policy. The pool's default policy is AbortPolicy, which throws an exception directly. To implement a custom policy, implement the RejectedExecutionHandler interface. The thread pool provides the following policies to choose from:
AbortPolicy: the default policy. Throws RejectedExecutionException.
DiscardPolicy: silently drops the task being submitted.
DiscardOldestPolicy: drops the oldest task in the task queue to make room for the new one.
CallerRunsPolicy: the task is executed by the thread that submitted it.
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.DiscardPolicy());
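To illustrate how a policy takes effect, the sketch below uses CallerRunsPolicy with a pool that is saturated on purpose (one thread, a queue of capacity 1, both arbitrary). CallerRunsDemo is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original article.
class CallerRunsDemo {

    // Returns the name of the thread that ended up running the rejected task.
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocking); // occupies the only worker thread
            pool.execute(blocking); // waits in the queue
            // Pool and queue are both full, so this task is rejected and
            // CallerRunsPolicy runs it synchronously on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran on: " + run());
    }
}
```

Because the submitter is busy running the task itself, CallerRunsPolicy also slows down submission, which acts as a simple form of back pressure.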
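Catching exceptions
As mentioned earlier, a task that implements the Callable interface can return a result and throw exceptions. Both are delivered through the get method of the returned Future: an exception thrown by the task reaches the caller wrapped in an ExecutionException.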
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            throw new RuntimeException("exception"); // this exception is passed to the caller when Future.get() is called
        }
    });
	
try {
  Object result = future.get();
} catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
} catch (ExecutionException e) {
  e.printStackTrace();
}
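In practice the interesting part is usually the original exception, which is available through getCause(). A minimal sketch, with a made-up failing task and the hypothetical class name FutureExceptionDemo:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original article.
class FutureExceptionDemo {

    // Submits a failing Callable and returns a description of the original exception.
    static String run() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Object> failing = () -> {
                throw new IllegalStateException("boom");
            };
            Future<Object> future = pool.submit(failing);
            try {
                future.get();
                return "no exception";
            } catch (ExecutionException e) {
                // The task's own exception is the cause of the ExecutionException.
                Throwable cause = e.getCause();
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "IllegalStateException: boom"
    }
}
```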
How to construct a thread pool correctly
int poolSize = Runtime.getRuntime().availableProcessors() * 2;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS, queue, policy);
Get a single result
After submitting a task with submit, you get a Future. Calling its get method blocks until the execution result is available. get(long timeout, TimeUnit unit) lets you specify how long to wait.
Get multiple results
You can call get on each Future in a loop, or you can use ExecutorCompletionService. Its take method blocks until some task completes. After submitting a batch of tasks to the CompletionService, call completionService.take() the same number of times to obtain the results of all tasks. The results arrive in the order the tasks complete, not the order in which they were submitted.
void solve(Executor executor,Collection<Callable<Result>> solvers)
   throws InterruptedException,ExecutionException {
   
   CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// constructor
   
   for (Callable<Result> s : solvers)// submit all tasks
       ecs.submit(s);
	   
   int n = solvers.size();
   for (int i = 0; i < n; ++i) {// retrieve each completed task
       Result r = ecs.take().get();
       if (r != null)
           use(r);
   }
}
This class is a wrapper around the thread pool. After wrapping, you call its submit and take methods.
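The following runnable sketch shows the completion-order behaviour with two made-up tasks. A latch, rather than sleeps, keeps the first task from finishing until the second result has been taken, so the order is deterministic. CompletionOrderDemo and the task bodies are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original article.
class CompletionOrderDemo {

    // Returns the task results in the order in which take() delivered them.
    static List<String> run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        CountDownLatch slowMayFinish = new CountDownLatch(1);
        List<String> order = new ArrayList<>();
        try {
            // Submitted first, but blocked until the latch opens.
            ecs.submit(() -> {
                slowMayFinish.await();
                return "slow";
            });
            // Submitted second, completes right away.
            ecs.submit(() -> "fast");

            order.add(ecs.take().get()); // only "fast" can be finished here
            slowMayFinish.countDown();   // now allow the first task to complete
            order.add(ecs.take().get());
        } finally {
            slowMayFinish.countDown();   // never leave the worker blocked
            pool.shutdown();
        }
        return order;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "[fast, slow]"
    }
}
```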
Single task timeout
Future.get(long timeout, TimeUnit unit) lets you specify how long to wait. If the task has not completed by then, a TimeoutException is thrown.
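A timed get only stops the waiting; the task itself keeps running unless it is cancelled. The sketch below, with an intentionally stuck task and an arbitrary 100 ms timeout, shows one way to handle that. SingleTimeoutDemo is a hypothetical name, and cancelling on timeout is a design choice of this example, not something the original text prescribes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original article.
class SingleTimeoutDemo {

    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch never = new CountDownLatch(1); // never counted down
        try {
            Callable<String> stuck = () -> {
                never.await(); // simulates a task that does not finish in time
                return "finished";
            };
            Future<String> future = pool.submit(stuck);
            try {
                return future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the task so its thread is freed
                return "timed out, cancelled=" + future.isCancelled();
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "timed out, cancelled=true"
    }
}
```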
Timeout for multiple tasks
To wait for multiple tasks to complete with a maximum waiting time, you can use CountDownLatch:
public void testLatch(ExecutorService executorService,List<Runnable> tasks) 
	throws InterruptedException{
      
    CountDownLatch latch = new CountDownLatch(tasks.size());
      for(Runnable r : tasks){
          executorService.submit(new Runnable() {
              @Override
              public void run() {
                  try{
                      r.run();
                  }finally {
                      latch.countDown();// countDown
                  }
              }
          });
      }
	  latch.await(10,TimeUnit.SECONDS); // wait at most 10 seconds in total
  }
The await timeout is the total waiting time: even if 100 tasks need 20 minutes to run, the wait gives up after 10 seconds.
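await also returns a boolean that says whether the latch reached zero before the timeout, which is easy to overlook. The variation below checks that value and, as an assumption of this sketch, interrupts unfinished tasks with shutdownNow once the wait is over. LatchTimeoutDemo, runAll, sleeper and the pool size of 4 are hypothetical choices.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class LatchTimeoutDemo {

    // Returns true if every task finished within the timeout, false otherwise.
    static boolean runAll(List<Runnable> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(tasks.size());
        try {
            for (Runnable task : tasks) {
                pool.execute(() -> {
                    try {
                        task.run();
                    } finally {
                        latch.countDown(); // count down even if the task fails
                    }
                });
            }
            return latch.await(timeout, unit); // false means the timeout elapsed
        } finally {
            pool.shutdownNow(); // interrupt whatever is still running
        }
    }

    // Helper for the demo: a task that sleeps for the given time.
    static Runnable sleeper(long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAll(Collections.singletonList(sleeper(0)), 2, TimeUnit.SECONDS));
        System.out.println(runAll(Collections.singletonList(sleeper(5000)), 100, TimeUnit.MILLISECONDS));
    }
}
```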
